Spark job for demoing Spark metrics monitoring?

2015-01-21 Thread Otis Gospodnetic
Hi,

I'll be showing our Spark monitoring
http://blog.sematext.com/2014/10/07/apache-spark-monitoring/ at the
upcoming Spark Summit in NYC.  I'd like to run some/any Spark job that
really exercises Spark and makes it emit all its various metrics (so the
metrics charts are full of data and not blank or flat and boring).

Since we don't use Spark at Sematext yet, I was wondering if anyone could
recommend some Spark app/job that's easy to run, just to get some Spark job
to start emitting various Spark metrics?

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr  Elasticsearch Support * http://sematext.com/
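
[A minimal, self-contained job of the kind being asked for might look like the sketch below: it caches data, shuffles it twice and runs a couple of actions, so the storage, shuffle and executor metrics all get populated. The data sizes, partition counts and app name are arbitrary assumptions.]

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

object MetricsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("metrics-demo"))
    // synthetic key/value data, cached so the storage metrics show something
    val data = sc.parallelize(1 to 10000000, 100)
      .map(i => (i % 1000, i.toLong))
      .cache()
    // two shuffles, to produce shuffle read/write metrics
    val counts = data.reduceByKey(_ + _)
    val regrouped = counts.map { case (k, v) => (v % 100, k) }.groupByKey()
    // actions actually run the stages and drive the task/executor metrics
    println(data.count())
    println(regrouped.count())
    sc.stop()
  }
}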


Re: sparkcontext.objectFile return thousands of partitions

2015-01-21 Thread Sean Owen
You have 8 files, not 8 partitions. It does not follow that they should be
read as 8 partitions since they are presumably large and so you would be
stuck using at most 8 tasks in parallel to process. The number of
partitions is determined by Hadoop input splits and generally makes a
partition per block of data. If you know that this number is too high you
can request a number of partitions when you read it. Don't coalesce, just
read the desired number from the start.
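
[A minimal sketch of that suggestion; whether 8 is actually a sensible number depends on the data size:]

  import org.apache.spark.mllib.regression.LabeledPoint

  // ask for the desired (minimum) number of partitions when reading,
  // instead of coalescing afterwards
  val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", minPartitions = 8)
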
On Jan 21, 2015 4:32 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  Why does sc.objectFile(…) return an RDD with thousands of partitions?



 I save a rdd to file system using



 rdd.saveAsObjectFile("file:///tmp/mydir")



 Note that the rdd contains 7 million objects. I checked the directory
 /tmp/mydir/; it contains 8 partitions



 part-0  part-2  part-4  part-6  _SUCCESS

 part-1  part-3  part-5  part-7



 I then load the rdd back using



 val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)



 I expect rdd2 to have 8 partitions. But from the master UI, I see that
 rdd2 has over 1000 partitions. This is very inefficient. How can I limit it
 to 8 partitions just like what is stored on the file system?



 Regards,



 *Ningjun Wang*

 Consulting Software Engineer

 LexisNexis

 121 Chanlon Road

 New Providence, NJ 07974-1541





Re: [SQL] Using HashPartitioner to distribute by column

2015-01-21 Thread Yin Huai
Hello Michael,

In Spark SQL, we have our internal concepts of Output Partitioning
(representing the partitioning scheme of an operator's output) and Required
Child Distribution (representing the requirement of input data distribution
of an operator) for a physical operator. Let's say we have two operators,
parent and child, and the parent takes the output of the child as its
input. At the end of query planning process, whenever the Output
Partitioning of the child does not satisfy the Required Child Distribution
of the parent, we will add an Exchange operator between the parent and
child to shuffle the data. Right now, we do not record the partitioning
scheme of an input table. So, I think even if you use partitionBy (or
DISTRIBUTE BY in SQL) to prepare your data, you still will see the Exchange
operator and your GROUP BY operation will be executed in a new stage (after
the Exchange).

Making Spark SQL aware of the partitioning scheme of input tables is a
useful optimization. I just created
https://issues.apache.org/jira/browse/SPARK-5354 to track it.

Thanks,

Yin
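
[For what it's worth, one way to check whether that Exchange shows up in the plan; a sketch against the 1.2 API where the table and column names are made up, and queryExecution is a developer API:]

  val grouped = sqlContext.sql(
    "SELECT CustomerCode, SUM(Cost) FROM Orders GROUP BY CustomerCode")
  // the physical plan is also printed when a SchemaRDD is evaluated in the shell;
  // an Exchange operator in it means the GROUP BY will shuffle
  println(grouped.queryExecution.executedPlan)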



On Wed, Jan 21, 2015 at 8:43 AM, Michael Davies 
michael.belldav...@gmail.com wrote:

 Hi Cheng,

 Are you saying that by setting up the lineage schemaRdd.keyBy(_.getString(
 1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)
 then Spark SQL will know that an SQL “group by” on Customer Code will not
 have to shuffle?

 But the prepared will have already shuffled so we pay an upfront cost for
 future groupings (assuming we cache I suppose)

 Mick

 On 20 Jan 2015, at 20:44, Cheng Lian lian.cs@gmail.com wrote:

  First of all, even if the underlying dataset is partitioned as expected,
 a shuffle can’t be avoided. Because Spark SQL knows nothing about the
 underlying data distribution. However, this does reduce network IO.

 You can prepare your data like this (say CustomerCode is a string field
 with ordinal 1):

 val schemaRdd = sql(...)
 val schema = schemaRdd.schema
 val prepared = schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)

 n should be equal to spark.sql.shuffle.partitions.

 Cheng

 On 1/19/15 7:44 AM, Mick Davies wrote:


  Is it possible to use a HashPartioner or something similar to distribute a
 SchemaRDDs data by the hash of a particular column or set of columns.

 Having done this I would then hope that GROUP BY could avoid shuffle

 E.g. set up a HashPartioner on CustomerCode field so that

 SELECT CustomerCode, SUM(Cost)
 FROM Orders
 GROUP BY CustomerCode

 would not need to shuffle.

 Cheers
 Mick





 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/SQL-Using-HashPartitioner-to-distribute-by-column-tp21237.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



 ​





Re: Spark job stuck at RangePartitioner at Exchange.scala:79

2015-01-21 Thread Sunita Arvind
I was able to resolve this by adding rdd.collect() after every stage. This
enforced RDD evaluation and helped avoid the choke point.

regards
Sunita Kopppar

On Sat, Jan 17, 2015 at 12:56 PM, Sunita Arvind sunitarv...@gmail.com
wrote:

 Hi,

 My spark jobs suddenly started getting hung and here is the debug leading
 to it:
 Following the program, it seems to be stuck whenever I do any collect(),
 count or rdd.saveAsParquet file. AFAIK, any operation that requires data
 flow back to master causes this. I increased the memory to 5 MB. Also, as
 per the debug statements, the memory is sufficient enough. Also increased
 -Xss and

 15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(264808) called
 with curMem=0, maxMem=1019782103
 15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_0 stored as
 values in memory (estimated size 258.6 KB, free 972.3 MB)
 15/01/17 11:44:16 INFO spark.SparkContext: Starting job: collect at
 SparkPlan.scala:85
 15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(210344) called
 with curMem=264808, maxMem=1019782103
 15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_1 stored as
 values in memory (estimated size 205.4 KB, free 972.1 MB)
 15/01/17 11:44:16 INFO storage.MemoryStore: ensureFreeSpace(282200) called
 with curMem=475152, maxMem=1019782103
 15/01/17 11:44:16 INFO storage.MemoryStore: Block broadcast_2 stored as
 values in memory (estimated size 275.6 KB, free 971.8 MB)
 15/01/17 11:44:16 INFO spark.SparkContext: Starting job: RangePartitioner
 at Exchange.scala:79

 A bit of background which may or may not be relevant. The program was
 working fine in eclipse, however, was getting hung upon submission to the
 cluster. In an attempt to debug, I changed the version in build.sbt to
 match the one on the cluster

 sbt config when the program was working:
   org.apache.spark %% spark-core % 1.1.0 % provided,
   org.apache.spark %% spark-sql % 1.1.0 % provided,
   spark.jobserver % job-server-api % 0.4.0,
   com.github.nscala-time %% nscala-time % 1.6.0,
   org.apache.hadoop % hadoop-client % 2.3.0 % provided


 During debugging, I changed this to:
   org.apache.spark %% spark-core % 1.2.0 % provided,
   org.apache.spark %% spark-sql % 1.2.0 % provided,
   spark.jobserver % job-server-api % 0.4.0,
   com.github.nscala-time %% nscala-time % 1.6.0,
   org.apache.hadoop % hadoop-client % 2.5.0 % provided

 This is when the program started getting hung at the first rdd.count().
 Now, even after reverting the changes in build.sbt, my program is getting
 hung at the same point.

 Tried these config changes in addition to setting -Xmx and -Xss in eclipse.ini
 to 5MB each, and set the below vars programmatically:

 sparkConf.set("spark.akka.frameSize", "10")
 sparkConf.set("spark.shuffle.spill", "true")
 sparkConf.set("spark.driver.memory", "512m")
 sparkConf.set("spark.executor.memory", "1g")
 sparkConf.set("spark.driver.maxResultSize", "1g")

 Please note. In eclipse as well as sbt the program kept throwing
 StackOverflow. Increasing Xss to 5 MB eliminated the problem,
 Could this be something unrelated to memory? The SchemaRDDs have close to
 400 columns and hence I am using StructType(StructField) and performing
 applySchema.

 My code cannot be shared right now. If required, I will edit it and post.
 regards
 Sunita





Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2015-01-21 Thread Mukesh Jha
numStreams is 5 in my case.

 List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
      DefaultDecoder.class, DefaultDecoder.class, kafkaConf,
      topicMap, StorageLevel.MEMORY_ONLY_SER()));
}
JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);

On Wed, Jan 21, 2015 at 3:19 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi Mukesh,

 How are you creating your receivers? Could you post the (relevant) code?

 -kr, Gerard.

 On Wed, Jan 21, 2015 at 9:42 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Guys,

 I've re-partitioned my kafkaStream so that it gets evenly distributed
 among the executors and the results are better.
 Still, from the executors page it seems that only 1 executor has all 8 cores
 in use while the other executors are using just 1 core.

 Is this the correct interpretation based on the below data? If so how can
 we fix this?

 [image: Inline image 1]

 On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Thats is kind of expected due to data locality. Though you should see
 some tasks running on the executors as the data gets replicated to
 other nodes and can therefore run tasks based on locality. You have
 two solutions

 1. kafkaStream.repartition() to explicitly repartition the received
 data across the cluster.
 2. Create multiple kafka streams and union them together.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch
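
 [A rough Scala sketch of both options; the Zookeeper quorum, consumer group, topic map and partition counts below are placeholders:]

   import org.apache.spark.SparkConf
   import org.apache.spark.streaming.{Seconds, StreamingContext}
   import org.apache.spark.streaming.kafka.KafkaUtils

   val ssc = new StreamingContext(new SparkConf().setAppName("kafka-union"), Seconds(5))
   val topicMap = Map("mytopic" -> 1)   // topic name and receiver thread count are placeholders
   val numReceivers = 5

   // option 2: several receivers, unioned into a single DStream
   val streams = (1 to numReceivers).map { _ =>
     KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", topicMap)
   }
   val unified = ssc.union(streams)

   // option 1: then spread the received blocks across all executors
   val repartitioned = unified.repartition(40)   // e.g. total cores in the cluster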

 On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:
  Thanks Sandy, It was the issue with the no of cores.
 
  Another issue I was facing is that tasks are not getting distributed
 evenly
  among all executors and are running on the NODE_LOCAL locality level
 i.e.
  all the tasks are running on the same executor where my
 kafkareceiver(s) are
  running even though other executors are idle.
 
  I configured spark.locality.wait=50 instead of the default 3000 ms,
 which
  forced the task rebalancing among nodes, let me know if there is a
 better
  way to deal with this.
 
 
  On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Makes sense, I've also tries it in standalone mode where all 3
 workers 
  driver were running on the same 8 core box and the results were
 similar.
 
  Anyways I will share the results in YARN mode with 8 core yarn
 containers.
 
  On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com
 
  wrote:
 
  When running in standalone mode, each executor will be able to use
 all 8
  cores on the box.  When running on YARN, each executor will only
 have access
  to 2 cores.  So the comparison doesn't seem fair, no?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha 
 me.mukesh@gmail.com
  wrote:
 
  Nope, I am setting 5 executors with 2  cores each. Below is the
 command
  that I'm using to submit in YARN mode. This starts up 5 executor
 nodes and a
  driver as per the spark application master UI.
 
  spark-submit --master yarn-cluster --num-executors 5 --driver-memory
  1024m --executor-memory 1024m --executor-cores 2 --class
  com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka
  spark-yarn avro 1 5000
 
  On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza 
 sandy.r...@cloudera.com
  wrote:
 
  *oops, I mean are you setting --executor-cores to 8
 
  On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza 
 sandy.r...@cloudera.com
  wrote:
 
  Are you setting --num-executors to 8?
 
  On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha 
 me.mukesh@gmail.com
  wrote:
 
  Sorry Sandy, The command is just for reference but I can confirm
 that
  there are 4 executors and a driver as shown in the spark UI page.
 
  Each of these machines is a 8 core box with ~15G of ram.
 
  On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza
  sandy.r...@cloudera.com wrote:
 
  Hi Mukesh,
 
  Based on your spark-submit command, it looks like you're only
  running with 2 executors on YARN.  Also, how many cores does
 each machine
  have?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha
  me.mukesh@gmail.com wrote:
 
  Hello Experts,
  I'm bench-marking Spark on YARN
  (https://spark.apache.org/docs/latest/running-on-yarn.html)
 vs a standalone
  spark cluster (
 https://spark.apache.org/docs/latest/spark-standalone.html).
  I have a standalone cluster with 3 executors, and a spark app
  running on yarn with 4 executors as shown below.
 
  The spark job running inside yarn is 10x slower than the one
  running on the standalone cluster (even though YARN has more
  workers); also, in both cases all the executors are in the
 same datacenter
  so there shouldn't be any latency. On YARN each 5sec batch is
 reading data
  from kafka and processing it in 5sec  on the standalone
 cluster each 5sec
  

Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-21 Thread Davies Liu
We have not met this issue, so we are not sure whether there are bugs related to
reused workers or not.

Could you provide more details about it?

On Wed, Jan 21, 2015 at 2:27 AM, critikaled isasmani@gmail.com wrote:
 I'm also facing the same issue.
 is this a bug?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278p21283.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to delete graph checkpoints?

2015-01-21 Thread Cheuk Lam
This is a question about checkpointing on GraphX.

We'd like to automate deleting checkpoint files of old graphs.  The RDD
class has a getCheckpointFile() function, which allows us to retrieve the
checkpoint file of an old RDD and then delete it.  However, I couldn't find
a way to get hold of the corresponding checkpointed RDDs given the graph
reference; because the checkpoint of a GraphImpl is really done to the
underlying partitionsRDD in both VertexRDD and EdgeRDD, and that
partitionsRDD as defined today doesn't seem to be accessible from outside of
graphx.

Below is the declaration in VertexRDD.scala:
private[graphx] def partitionsRDD: RDD[ShippableVertexPartition[VD]]

We would really appreciate it if anyone could shed some light on solving
this problem, or anyone who has come across a similar problem could share a
solution or workaround.

Thank you,
Cheuk Lam
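
[One possible workaround, which sidesteps getCheckpointFile entirely: since every checkpoint is written under the directory given to sc.setCheckpointDir, old checkpoint data can be removed by age with the Hadoop FileSystem API. A sketch; the retention count and the assumption that nothing else lives in that directory are mine:]

  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.spark.SparkContext

  // delete all but the `keep` most recently written checkpoint subdirectories
  def cleanOldCheckpoints(sc: SparkContext, checkpointDir: String, keep: Int): Unit = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val dirs = fs.listStatus(new Path(checkpointDir))
      .filter(_.isDirectory)
      .sortBy(_.getModificationTime)
    dirs.dropRight(keep).foreach(d => fs.delete(d.getPath, true))
  }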





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-delete-graph-checkpoints-tp21296.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: sparkcontext.objectFile return thousands of partitions

2015-01-21 Thread Noam Barcay
maybe each of the file parts has many blocks?
did you try SparkContext.coalesce to reduce the number of partitions? can
be done w/ or w/o data-shuffle.

*Noam Barcay*
Developer // *Kenshoo*
*Office* +972 3 746-6500 *427 // *Mobile* +972 54 475-3142
__
*www.Kenshoo.com* http://kenshoo.com/

On Wed, Jan 21, 2015 at 5:31 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  Why does sc.objectFile(…) return an RDD with thousands of partitions?



 I save a rdd to file system using



 rdd.saveAsObjectFile("file:///tmp/mydir")



 Note that the rdd contains 7 million objects. I checked the directory
 /tmp/mydir/; it contains 8 partitions



 part-0  part-2  part-4  part-6  _SUCCESS

 part-1  part-3  part-5  part-7



 I then load the rdd back using



 val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)



 I expect rdd2 to have 8 partitions. But from the master UI, I see that
 rdd2 has over 1000 partitions. This is very inefficient. How can I limit it
 to 8 partitions just like what is stored on the file system?



 Regards,



 *Ningjun Wang*

 Consulting Software Engineer

 LexisNexis

 121 Chanlon Road

 New Providence, NJ 07974-1541






Re: How to 'Pipe' Binary Data in Apache Spark

2015-01-21 Thread Frank Austin Nothaft
Hi Venkat/Nick,

The Spark RDD.pipe method pipes text data into a subprocess and then receives 
text data back from that process. Once you have the binary data loaded into an 
RDD properly, to pipe binary data to/from a subprocess (e.g., you want the data 
in the pipes to contain binary, not text), you need to implement your own, 
modified version of RDD.pipe. The implementation of RDD.pipe spawns a process 
per partition (IIRC), as well as threads for writing to and reading from the 
process (as well as stderr for the process). When writing via RDD.pipe, Spark 
calls *.toString on the object, and pushes that text representation down the 
pipe. There is an example of how to pipe binary data from within a 
mapPartitions call using the Scala API in lines 107-177 of this file. This 
specific code contains some nastiness around the packaging of downstream 
libraries that we rely on in that project, so I’m not sure if it is the 
cleanest way, but it is a workable way.

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466
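
[For anyone looking for a starting point, a rough, much-simplified sketch of that approach; it is not the code Frank links to. The subprocess path is the one from Nick's example, and each record is assumed to be the raw bytes of one whole input file:]

  import java.io.BufferedOutputStream
  import org.apache.spark.rdd.RDD

  // each record is the raw bytes of one input file
  val binary: RDD[Array[Byte]] =
    sc.binaryFiles("hdfs:///data/pcap").map { case (_, stream) => stream.toArray() }

  val csvLines: RDD[String] = binary.mapPartitions { records =>
    val proc = new ProcessBuilder("/usr/bin/binary-to-csv.sh").start()
    val stdin = new BufferedOutputStream(proc.getOutputStream)
    // write the binary records from a separate thread so we can stream the
    // subprocess's stdout back without deadlocking (stderr is ignored here)
    new Thread(new Runnable {
      override def run(): Unit = {
        records.foreach(bytes => stdin.write(bytes))
        stdin.close()
      }
    }).start()
    scala.io.Source.fromInputStream(proc.getInputStream).getLines()
  }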

On Jan 21, 2015, at 9:17 AM, Venkat, Ankam ankam.ven...@centurylink.com wrote:

 I am trying to solve similar problem.  I am using option # 2 as suggested by 
 Nick. 
  
 I have created an RDD with sc.binaryFiles for a list of .wav files.  But, I 
 am not able to pipe it to the external programs. 
  
 For example:
  sq = sc.binaryFiles("wavfiles")  <-- All .wav files stored in the "wavfiles" directory on HDFS
  sq.keys().collect()  <-- works fine.  Shows the list of file names.
  sq.values().collect()  <-- works fine.  Shows the content of the files.
  sq.values().pipe(lambda x: subprocess.call(['/usr/local/bin/sox', '-t' 'wav', '-', '-n', 'stats'])).collect()  <-- Does not work.  Tried different options.
 AttributeError: 'function' object has no attribute 'read'
  
 Any suggestions?
  
 Regards,
 Venkat Ankam
  
 From: Nick Allen [mailto:n...@nickallen.org] 
 Sent: Friday, January 16, 2015 11:46 AM
 To: user@spark.apache.org
 Subject: Re: How to 'Pipe' Binary Data in Apache Spark
  
 I just wanted to reiterate the solution for the benefit of the community.
  
 The problem is not from my use of 'pipe', but that 'textFile' cannot be used 
 to read in binary data. (Doh) There are a couple options to move forward.
  
 1. Implement a custom 'InputFormat' that understands the binary input data. 
 (Per Sean Owen)
  
 2. Use 'SparkContext.binaryFiles' to read in the entire binary file as a 
 single record. This will impact performance as it prevents the use of more 
 than one mapper on the file's data.
  
 In my specific case for #1 I can only find one project from RIPE-NCC 
 (https://github.com/RIPE-NCC/hadoop-pcap) that does this. Unfortunately, it 
 appears to only support a limited set of network protocols.
  
  
 On Fri, Jan 16, 2015 at 10:40 AM, Nick Allen n...@nickallen.org wrote:
 Per your last comment, it appears I need something like this:
  
 https://github.com/RIPE-NCC/hadoop-pcap
  
 Thanks a ton.  That get me oriented in the right direction.  
  
 On Fri, Jan 16, 2015 at 10:20 AM, Sean Owen so...@cloudera.com wrote:
 Well it looks like you're reading some kind of binary file as text.
 That isn't going to work, in Spark or elsewhere, as binary data is not
 even necessarily the valid encoding of a string. There are no line
 breaks to delimit lines and thus elements of the RDD.
 
 Your input has some record structure (or else it's not really useful
 to put it into an RDD). You can encode this as a SequenceFile and read
 it with objectFile.
 
 You could also write a custom InputFormat that knows how to parse pcap
 records directly.
 
 On Fri, Jan 16, 2015 at 3:09 PM, Nick Allen n...@nickallen.org wrote:
  I have an RDD containing binary data. I would like to use 'RDD.pipe' to pipe
  that binary data to an external program that will translate it to
  string/text data. Unfortunately, it seems that Spark is mangling the binary
  data before it gets passed to the external program.
 
  This code is representative of what I am trying to do. What am I doing
  wrong? How can I pipe binary data in Spark?  Maybe it is getting corrupted
  when I read it in initially with 'textFile'?
 
  bin = sc.textFile("binary-data.dat")
  csv = bin.pipe("/usr/bin/binary-to-csv.sh")
  csv.saveAsTextFile("text-data.csv")
 
  Specifically, I am trying to use Spark to transform pcap (packet capture)
  data to text/csv so that I can perform an analysis on it.
 
  Thanks!
 
  --
  Nick Allen n...@nickallen.org
 
 
  
 --
 Nick Allen n...@nickallen.org
 
 
  
 --
 Nick Allen n...@nickallen.org



Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Kay Ousterhout
Is it possible to re-run your job with spark.eventLog.enabled to true, and
send the resulting logs to the list? Those have more per-task information
that can help diagnose this.

-Kay
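
[A minimal way to turn that on from the application side; the log directory here is just an example:]

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.dir", "hdfs:///tmp/spark-events")  // any shared directory works
  val sc = new SparkContext(conf)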

On Wed, Jan 21, 2015 at 1:57 AM, Fengyun RAO raofeng...@gmail.com wrote:

 btw: Shuffle Write (11 GB) means 11 GB per executor; for each task, it's
 ~40 MB


 2015-01-21 17:53 GMT+08:00 Fengyun RAO raofeng...@gmail.com:

 I don't know how to debug a distributed application; any tools or
 suggestions?

 but from spark web UI,

 the GC time (~0.1 s), Shuffle Write(11 GB) are similar for spark 1.1 and
 1.2.
 there are no Shuffle Read and Spill.
 The only difference is Duration
  Duration    Min    25th percentile   Median   75th percentile   Max
  spark 1.2   4s     37s               45s      53s               1.9 min
  spark 1.1   2 s    17 s              18 s     18 s              34 s

 2015-01-21 16:56 GMT+08:00 Sean Owen so...@cloudera.com:

 I mean that if you had tasks running on 10 machines now instead of 3 for
 some reason you would have more than 3 times the read load on your source
 of data all at once. Same if you made more executors per machine. But from
 your additional info it does not sound like this is the case. I think you
 need more debugging to pinpoint what is slower.
 On Jan 21, 2015 9:30 AM, Fengyun RAO raofeng...@gmail.com wrote:

 thanks, Sean.

 I don't quite understand "you have *more* partitions across *more*
 workers".

 It's within the same cluster, and the same data, thus I think the same
 partition, the same workers.

 we switched from spark 1.1 to 1.2, then it's 3x slower.

 (We upgrade from CDH 5.2.1 to CDH 5.3, hence spark 1.1 to 1.2, and
 found the problem.
 then we installed a standalone spark 1.1, stop the 1.2, run the same
 script, it's 3x faster.
 stop 1.1, start 1.2, 3x slower again)


 2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com:

 I don't know of any reason to think the singleton pattern doesn't work
 or works differently. I wonder if, for example, task scheduling is
 different in 1.2 and you have more partitions across more workers and so
 are loading more copies more slowly into your singletons.
 On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote:

 the LogParser instance is not serializable, and thus cannot be a
 broadcast,

 what’s worse, it contains an LRU cache, which is essential to the
 performance, and we would like to share among all the tasks on the same
 node.

 If it is the case, what’s the recommended way to share a variable
 among all the tasks within the same executor.
 ​

 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:

 Maybe some change related to serialize the closure cause LogParser is
 not a singleton any more, then it is initialized for every task.

 Could you change it to a Broadcast?

 On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com
 wrote:
  Currently we are migrating from spark 1.1 to spark 1.2, but found
 the
  program 3x slower, with nothing else changed.
  note: our program in spark 1.1 has successfully processed a whole
 year data,
  quite stable.
 
  the main script is as below
 
  sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})
 
  where LogParser is a singleton which may take some time to
 initialized and
  is shared across the execuator.
 
  the flatMap stage is 3x slower.
 
  We tried to change spark.shuffle.manager back to hash, and
  spark.shuffle.blockTransferService back to nio, but didn’t help.
 
  May somebody explain possible causes, or what should we test or
 change to
  find it out








Re: [SQL] Using HashPartitioner to distribute by column

2015-01-21 Thread Cheng Lian
Michael - I mean that although preparing and repartitioning the underlying
data can't avoid the shuffle introduced by Spark SQL (Yin has explained
why), it does help to reduce network IO.


On 1/21/15 10:01 AM, Yin Huai wrote:

Hello Michael,

In Spark SQL, we have our internal concepts of Output Partitioning 
(representing the partitioning scheme of an operator's output) and 
Required Child Distribution (representing the requirement of input 
data distribution of an operator) for a physical operator. Let's say 
we have two operators, parent and child, and the parent takes the 
output of the child as its input. At the end of query planning 
process, whenever the Output Partitioning of the child does not 
satisfy the Required Child Distribution of the parent, we will add an 
Exchange operator between the parent and child to shuffle the data. 
Right now, we do not record the partitioning scheme of an input table. 
So, I think even if you use partitionBy (or DISTRIBUTE BY in SQL) to 
prepare your data, you still will see the Exchange operator and your 
GROUP BY operation will be executed in a new stage (after the Exchange).


Making Spark SQL aware of the partitioning scheme of input tables is a 
useful optimization. I just created 
https://issues.apache.org/jira/browse/SPARK-5354 to track it.


Thanks,

Yin



On Wed, Jan 21, 2015 at 8:43 AM, Michael Davies 
michael.belldav...@gmail.com
wrote:


Hi Cheng,

Are you saying that by setting up the lineage

schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)
then Spark SQL will know that an SQL “group by” on Customer Code
will not have to shuffle?

But the prepared will have already shuffled so we pay an upfront
cost for future groupings (assuming we cache I suppose)

Mick


On 20 Jan 2015, at 20:44, Cheng Lian lian.cs@gmail.com wrote:

First of all, even if the underlying dataset is partitioned as
expected, a shuffle can’t be avoided. Because Spark SQL knows
nothing about the underlying data distribution. However, this
does reduce network IO.

You can prepare your data like this (say |CustomerCode| is a
string field with ordinal 1):

val schemaRdd = sql(...)
val schema = schemaRdd.schema
val prepared = schemaRdd.keyBy(_.getString(1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)

n should be equal to spark.sql.shuffle.partitions.

Cheng

On 1/19/15 7:44 AM, Mick Davies wrote:




Is it possible to use a HashPartioner or something similar to distribute a
SchemaRDDs data by the hash of a particular column or set of columns.

Having done this I would then hope that GROUP BY could avoid shuffle

E.g. set up a HashPartioner on CustomerCode field so that

SELECT CustomerCode, SUM(Cost)
FROM Orders
GROUP BY CustomerCode

would not need to shuffle.

Cheers
Mick





--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/SQL-Using-HashPartitioner-to-distribute-by-column-tp21237.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





​







RE: How to 'Pipe' Binary Data in Apache Spark

2015-01-21 Thread Venkat, Ankam
I am trying to solve similar problem.  I am using option # 2 as suggested by 
Nick.

I have created an RDD with sc.binaryFiles for a list of .wav files.  But, I am 
not able to pipe it to the external programs.

For example:
 sq = sc.binaryFiles("wavfiles")  <-- All .wav files stored in the "wavfiles" directory on HDFS
 sq.keys().collect()  <-- works fine.  Shows the list of file names.
 sq.values().collect()  <-- works fine.  Shows the content of the files.
 sq.values().pipe(lambda x: subprocess.call(['/usr/local/bin/sox', '-t' 'wav', '-', '-n', 'stats'])).collect()  <-- Does not work.  Tried different options.
AttributeError: 'function' object has no attribute 'read'

Any suggestions?

Regards,
Venkat Ankam

From: Nick Allen [mailto:n...@nickallen.org]
Sent: Friday, January 16, 2015 11:46 AM
To: user@spark.apache.org
Subject: Re: How to 'Pipe' Binary Data in Apache Spark

I just wanted to reiterate the solution for the benefit of the community.

The problem is not from my use of 'pipe', but that 'textFile' cannot be used to 
read in binary data. (Doh) There are a couple options to move forward.

1. Implement a custom 'InputFormat' that understands the binary input data. 
(Per Sean Owen)

2. Use 'SparkContext.binaryFiles' to read in the entire binary file as a single 
record. This will impact performance as it prevents the use of more than one 
mapper on the file's data.

In my specific case for #1 I can only find one project from RIPE-NCC 
(https://github.com/RIPE-NCC/hadoop-pcap) that does this. Unfortunately, it 
appears to only support a limited set of network protocols.


On Fri, Jan 16, 2015 at 10:40 AM, Nick Allen 
n...@nickallen.org wrote:
Per your last comment, it appears I need something like this:

https://github.com/RIPE-NCC/hadoop-pcap

Thanks a ton.  That get me oriented in the right direction.

On Fri, Jan 16, 2015 at 10:20 AM, Sean Owen 
so...@cloudera.com wrote:
Well it looks like you're reading some kind of binary file as text.
That isn't going to work, in Spark or elsewhere, as binary data is not
even necessarily the valid encoding of a string. There are no line
breaks to delimit lines and thus elements of the RDD.

Your input has some record structure (or else it's not really useful
to put it into an RDD). You can encode this as a SequenceFile and read
it with objectFile.

You could also write a custom InputFormat that knows how to parse pcap
records directly.

On Fri, Jan 16, 2015 at 3:09 PM, Nick Allen 
n...@nickallen.org wrote:
 I have an RDD containing binary data. I would like to use 'RDD.pipe' to pipe
 that binary data to an external program that will translate it to
 string/text data. Unfortunately, it seems that Spark is mangling the binary
 data before it gets passed to the external program.

 This code is representative of what I am trying to do. What am I doing
 wrong? How can I pipe binary data in Spark?  Maybe it is getting corrupted
 when I read it in initially with 'textFile'?

  bin = sc.textFile("binary-data.dat")
  csv = bin.pipe("/usr/bin/binary-to-csv.sh")
  csv.saveAsTextFile("text-data.csv")

 Specifically, I am trying to use Spark to transform pcap (packet capture)
 data to text/csv so that I can perform an analysis on it.

 Thanks!

 --
 Nick Allen n...@nickallen.org



--
Nick Allen n...@nickallen.org



--
Nick Allen n...@nickallen.org


RE: Spark 1.1.0 - spark-submit failed

2015-01-21 Thread ey-chih chow
Thanks for help.  I added the following dependency in my pom file and the 
problem went away.
<dependency> <!-- default Netty -->
  <groupId>io.netty</groupId>
  <artifactId>netty</artifactId>
  <version>3.6.6.Final</version>
</dependency>
Ey-Chih
Date: Tue, 20 Jan 2015 16:57:20 -0800
Subject: Re: Spark 1.1.0 - spark-submit failed
From: yuzhih...@gmail.com
To: eyc...@hotmail.com
CC: user@spark.apache.org

Please check which netty jar(s) are on the classpath.
NioWorkerPool(Executor workerExecutor, int workerCount) was added in netty 3.5.4

Cheers
On Tue, Jan 20, 2015 at 4:15 PM, ey-chih chow eyc...@hotmail.com wrote:
Hi,



I issued the following command in a ec2 cluster launched using spark-ec2:



~/spark/bin/spark-submit --class com.crowdstar.cluster.etl.ParseAndClean

--master spark://ec2-54-185-107-113.us-west-2.compute.amazonaws.com:7077

--deploy-mode cluster --total-executor-cores 4

file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar

/ETL/input/2015/01/10/12/10Jan2015.avro

file:///tmp/etl-admin/vertica/VERTICA.avdl

file:///tmp/etl-admin/vertica/extras.json

file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar



The command failed with the following error logs in Spark-UI.  Is there any

suggestion on how to fix the problem?  Thanks.



Ey-Chih Chow



==



Launch Command: /usr/lib/jvm/java-1.7.0/bin/java -cp

/root/spark/work/driver-20150120200843-/spark-etl-0.0.1-SNAPSHOT.jar/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/lib/spark-assembly-1.1.0-hadoop1.0.4.jar:/root/spark/lib/datanucleus-api-jdo-3.2.1.jar:/root/spark/lib/datanucleus-core-3.2.2.jar:/root/spark/lib/datanucleus-rdbms-3.2.1.jar

-XX:MaxPermSize=128m

-Dspark.executor.extraLibraryPath=/root/ephemeral-hdfs/lib/native/

-Dspark.executor.memory=13000m -Dspark.akka.askTimeout=10

-Dspark.cores.max=4

-Dspark.app.name=com.crowdstar.cluster.etl.ParseAndClean

-Dspark.jars=file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar

-Dspark.executor.extraClassPath=/root/ephemeral-hdfs/conf

-Dspark.master=spark://ec2-54-203-58-2.us-west-2.compute.amazonaws.com:7077

-Dakka.loglevel=WARNING -Xms512M -Xmx512M

org.apache.spark.deploy.worker.DriverWrapper

akka.tcp://sparkwor...@ip-10-33-140-157.us-west-2.compute.internal:47585/user/Worker

com.crowdstar.cluster.etl.ParseAndClean

/ETL/input/2015/01/10/12/10Jan2015.avro

file:///tmp/etl-admin/vertica/VERTICA.avdl

file:///tmp/etl-admin/vertica/extras.json

file:///tmp/etl-admin/jar/spark-etl-0.0.1-SNAPSHOT.jar





SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in

[jar:file:/root/spark/work/driver-20150120200843-/spark-etl-0.0.1-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in

[jar:file:/root/spark/lib/spark-assembly-1.1.0-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an

explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

15/01/20 20:08:45 INFO spark.SecurityManager: Changing view acls to: root,

15/01/20 20:08:45 INFO spark.SecurityManager: Changing modify acls to: root,

15/01/20 20:08:45 INFO spark.SecurityManager: SecurityManager:

authentication disabled; ui acls disabled; users with view permissions:

Set(root, ); users with modify permissions: Set(root, )

15/01/20 20:08:45 INFO slf4j.Slf4jLogger: Slf4jLogger started

15/01/20 20:08:45 ERROR actor.ActorSystemImpl: Uncaught fatal error from

thread [Driver-akka.actor.default-dispatcher-3] shutting down ActorSystem

[Driver]

java.lang.NoSuchMethodError:

org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(Ljava/util/concurrent/Executor;I)V

at

akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:282)

at

akka.remote.transport.netty.NettyTransport.<init>(NettyTransport.scala:239)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at

sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)

at

sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:526)

at

akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)

at scala.util.Try$.apply(Try.scala:161)

at

akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)

at

akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)

at

akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)

at scala.util.Success.flatMap(Try.scala:200)

at

akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)

at 

Re: [SparkSQL] Try2: Parquet predicate pushdown troubles

2015-01-21 Thread Cheng Lian
Oh yes, thanks for adding that using sc.hadoopConfiguration.set also works
:-)
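
[To summarize the two ways discussed in this thread of getting a Hadoop property through to Parquet; a sketch, with the value being the one under discussion:]

  // directly on the SparkContext's Hadoop Configuration
  sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")

  // or via SparkConf / spark-defaults.conf: only keys prefixed with
  // "spark.hadoop." are merged into the Hadoop Configuration that Spark builds
  val conf = new org.apache.spark.SparkConf()
    .set("spark.hadoop.parquet.task.side.metadata", "false")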
​

On Wed, Jan 21, 2015 at 7:11 AM, Yana Kadiyska yana.kadiy...@gmail.com
wrote:

 Thanks for looking Cheng. Just to clarify in case other people need this
 sooner, setting sc.hadoopConfiguration.set("parquet.task.side.metadata",
 "false") did work well in terms of dropping rowgroups/showing small input
 size. What was odd about that is that the overall time wasn't much
 better...but maybe that was overhead from sending the metadata clientside.

 Thanks again and looking forward to your fix

 On Tue, Jan 20, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Yana,

 Sorry for the late reply, missed this important thread somehow. And many
 thanks for reporting this. It turned out to be a bug — filter pushdown is
 only enabled when using client side metadata, which is not expected,
 because task side metadata code path is more performant. And I guess that
 the reason why setting parquet.task.side.metadata to false didn’t reduce
 input size for you is because you set the configuration with Spark API, or
 put it into spark-defaults.conf. This configuration goes to Hadoop
 Configuration, and Spark only merge properties whose names start with
 spark.hadoop into Hadoop Configuration instances. You may try to put
 parquet.task.side.metadata config into Hadoop core-site.xml, and then
 re-run the query. I can see significant differences by doing so.

 I’ll open a JIRA and deliver a fix for this ASAP. Thanks again for
 reporting all the details!

 Cheng

 On 1/13/15 12:56 PM, Yana Kadiyska wrote:

   Attempting to bump this up in case someone can help out after all. I
 spent a few good hours stepping through the code today, so I'll summarize
 my observations both in hope I get some help and to help others that might
 be looking into this:

 1. I am setting spark.sql.parquet.filterPushdown=true
 2. I can see by stepping through the driver debugger that
 PaquetTableOperations.execute sets the filters via
 ParquetInputFormat.setFilterPredicate (I checked the conf object, things
 appear OK there)
 3. In FilteringParquetRowInputFormat, I get through the codepath for
 getTaskSideSplits. It seems that the codepath for getClientSideSplits would
 try to drop rowGroups but I don't see similar in getTaskSideSplit.

  Does anyone have pointers on where to look after this? Where is
 rowgroup filtering happening in the case of getTaskSideSplits? I can attach
 to the executor but am not quite sure what code related to Parquet gets
 called executor side...also don't see any messages in the executor logs
 related to Filtering predicates.

 For comparison, I went through the getClientSideSplits and can see that
 predicate pushdown works OK:


  sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")

 15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side 
 Metadata Split Strategy
 15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 
 1417384800)
 15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row 
 groups that do not pass filter predicate (28 %) !

 ​

  Is it possible that this is just a UI bug? I can see Input=4G when
 using (parquet.task.side.metadata,false) and Input=140G when using
 (parquet.task.side.metadata,true) but the runtimes are very comparable?

  [image: Inline image 1]


  JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.



  On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 I am running the following (connecting to an external Hive Metastore)

   /a/shark/spark/bin/spark-shell --master spark://ip:7077  --conf
  spark.sql.parquet.filterPushdown=true

  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

  and then ran two queries:

  sqlContext.sql("select count(*) from table where partition='blah'")
  and
  sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604")

 ​

  According to the Input tab in the UI both scan about 140G of data
 which is the size of my whole partition. So I have two questions --

  1. is there a way to tell from the plan if a predicate pushdown is
 supposed to happen?
 I see this for the second query

 res0: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[0] at RDD at SchemaRDD.scala:108
 == Query Plan ==
 == Physical Plan ==
 Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
  Exchange SinglePartition
   Aggregate true, [], [COUNT(1) AS PartialCount#49L]
OutputFaker []
 Project []
  ParquetTableScan [epoch#139L], (ParquetRelation list of hdfs files

 ​
  2. am I doing something obviously wrong that this is not working? (I'm
 guessing it's not working because the input size for the second query shows
 unchanged and the execution time is almost 2x as long)

  thanks in advance for any insights


​





Re: [SQL] Using HashPartitioner to distribute by column

2015-01-21 Thread Michael Davies
Hi Cheng, 

Are you saying that by setting up the lineage 
schemaRdd.keyBy(_.getString(1)).partitionBy(new 
HashPartitioner(n)).values.applySchema(schema)
then Spark SQL will know that an SQL “group by” on Customer Code will not have 
to shuffle?

But the prepared will have already shuffled so we pay an upfront cost for 
future groupings (assuming we cache I suppose) 

Mick

 On 20 Jan 2015, at 20:44, Cheng Lian lian.cs@gmail.com wrote:
 
 First of all, even if the underlying dataset is partitioned as expected, a 
 shuffle can’t be avoided. Because Spark SQL knows nothing about the 
 underlying data distribution. However, this does reduce network IO.
 
 You can prepare your data like this (say CustomerCode is a string field with 
 ordinal 1):
 
 val schemaRdd = sql(...)
 val schema = schemaRdd.schema
 val prepared = schemaRdd.keyBy(_.getString(1)).partitionBy(new 
 HashPartitioner(n)).values.applySchema(schema)
 n should be equal to spark.sql.shuffle.partitions.
 
 Cheng
 
 On 1/19/15 7:44 AM, Mick Davies wrote:
 
 
 
 Is it possible to use a HashPartioner or something similar to distribute a
 SchemaRDDs data by the hash of a particular column or set of columns.
 
 Having done this I would then hope that GROUP BY could avoid shuffle
 
 E.g. set up a HashPartioner on CustomerCode field so that 
 
 SELECT CustomerCode, SUM(Cost)
 FROM Orders
 GROUP BY CustomerCode
 
 would not need to shuffle.
 
 Cheers 
 Mick
 
 
 
 
 
 --
 View this message in context: 
  http://apache-spark-user-list.1001560.n3.nabble.com/SQL-Using-HashPartitioner-to-distribute-by-column-tp21237.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 



Re: Discourse: A proposed alternative to the Spark User list

2015-01-21 Thread Nicholas Chammas
Josh / Patrick,

What do y’all think of the idea of promoting Stack Overflow as a place to
ask questions over this list, as long as the questions fit SO’s guidelines (
how-to-ask http://stackoverflow.com/help/how-to-ask, dont-ask
http://stackoverflow.com/help/dont-ask)?

The apache-spark http://stackoverflow.com/questions/tagged/apache-spark
tag is very active on there.

Discussions of all types are still on-topic here, but when possible we want
to encourage people to use SO.

Nick

On Wed Jan 21 2015 at 8:37:05 AM Jay Vyas jayunit100.apa...@gmail.com
wrote:

It's a very valid idea indeed, but... it's a tricky subject since the
 entire ASF is run on mailing lists, hence there are so many different but
 equally sound ways of looking at this idea, which conflict with one another.

  On Jan 21, 2015, at 7:03 AM, btiernay btier...@hotmail.com wrote:
 
  I think this is a really great idea for really opening up the discussions
  that happen here. Also, it would be nice to know why there doesn't seem
 to
  be much interest. Maybe I'm misunderstanding some nuance of Apache
 projects.
 
  Cheers
 
 
 
  --
  View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Discourse-A-proposed-alternative-to-the-Spark-User-list-tp20851p21288.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

  ​


[mllib] Decision Tree - prediction probabilites of label classes

2015-01-21 Thread Zsolt Tóth
Hi,

I use DecisionTree for multi class classification.
I can get the probability of the predicted label for every node in the
decision tree from node.predict().prob(). Is it possible to retrieve or
count the probability of every possible label class in the node?
To be more clear:
Say in Node A there are 4 of label 0.0, 2 of label 1.0 and 3 of label 2.0.
If I'm correct predict.prob() is 4/9 in this case. I need the values 2/9
and 3/9 for the 2 other labels.

It would be great to retrieve the exact count of label classes ([4,2,3] in
the example) but I don't think that's possible now. Is something like this
planned for a future release?

Thanks!
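
[Until something like that is exposed, one rough workaround is to recompute the distribution from the training data rather than read it off the tree. A sketch, assuming `model` and `data` come from a normal DecisionTree.trainClassifier run; note it aggregates over all leaves that share a predicted label, not per node:]

  // Map[(predictedLabel, trueLabel), count]: for each predicted class, how many
  // training points of each true class ended up with that prediction
  val countsPerPrediction = data
    .map(p => (model.predict(p.features), p.label))
    .countByValue()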


Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Davies Liu
On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com wrote:
 the LogParser instance is not serializable, and thus cannot be a broadcast,

You could create an empty LogParser object (it's serializable), then
load the data
in the executor lazily.

Could you add some logging to LogParser to check the behavior between
Spark 1.1 and 1.2 (the number of times to load data)?
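
[The usual pattern for the "shared per executor" part is a lazily initialized holder object; a sketch, where the LogParser stub is just a stand-in for the real parser:]

  class LogParser {                               // stand-in for the real parser
    def parseLine(line: String): Seq[String] = Seq(line)
  }

  object LogParserHolder {
    // a Scala object is initialized once per JVM, so this is built at most once
    // per executor, lazily on first use; every task in that executor shares it
    // (and therefore its LRU cache)
    lazy val parser = new LogParser()
  }

  sc.textFile(inputPath)
    .flatMap(line => LogParserHolder.parser.parseLine(line))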

 what’s worse, it contains an LRU cache, which is essential to the
 performance, and we would like to share among all the tasks on the same
 node.

 If it is the case, what’s the recommended way to share a variable among all
 the tasks within the same executor.


 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:

 Maybe some change related to serialize the closure cause LogParser is
 not a singleton any more, then it is initialized for every task.

 Could you change it to a Broadcast?

 On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com
 wrote:
  Currently we are migrating from spark 1.1 to spark 1.2, but found the
  program 3x slower, with nothing else changed.
  note: our program in spark 1.1 has successfully processed a whole year
  data,
  quite stable.
 
  the main script is as below
 
  sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})
 
  where LogParser is a singleton which may take some time to initialized
  and
  is shared across the execuator.
 
  the flatMap stage is 3x slower.
 
  We tried to change spark.shuffle.manager back to hash, and
  spark.shuffle.blockTransferService back to nio, but didn’t help.
 
  May somebody explain possible causes, or what should we test or change
  to
  find it out



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Connect to Hive metastore (on YARN) from Spark Shell?

2015-01-21 Thread YaoPau
Is this possible, and if so what steps do I need to take to make this happen?  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Connect-to-Hive-metastore-on-YARN-from-Spark-Shell-tp21300.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Connect to Hive metastore (on YARN) from Spark Shell?

2015-01-21 Thread Zhan Zhang
You can put hive-site.xml in your conf/ directory. It will connect to Hive when 
HiveContext is initialized.

Thanks.

Zhan Zhang
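
[For example, from spark-shell; the table name is just an example:]

  // requires conf/hive-site.xml pointing at the metastore, as described above
  val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  hiveContext.sql("SHOW TABLES").collect().foreach(println)
  hiveContext.sql("SELECT COUNT(*) FROM some_table").collect()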

On Jan 21, 2015, at 12:35 PM, YaoPau jonrgr...@gmail.com wrote:

 Is this possible, and if so what steps do I need to take to make this happen? 
  
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Connect-to-Hive-metastore-on-YARN-from-Spark-Shell-tp21300.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Discourse: A proposed alternative to the Spark User list

2015-01-21 Thread pzecevic
Hi,
I tried to find the last reply by Nick Chammas (that I received in the
digest) using the Nabble web interface, but I cannot find it (perhaps he
didn't reply directly to the user list?). That's one example of Nabble's
usability.

Anyhow, I wanted to add my two cents...

Apache user group could be frozen (not accepting new questions, if that's
possible) and redirect users to Stack Overflow (automatic reply?). Old
questions remain (and are searchable) on Nabble, new questions go to Stack
Exchange, so no need for migration. That's the idea, at least, as I'm not
sure if that's technically doable... Is it?
dev mailing list could perhaps stay on Nabble (it's not that busy), or have
a special tag on Stack Exchange.

Other thing, about new Stack Exchange site I proposed earlier. If a new site
is created, there is no problem with guidelines, I think, because Spark
community can apply different guidelines for the new site. 

There is a FAQ about creating new sites: http://area51.stackexchange.com/faq
It says: Stack Exchange sites are free to create and free to use. All we
ask is that you have an enthusiastic, committed group of expert users who
check in regularly, asking and answering questions.
I think this requirement is satisfied...
Someone expressed a concern that they won't allow creating a
project-specific site, but there already exist some project-specific sites,
like Tor, Drupal, Ubuntu...

Later, though, the FAQ also says:
If Y already exists, it already has a tag for X, and nobody is complaining
(then you should not create a new site). But we could complain :)

The advantage of having a separate site is that users, who should have more
privileges, would need to earn them through Spark questions and answers
only. The other thing, already mentioned, is that the community could create
Spark specific guidelines. There are also  'meta' sites for asking questions
like this one, etc.

There is a process for starting a site - it's not instantaneous. New site
needs to go through private beta and public beta, so that could be a
drawback.


Like btiernay, I must say: there might be something about Apache projects
and mailing lists that I do not know, so excuse me if that is the case...




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Discourse-A-proposed-alternative-to-the-Spark-User-list-tp20851p21299.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Is Apache Spark less accurate than Scikit Learn?

2015-01-21 Thread JacquesH
I've recently been trying to get to know Apache Spark as a replacement for
Scikit Learn, however it seems to me that even in simple cases, Scikit
converges to an accurate model far faster than Spark does.
For example I generated 1000 data points for a very simple linear function
(z=x+y) with the following script:

http://pastebin.com/ceRkh3nb

I then ran the following Scikit script:

http://pastebin.com/1aECPfvq

And then this Spark script: (with spark-submit filename, no other
arguments)

http://pastebin.com/s281cuTL

Strangely though, the error given by spark is an order of magnitude larger
than that given by Scikit (0.185 and 0.045 respectively) despite the two
models having a nearly identical setup (as far as I can tell)
I understand that this is using SGD with very few iterations and so the
results may differ but I wouldn't have thought that it would be anywhere
near such a large difference or such a large error, especially given the
exceptionally simple data.

Is there something I'm misunderstanding in Spark? Is it not correctly
configured? Surely I should be getting a smaller error than that?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-Apache-Spark-less-accurate-than-Scikit-Learn-tp21301.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is Apache Spark less accurate than Scikit Learn?

2015-01-21 Thread Robin East
I don’t get those results. I get:

spark           0.14
scikit-learn    0.85

The scikit-learn mse is due to the very low eta0 setting. Tweak that to 0.1 and 
push iterations to 400 and you get a mse ~= 0. Of course the coefficients are 
both ~1 and the intercept ~0. Similarly if you change the mllib step size to 
0.5 and number of iterations to 1200 you again get a very low mse. One of the 
issues with SGD is you have to tweak these parameters to tune the algorithm.

FWIW I wouldn't see Spark MLlib as a replacement for scikit-learn. MLlib is
nowhere near as mature as scikit-learn. However, if you have large datasets that
won't sensibly fit the scikit-learn in-core model, MLlib is one of the top
choices. Similarly if you are running proof of concepts that you are eventually 
going to scale up to production environments then there is a definite argument 
for using MLlib at both the PoC and production stages.
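
For what it's worth, a rough MLlib sketch of that kind of tuning (Spark 1.2-era
API; the tiny inline dataset and the parameter values are only illustrative, not
the poster's actual script; sc is an existing SparkContext):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

// A few points on the plane z = x + y, standing in for the generated data.
val training = sc.parallelize(Seq(
  LabeledPoint(2.0, Vectors.dense(1.0, 1.0)),
  LabeledPoint(3.0, Vectors.dense(1.0, 2.0)),
  LabeledPoint(5.0, Vectors.dense(2.0, 3.0)),
  LabeledPoint(7.0, Vectors.dense(3.0, 4.0))
)).cache()

// Step size 0.5 and 1200 iterations, as suggested above.
val model = LinearRegressionWithSGD.train(training, 1200, 0.5)

// Mean squared error over the training set.
val mse = training.map { p =>
  val err = model.predict(p.features) - p.label
  err * err
}.mean()
println("MSE = " + mse)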


On 21 Jan 2015, at 20:39, JacquesH jaaksem...@gmail.com wrote:

 I've recently been trying to get to know Apache Spark as a replacement for
 Scikit Learn, however it seems to me that even in simple cases, Scikit
 converges to an accurate model far faster than Spark does.
 For example I generated 1000 data points for a very simple linear function
 (z=x+y) with the following script:
 
 http://pastebin.com/ceRkh3nb
 
 I then ran the following Scikit script:
 
 http://pastebin.com/1aECPfvq
 
 And then this Spark script: (with spark-submit filename, no other
 arguments)
 
 http://pastebin.com/s281cuTL
 
 Strangely though, the error given by spark is an order of magnitude larger
 than that given by Scikit (0.185 and 0.045 respectively) despite the two
 models having a nearly identical setup (as far as I can tell)
 I understand that this is using SGD with very few iterations and so the
 results may differ but I wouldn't have thought that it would be anywhere
 near such a large difference or such a large error, especially given the
 exceptionally simple data.
 
 Is there something I'm misunderstanding in Spark? Is it not correctly
 configured? Surely I should be getting a smaller error than that?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-Apache-Spark-less-accurate-than-Scikit-Learn-tp21301.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use more executors

2015-01-21 Thread Nan Zhu
…not sure when it will be reviewed…

but for now you can work around it by allowing multiple worker instances on a 
single machine (see the sketch below):

http://spark.apache.org/docs/latest/spark-standalone.html

search for SPARK_WORKER_INSTANCES
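
For example, a minimal conf/spark-env.sh sketch on each worker machine (the
values here are only illustrative):

export SPARK_WORKER_INSTANCES=3   # run 3 worker JVMs per machine
export SPARK_WORKER_CORES=2       # cores each worker instance may use
export SPARK_WORKER_MEMORY=4g     # memory each worker instance may use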

Best,  

--  
Nan Zhu
http://codingcat.me


On Wednesday, January 21, 2015 at 6:50 PM, Larry Liu wrote:

 Will  SPARK-1706 be included in next release?
  
 On Wed, Jan 21, 2015 at 2:50 PM, Ted Yu yuzhih...@gmail.com 
 (mailto:yuzhih...@gmail.com) wrote:
  Please see SPARK-1706
   
  On Wed, Jan 21, 2015 at 2:43 PM, Larry Liu larryli...@gmail.com 
  (mailto:larryli...@gmail.com) wrote:
   I tried to submit a job with  --conf spark.cores.max=6  or 
   --total-executor-cores 6 on a standalone cluster. But I don't see more 
   than 1 executor on each worker. I am wondering how to use multiple 
   executors when submitting jobs.

   Thanks
   larry


   
   
   
  



Announcing SF / East Bay Area Stream Processing Meetup

2015-01-21 Thread Siva Jagadeesan
Hi All

I have been running Bay Area Storm meetup for almost 2 years. Instead of
having meetups for storm and spark, I changed the storm meetup to be stream
processing meetup where we can discuss about all stream processing
frameworks.

http://www.meetup.com/Bay-Area-Stream-Processing/events/218816482/?action=detaileventId=218816482

We meet every month in East Bay (Emeryville, CA). I am looking for someone
to give a talk about Spark for the next meetup (Feb 5th)

Let me know if you are interested in giving a talk.

Thanks,

-- Siva Jagadeesan


Re: Exception in connection from worker to worker

2015-01-21 Thread vantoniuk
I have a temporary fix for my case. My sample file was 2G / 50M lines in size.
My initial configuration was 1000 splits.

Based on my understanding of distributed algorithms, the number of splits can
affect the memory pressure in operations such as distinct and reduceByKey.
So I tried to reduce the number of splits from 1000 to 100. Now I can run
distinct and reduceByKey on files that are 2G / 50M lines (see the sketch below).
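
A minimal sketch of controlling the split count (the path and the counts are
placeholders; sc is an existing SparkContext):

import org.apache.spark.SparkContext._   // pair RDD functions in Spark 1.x

// Ask for fewer input splits up front instead of coalescing afterwards,
// and pass an explicit partition count to the shuffle operations.
val lines  = sc.textFile("hdfs:///data/sample.txt", 100)   // ~100 partitions
val unique = lines.distinct(100)
val counts = lines.map(l => (l, 1)).reduceByKey(_ + _, 100)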

Unfortunately it still doesn't scale well.

Thanks. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Exception-in-connection-from-worker-to-worker-tp20983p21302.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use more executors

2015-01-21 Thread Larry Liu
Will  SPARK-1706 be included in next release?

On Wed, Jan 21, 2015 at 2:50 PM, Ted Yu yuzhih...@gmail.com wrote:

 Please see SPARK-1706

 On Wed, Jan 21, 2015 at 2:43 PM, Larry Liu larryli...@gmail.com wrote:

 I tried to submit a job with  --conf spark.cores.max=6
  or --total-executor-cores 6 on a standalone cluster. But I don't see more
 than 1 executor on each worker. I am wondering how to use multiple
 executors when submitting jobs.

 Thanks
 larry





Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-21 Thread Tassilo Klein
I set spark.python.worker.reuse = false and now it seems to run longer than
before (it has not crashed yet). However, it is very very slow. How to
proceed?

On Wed, Jan 21, 2015 at 2:21 AM, Davies Liu dav...@databricks.com wrote:

 Could you try to disable the new feature of reused worker by:
 spark.python.worker.reuse = false

 On Tue, Jan 20, 2015 at 11:12 PM, Tassilo Klein tjkl...@bwh.harvard.edu
 wrote:
  Hi,
 
  It's a bit of a longer script that runs some deep learning training.
  Therefore it is a bit hard to wrap up easily.
 
  Essentially I am having a loop, in which a gradient is computed on each
 node
  and collected (this is where it freezes at some point).
 
   grads = zipped_trainData.map(distributed_gradient_computation).collect()
 
 
  The distributed_gradient_computation mainly contains a Theano derived
  function. The theano function itself is a broadcast variable.
 
  Let me know if you need more information.
 
  Best,
   Tassilo
 
  On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu dav...@databricks.com
 wrote:
 
  Could you provide a short script to reproduce this issue?
 
  On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote:
   Hi,
  
   I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using
   PySpark. Initially, I was super glad, noticing that Spark 1.2 is way
   faster
   than Spark 1.1. However, the initial joy faded quickly when I noticed
   that
   all my stuff didn't successfully terminate operations anymore. Using
   Spark
   1.1 it still works perfectly fine, though.
   Specifically, the execution just freezes without any error output at
 one
   point, when calling a joint map() and collect() statement (after
 having
   it
   called many times successfully before in a loop).
  
   Any clue? Or do I have to wait for the next version?
  
   Best,
Tassilo
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 


 The information in this e-mail is intended only for the person to whom it
 is
 addressed. If you believe this e-mail was sent to you in error and the
 e-mail
 contains patient information, please contact the Partners Compliance
 HelpLine at
 http://www.partners.org/complianceline . If the e-mail was sent to you in
 error
 but does not contain patient information, please contact the sender and
 properly
 dispose of the e-mail.



reading a csv dynamically

2015-01-21 Thread daze5112
Hi all, I'm currently reading a csv file which has the following format:
(String, Double, Double, Double, Double, Double)
and can map this with no problems using:

val dataRDD = sc.textFile("file.csv").
  map(_.split(",")).
  map(a => (Array(a(0)), Array(a(1).toDouble, a(2).toDouble), a(3),
    Array(a(4).toDouble, a(5).toDouble)))

What I would like to do: the input file may have a different number of fields,
i.e. it might have an extra double which needs to go in the first array of
doubles, i.e.:

(String, Double, Double, Double, Double, Double, Double)
which would see my map as
val dataRDD = sc.textFile("file.csv").
  map(_.split(",")).
  map(a => (Array(a(0)), Array(a(1).toDouble, a(2).toDouble,
    a(3).toDouble), a(4), Array(a(5).toDouble, a(6).toDouble)))

Is there a way I can make this map more dynamic? I.e. if I create vals:
val Array_1 = 3
val Array_2 = 2

then use these to pick up the values for array 1, which we know should
contain 3 values, and say: okay, give me a(1) through to a(3) (see the sketch below).

thanks in advance
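
A minimal sketch of that kind of dynamic slicing (Array_1 and Array_2 are the
vals from the question; the file name and field layout are illustrative):

val Array_1 = 3   // width of the first Double block
val Array_2 = 2   // width of the second Double block

val dataRDD = sc.textFile("file.csv")
  .map(_.split(","))
  .map { a =>
    val first  = a.slice(1, 1 + Array_1).map(_.toDouble)   // a(1) .. a(Array_1)
    val middle = a(1 + Array_1)                            // the single field after the block
    val second = a.slice(2 + Array_1, 2 + Array_1 + Array_2).map(_.toDouble)
    (Array(a(0)), first, middle, second)
  }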




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/reading-a-csv-dynamically-tp21304.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use more executors

2015-01-21 Thread Ted Yu
Please see SPARK-1706

On Wed, Jan 21, 2015 at 2:43 PM, Larry Liu larryli...@gmail.com wrote:

 I tried to submit a job with  --conf spark.cores.max=6
  or --total-executor-cores 6 on a standalone cluster. But I don't see more
 than 1 executor on each worker. I am wondering how to use multiple
 executors when submitting jobs.

 Thanks
 larry



spark 1.1.0 save data to hdfs failed

2015-01-21 Thread ey-chih chow
Hi,  

I used the following fragment of a scala program to save data to hdfs:

contextAwareEvents
  .map(e => (new AvroKey(e), null))
  .saveAsNewAPIHadoopFile("hdfs://" + masterHostname + ":9000/ETL/output/"
      + dateDir,
    classOf[AvroKey[GenericRecord]],
    classOf[NullWritable],
    classOf[AvroKeyOutputFormat[GenericRecord]],
    job.getConfiguration)

But it failed with the following error messages.  Is there any people who
can help?  Thanks.

Ey-Chih Chow

=

Exception in thread main java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
at 
org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.io.IOException: Failed on local exception:
java.io.EOFException; Host Details : local host is:
ip-10-33-140-157/10.33.140.157; destination host is:
ec2-54-203-58-2.us-west-2.compute.amazonaws.com:9000; 
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
at org.apache.hadoop.ipc.Client.call(Client.java:1415)
at org.apache.hadoop.ipc.Client.call(Client.java:1364)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:744)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1925)
at
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1079)
at
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1075)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1075)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:145)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:900)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832)
at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:101)
at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala)
... 6 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at
org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1055)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:950)

===





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-1-0-save-data-to-hdfs-failed-tp21305.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark 1.1.0 save data to hdfs failed

2015-01-21 Thread Ted Yu
What hdfs release are you using ?

Can you check namenode log around time of error below to see if there is
some clue ?

Cheers

On Wed, Jan 21, 2015 at 4:51 PM, ey-chih chow eyc...@hotmail.com wrote:

 Hi,

 I used the following fragment of a scala program to save data to hdfs:

 contextAwareEvents
 .map(e = (new AvroKey(e), null))
 .saveAsNewAPIHadoopFile(hdfs:// + masterHostname +
 :9000/ETL/output/
 + dateDir,
 classOf[AvroKey[GenericRecord]],
 classOf[NullWritable],
 classOf[AvroKeyOutputFormat[GenericRecord]],
 job.getConfiguration)

 But it failed with the following error messages.  Is there any people who
 can help?  Thanks.

 Ey-Chih Chow

 =

 Exception in thread main java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
 at
 org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
 Caused by: java.io.IOException: Failed on local exception:
 java.io.EOFException; Host Details : local host is:
 ip-10-33-140-157/10.33.140.157; destination host is:
 ec2-54-203-58-2.us-west-2.compute.amazonaws.com:9000;
 at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
 at org.apache.hadoop.ipc.Client.call(Client.java:1415)
 at org.apache.hadoop.ipc.Client.call(Client.java:1364)
 at

 org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
 at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source)
 at

 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:744)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
 at

 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
 at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source)
 at
 org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1925)
 at

 org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1079)
 at

 org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1075)
 at

 org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at

 org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1075)
 at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
 at

 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:145)
 at

 org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:900)
 at

 org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832)
 at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:101)
 at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala)
 ... 6 more
 Caused by: java.io.EOFException
 at java.io.DataInputStream.readInt(DataInputStream.java:392)
 at

 org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1055)
 at org.apache.hadoop.ipc.Client$Connection.run(Client.java:950)

 ===





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-1-0-save-data-to-hdfs-failed-tp21305.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




How to use more executors

2015-01-21 Thread Larry Liu
I tried to submit a job with  --conf spark.cores.max=6
 or --total-executor-cores 6 on a standalone cluster. But I don't see more
than 1 executor on each worker. I am wondering how to use multiple
executors when submitting jobs.

Thanks
larry


Re: Discourse: A proposed alternative to the Spark User list

2015-01-21 Thread Nicholas Chammas
I think a few things need to be laid out clearly:

   1. This mailing list is the “official” user discussion platform. That
   is, it is sponsored and managed by the ASF.
   2. Users are free to organize independent discussion platforms focusing
   on Spark, and there is already one such platform in Stack Overflow under
   the apache-spark and related tags. Stack Overflow works quite well.
   3. The ASF will not agree to deprecating or migrating this user list to
   a platform that they do not control.
   4. This mailing list has grown to an unwieldy size and discussions are
   hard to find or follow; discussion tooling is also lacking. We want to
   improve the utility and user experience of this mailing list.
   5. We don’t want to fragment this “official” discussion community.
   6. Nabble is an independent product not affiliated with the ASF. It
   offers a slightly better interface to the Apache mailing list archives.

So to respond to some of your points, pzecevic:

Apache user group could be frozen (not accepting new questions, if that’s
possible) and redirect users to Stack Overflow (automatic reply?).

From what I understand of the ASF’s policies, this is not possible. :( This
mailing list must remain the official Spark user discussion platform.

Other thing, about new Stack Exchange site I proposed earlier. If a new
site is created, there is no problem with guidelines, I think, because
Spark community can apply different guidelines for the new site.

I think Stack Overflow and the various Spark tags are working fine. I don’t
see a compelling need for a Stack Exchange dedicated to Spark, either now
or in the near future. Also, I doubt a Spark-specific site can pass the 4
tests in the Area 51 FAQ http://area51.stackexchange.com/faq:

   - Almost all Spark questions are on-topic for Stack Overflow
   - Stack Overflow already exists, it already has a tag for Spark, and
   nobody is complaining
   - You’re not creating such a big group that you don’t have enough
   experts to answer all possible questions
   - There’s a high probability that users of Stack Overflow would enjoy
   seeing the occasional question about Spark

I think complaining won’t be sufficient. :)

Someone expressed a concern that they won’t allow creating a
project-specific site, but there already exist some project-specific sites,
like Tor, Drupal, Ubuntu…

The communities for these projects are many, many times larger than the
Spark community is or likely ever will be, simply due to the nature of the
problems they are solving.

What we need is an improvement to this mailing list. We need better tooling
than Nabble to sit on top of the Apache archives, and we also need some way
to control the volume and quality of mail on the list so that it remains a
useful resource for the majority of users.

Nick
​

On Wed Jan 21 2015 at 3:13:21 PM pzecevic petar.zece...@gmail.com wrote:

 Hi,
 I tried to find the last reply by Nick Chammas (that I received in the
 digest) using the Nabble web interface, but I cannot find it (perhaps he
 didn't reply directly to the user list?). That's one example of Nabble's
 usability.

 Anyhow, I wanted to add my two cents...

 Apache user group could be frozen (not accepting new questions, if that's
 possible) and redirect users to Stack Overflow (automatic reply?). Old
 questions remain (and are searchable) on Nabble, new questions go to Stack
 Exchange, so no need for migration. That's the idea, at least, as I'm not
 sure if that's technically doable... Is it?
 dev mailing list could perhaps stay on Nabble (it's not that busy), or have
 a special tag on Stack Exchange.

 Other thing, about new Stack Exchange site I proposed earlier. If a new
 site
 is created, there is no problem with guidelines, I think, because Spark
 community can apply different guidelines for the new site.

 There is a FAQ about creating new sites: http://area51.stackexchange.
 com/faq
 It says: Stack Exchange sites are free to create and free to use. All we
 ask is that you have an enthusiastic, committed group of expert users who
 check in regularly, asking and answering questions.
 I think this requirement is satisfied...
 Someone expressed a concern that they won't allow creating a
 project-specific site, but there already exist some project-specific sites,
 like Tor, Drupal, Ubuntu...

 Later, though, the FAQ also says:
 If Y already exists, it already has a tag for X, and nobody is
 complaining
 (then you should not create a new site). But we could complain :)

 The advantage of having a separate site is that users, who should have more
 privileges, would need to earn them through Spark questions and answers
 only. The other thing, already mentioned, is that the community could
 create
 Spark specific guidelines. There are also  'meta' sites for asking
 questions
 like this one, etc.

 There is a process for starting a site - it's not instantaneous. New site
 needs to go through private beta and public beta, so that could be a

Re: Saving a mllib model in Spark SQL

2015-01-21 Thread Divyansh Jain
Hey,


Thanks Xiangrui Meng and Cheng Lian for your valuable suggestions.
It works!

Divyansh Jain.

On Tue, January 20, 2015 2:49 pm, Xiangrui Meng wrote:
 You can save the cluster centers as a SchemaRDD of two columns (id:
 Int, center: Array[Double]). When you load it back, you can construct
 the k-means model from its cluster centers. -Xiangrui

 On Tue, Jan 20, 2015 at 11:55 AM, Cheng Lian lian.cs@gmail.com
 wrote:

 This is because KMeanModel is neither a built-in type nor a user
 defined type recognized by Spark SQL. I think you can write your own UDT
 version of KMeansModel in this case. You may refer to
 o.a.s.mllib.linalg.Vector and o.a.s.mllib.linalg.VectorUDT as an
 example.

 Cheng


 On 1/20/15 5:34 AM, Divyansh Jain wrote:


 Hey people,


 I have run into some issues regarding saving the k-means mllib model in
  Spark SQL by converting to a schema RDD. This is what I am doing:


 case class Model(id: String, model: org.apache.spark.mllib.clustering.KMeansModel)
 import sqlContext.createSchemaRDD
 val rowRdd = sc.makeRDD(Seq(id, model)).map(p => Model(id, model))

 This is the error that I get :


 scala.MatchError:
 org.apache.spark.mllib.classification.ClassificationModel (of class
 scala.reflect.internal.Types$TypeRef$anon$6)   at
 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflectio
 n.scala:53)
   at
 org.apache.spark.sql.catalyst.ScalaReflection$anonfun$schemaFor$1.apply
 (ScalaReflection.scala:64)
   at
 org.apache.spark.sql.catalyst.ScalaReflection$anonfun$schemaFor$1.apply
 (ScalaReflection.scala:62)
   at
 scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.sc
 ala:244)
   at
 scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.sc
 ala:244)
   at scala.collection.immutable.List.foreach(List.scala:318)   at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at
 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflectio
 n.scala:62)
   at
 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflectio
 n.scala:50)
   at
 org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaRefle
 ction.scala:44)
   at
 org.apache.spark.sql.execution.ExistingRdd$.fromProductRdd(basicOperato
 rs.scala:229)
   at
 org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:94)


 Any help would be appreciated. Thanks!








 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Saving-a-mllib-model
 -in-Spark-SQL-tp21264.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is Apache Spark less accurate than Scikit Learn?

2015-01-21 Thread Jacques Heunis
Ah I see, thanks!
I was just confused because given the same configuration, I would have
thought that Spark and Scikit would give more similar results, but I guess
this is simply not the case (as in your example, in order to get spark to
give an mse sufficiently close to scikit's you have to give it a
significantly larger step and iteration count).

Would that then be a result of MLLib and Scikit differing slightly in their
exact implementation of the optimizer? Or rather a case of (as you say)
Scikit being a far more mature system (and therefore that MLLib would 'get
better' over time)? Surely it is far from ideal that to get the same
results you need more iterations (IE more computation), or do you think
that that is simply coincidence and that given a different model/dataset it
may be the other way around?

I ask because I encountered this situation on other, larger datasets, so
this is not an isolated case (though being the simplest example I could
think of I would imagine that it's somewhat indicative of general behaviour)

On Thu, Jan 22, 2015 at 1:57 AM, Robin East robin.e...@xense.co.uk wrote:

 I don’t get those results. I get:

  spark          0.14
  scikit-learn   0.85

 The scikit-learn mse is due to the very low eta0 setting. Tweak that to
 0.1 and push iterations to 400 and you get a mse ~= 0. Of course the
 coefficients are both ~1 and the intercept ~0. Similarly if you change the
 mllib step size to 0.5 and number of iterations to 1200 you again get a
 very low mse. One of the issues with SGD is you have to tweak these
 parameters to tune the algorithm.

 FWIW I wouldn’t see Spark MLlib as a replacement for scikit-learn. MLLib
 is nowhere as mature as scikit learn. However if you have large datasets
 that won’t sensibly fit the scikit-learn in-core model MLLib is one of the
 top choices. Similarly if you are running proof of concepts that you are
 eventually going to scale up to production environments then there is a
 definite argument for using MLlib at both the PoC and production stages.


 On 21 Jan 2015, at 20:39, JacquesH jaaksem...@gmail.com wrote:

  I've recently been trying to get to know Apache Spark as a replacement
 for
  Scikit Learn, however it seems to me that even in simple cases, Scikit
  converges to an accurate model far faster than Spark does.
  For example I generated 1000 data points for a very simple linear
 function
  (z=x+y) with the following script:
 
  http://pastebin.com/ceRkh3nb
 
  I then ran the following Scikit script:
 
  http://pastebin.com/1aECPfvq
 
  And then this Spark script: (with spark-submit filename, no other
  arguments)
 
  http://pastebin.com/s281cuTL
 
  Strangely though, the error given by spark is an order of magnitude
 larger
  than that given by Scikit (0.185 and 0.045 respectively) despite the two
  models having a nearly identical setup (as far as I can tell)
  I understand that this is using SGD with very few iterations and so the
  results may differ but I wouldn't have thought that it would be anywhere
  near such a large difference or such a large error, especially given the
  exceptionally simple data.
 
  Is there something I'm misunderstanding in Spark? Is it not correctly
  configured? Surely I should be getting a smaller error than that?
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-Apache-Spark-less-accurate-than-Scikit-Learn-tp21301.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 




Is there a way to delete hdfs file/directory using spark API?

2015-01-21 Thread LinQili
Hi, all
I wonder how to delete hdfs file/directory using spark API?
  

Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-21 Thread Davies Liu
Because you have a large broadcast, it needs to be loaded into the
Python worker for each task if the worker is not reused.

We would really appreciate it if you could provide a short script to
reproduce the freeze; then we can investigate the root cause and fix
it. Also, please file a JIRA for it, thanks!

On Wed, Jan 21, 2015 at 4:56 PM, Tassilo Klein tjkl...@gmail.com wrote:
 I set spark.python.worker.reuse = false and now it seems to run longer than
 before (it has not crashed yet). However, it is very very slow. How to
 proceed?

 On Wed, Jan 21, 2015 at 2:21 AM, Davies Liu dav...@databricks.com wrote:

 Could you try to disable the new feature of reused worker by:
 spark.python.worker.reuse = false

 On Tue, Jan 20, 2015 at 11:12 PM, Tassilo Klein tjkl...@bwh.harvard.edu
 wrote:
  Hi,
 
  It's a bit of a longer script that runs some deep learning training.
  Therefore it is a bit hard to wrap up easily.
 
  Essentially I am having a loop, in which a gradient is computed on each
  node
  and collected (this is where it freezes at some point).
 
   grads =
  zipped_trainData.map(distributed_gradient_computation).collect()
 
 
  The distributed_gradient_computation mainly contains a Theano derived
  function. The theano function itself is a broadcast variable.
 
  Let me know if you need more information.
 
  Best,
   Tassilo
 
  On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu dav...@databricks.com
  wrote:
 
  Could you provide a short script to reproduce this issue?
 
  On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote:
   Hi,
  
   I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using
   PySpark. Initially, I was super glad, noticing that Spark 1.2 is way
   faster
   than Spark 1.1. However, the initial joy faded quickly when I noticed
   that
   all my stuff didn't successfully terminate operations anymore. Using
   Spark
   1.1 it still works perfectly fine, though.
   Specifically, the execution just freezes without any error output at
   one
   point, when calling a joint map() and collect() statement (after
   having
   it
   called many times successfully before in a loop).
  
   Any clue? Or do I have to wait for the next version?
  
   Best,
Tassilo
  
  
  
   --
   View this message in context:
  
   http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html
   Sent from the Apache Spark User List mailing list archive at
   Nabble.com.
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 


 The information in this e-mail is intended only for the person to whom it
 is
 addressed. If you believe this e-mail was sent to you in error and the
 e-mail
 contains patient information, please contact the Partners Compliance
 HelpLine at
 http://www.partners.org/complianceline . If the e-mail was sent to you in
 error
 but does not contain patient information, please contact the sender and
 properly
 dispose of the e-mail.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-21 Thread Tassilo Klein
What do you suggest? Should I send you the script so you can run it
yourself?
 Yes, my broadcast variables are fairly large (1.7 MBytes).

On Wed, Jan 21, 2015 at 8:20 PM, Davies Liu dav...@databricks.com wrote:

 Because that you have large broadcast, they need to be loaded into
 Python worker for each tasks, if the worker is not reused.

 We will really appreciate that if you could provide a short script to
 reproduce the freeze, then we can investigate the root cause and fix
 it. Also, fire a JIRA for it, thanks!

 On Wed, Jan 21, 2015 at 4:56 PM, Tassilo Klein tjkl...@gmail.com wrote:
  I set spark.python.worker.reuse = false and now it seems to run longer
 than
  before (it has not crashed yet). However, it is very very slow. How to
  proceed?
 
  On Wed, Jan 21, 2015 at 2:21 AM, Davies Liu dav...@databricks.com
 wrote:
 
  Could you try to disable the new feature of reused worker by:
  spark.python.worker.reuse = false
 
  On Tue, Jan 20, 2015 at 11:12 PM, Tassilo Klein 
 tjkl...@bwh.harvard.edu
  wrote:
   Hi,
  
   It's a bit of a longer script that runs some deep learning training.
   Therefore it is a bit hard to wrap up easily.
  
   Essentially I am having a loop, in which a gradient is computed on
 each
   node
   and collected (this is where it freezes at some point).
  
grads =
   zipped_trainData.map(distributed_gradient_computation).collect()
  
  
   The distributed_gradient_computation mainly contains a Theano derived
   function. The theano function itself is a broadcast variable.
  
   Let me know if you need more information.
  
   Best,
Tassilo
  
   On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu dav...@databricks.com
   wrote:
  
   Could you provide a short script to reproduce this issue?
  
   On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote:
Hi,
   
I just recently tried to migrate from Spark 1.1 to Spark 1.2 -
 using
PySpark. Initially, I was super glad, noticing that Spark 1.2 is
 way
faster
than Spark 1.1. However, the initial joy faded quickly when I
 noticed
that
all my stuff didn't successfully terminate operations anymore.
 Using
Spark
1.1 it still works perfectly fine, though.
Specifically, the execution just freezes without any error output
 at
one
point, when calling a joint map() and collect() statement (after
having
it
called many times successfully before in a loop).
   
Any clue? Or do I have to wait for the next version?
   
Best,
 Tassilo
   
   
   
--
View this message in context:
   
   
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.
   
   
 -
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
   
  
  
 
 
  The information in this e-mail is intended only for the person to whom
 it
  is
  addressed. If you believe this e-mail was sent to you in error and the
  e-mail
  contains patient information, please contact the Partners Compliance
  HelpLine at
  http://www.partners.org/complianceline . If the e-mail was sent to you
 in
  error
  but does not contain patient information, please contact the sender and
  properly
  dispose of the e-mail.
 
 



Re: reading a csv dynamically

2015-01-21 Thread Pankaj Narang
Yes, I think you need to create one map first which will keep the number of
values in every line. Then you can group all the records with the same number of
values, and you will know how many types of arrays you have.


val dataRDD = sc.textFile("file.csv")
val dataLengthRDD = dataRDD.map(line => (line.split(",").length, line))
val groupedData = dataLengthRDD.groupByKey()

now you can process the groupedData as it will have arrays of length x in
one RDD.

groupByKey([numTasks])  When called on a dataset of (K, V) pairs, returns a
dataset of (K, Iterable<V>) pairs.
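
A rough follow-up sketch of consuming the grouped records (the 6- and 7-field
layouts are the ones from the question; anything else is skipped):

val parsed = groupedData.flatMap { case (fieldCount, lines) =>
  lines.flatMap { line =>
    val a = line.split(",")
    fieldCount match {
      case 6 => Some((Array(a(0)), Array(a(1).toDouble, a(2).toDouble), a(3),
                      Array(a(4).toDouble, a(5).toDouble)))
      case 7 => Some((Array(a(0)), Array(a(1).toDouble, a(2).toDouble, a(3).toDouble), a(4),
                      Array(a(5).toDouble, a(6).toDouble)))
      case _ => None   // unexpected width: drop the record
    }
  }
}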


I hope this helps

Regards
Pankaj 
Infoshore Software
India




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/reading-a-csv-dynamically-tp21304p21307.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.2 – How to change Default (Random) port ….

2015-01-21 Thread Shailesh Birari
Hello,

Recently, I have upgraded my setup to Spark 1.2 from Spark 1.1.

I have 4 node Ubuntu Spark Cluster.
With Spark 1.1, I used to write Spark Scala program in Eclipse on my Windows
development host and submit the job on Ubuntu Cluster, from Eclipse (Windows
machine).

As not all ports between the Spark cluster and the development machine are open
on my network, I set the Spark process ports to specific allowed ports.
On Spark 1.1 this works perfectly.

When I try to run the same program with the same user-defined ports on the Spark
1.2 cluster, it gives me a connection timeout for port *56117*.

I referred the Spark 1.2 configuration page
(http://spark.apache.org/docs/1.2.0/configuration.html) but there are no new
ports mentioned.

*Here is my code for reference:*
   
val conf = new SparkConf()
  .setMaster(sparkMaster)
  .setAppName("Spark SVD")
  .setSparkHome("/usr/local/spark")
  .setJars(jars)
  .set("spark.driver.host", "consb2a")        // Windows host (development machine)
  .set("spark.driver.port", "51810")
  .set("spark.fileserver.port", "51811")
  .set("spark.broadcast.port", "51812")
  .set("spark.replClassServer.port", "51813")
  .set("spark.blockManager.port", "51814")
  .set("spark.executor.port", "51815")
  .set("spark.executor.memory", "2g")
  .set("spark.driver.memory", "4g")
val sc = new SparkContext(conf)

*Here is Exception:*
15/01/21 15:44:08 INFO BlockManagerMasterActor: Registering block manager
wynchcs217.wyn.cnw.co.nz:37173 with 1059.9 MB RAM, BlockManagerId(2,
wynchcs217.wyn.cnw.co.nz, 37173)
15/01/21 15:44:08 INFO BlockManagerMasterActor: Registering block manager
wynchcs219.wyn.cnw.co.nz:53850 with 1059.9 MB RAM, BlockManagerId(1,
wynchcs219.wyn.cnw.co.nz, 53850)
15/01/21 15:44:08 INFO BlockManagerMasterActor: Registering block manager
wynchcs220.wyn.cnw.co.nz:35670 with 1060.3 MB RAM, BlockManagerId(0,
wynchcs220.wyn.cnw.co.nz, 35670)
15/01/21 15:44:08 INFO BlockManagerMasterActor: Registering block manager
wynchcs218.wyn.cnw.co.nz:46890 with 1059.9 MB RAM, BlockManagerId(3,
wynchcs218.wyn.cnw.co.nz, 46890)
15/01/21 15:52:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
wynchcs217.wyn.cnw.co.nz): java.io.IOException: Connecting to
CONSB2A.cnw.co.nz/143.96.130.27:56117 timed out (12 ms)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:188)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:701)

15/01/21 15:52:23 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID
2, wynchcs220.wyn.cnw.co.nz, NODE_LOCAL, 1366 bytes)
15/01/21 15:55:35 INFO TaskSchedulerImpl: Cancelling stage 0
15/01/21 15:55:35 INFO TaskSchedulerImpl: Stage 0 was cancelled
15/01/21 15:55:35 INFO DAGScheduler: Job 0 failed: count at
RowMatrix.scala:76, took 689.331309 s
Exception in thread main org.apache.spark.SparkException: Job 0 cancelled
because Stage 0 was cancelled


Can you please let me know how I can change port 56117 to some other
port?

Thanks,
  Shailesh





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-How-to-change-Default-Random-port-tp21306.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Discourse: A proposed alternative to the Spark User list

2015-01-21 Thread Bob Tiernay
Very well stated. Thanks for putting in the effort to formalize your thoughts, 
with which I agree entirely.
How are these type of decisions made traditionally in the Spark community? Is 
there a formal process? What's the next step?
Thanks again

From: nicholas.cham...@gmail.com
Date: Thu, 22 Jan 2015 02:55:33 +
Subject: Re: Discourse: A proposed alternative to the Spark User list
To: petar.zece...@gmail.com; user@spark.apache.org

I think a few things need to be laid out clearly:

This mailing list is the “official” user discussion platform. That is, it is 
sponsored and managed by the ASF.
Users are free to organize independent discussion platforms focusing on Spark, 
and there is already one such platform in Stack Overflow under the apache-spark 
and related tags. Stack Overflow works quite well.
The ASF will not agree to deprecating or migrating this user list to a platform 
that they do not control.
This mailing list has grown to an unwieldy size and discussions are hard to 
find or follow; discussion tooling is also lacking. We want to improve the 
utility and user experience of this mailing list.
We don’t want to fragment this “official” discussion community.
Nabble is an independent product not affiliated with the ASF. It offers a 
slightly better interface to the Apache mailing list archives. 

So to respond to some of your points, pzecevic:

Apache user group could be frozen (not accepting new questions, if that’s 
possible) and redirect users to Stack Overflow (automatic reply?).

From what I understand of the ASF’s policies, this is not possible. :( This 
mailing list must remain the official Spark user discussion platform.

Other thing, about new Stack Exchange site I proposed earlier. If a new site is 
created, there is no problem with guidelines, I think, because Spark community 
can apply different guidelines for the new site.

I think Stack Overflow and the various Spark tags are working fine. I don’t see 
a compelling need for a Stack Exchange dedicated to Spark, either now or in the 
near future. Also, I doubt a Spark-specific site can pass the 4 tests in the 
Area 51 FAQ:

Almost all Spark questions are on-topic for Stack Overflow
Stack Overflow already exists, it already has a tag for Spark, and nobody is 
complaining
You’re not creating such a big group that you don’t have enough experts to 
answer all possible questions
There’s a high probability that users of Stack Overflow would enjoy seeing the 
occasional question about Spark

I think complaining won’t be sufficient. :)

Someone expressed a concern that they won’t allow creating a project-specific 
site, but there already exist some project-specific sites, like Tor, Drupal, 
Ubuntu…

The communities for these projects are many, many times larger than the Spark 
community is or likely ever will be, simply due to the nature of the problems 
they are solving.
What we need is an improvement to this mailing list. We need better tooling 
than Nabble to sit on top of the Apache archives, and we also need some way to 
control the volume and quality of mail on the list so that it remains a useful 
resource for the majority of users.
Nick
​
On Wed Jan 21 2015 at 3:13:21 PM pzecevic petar.zece...@gmail.com wrote:
Hi,

I tried to find the last reply by Nick Chammas (that I received in the

digest) using the Nabble web interface, but I cannot find it (perhaps he

didn't reply directly to the user list?). That's one example of Nabble's

usability.



Anyhow, I wanted to add my two cents...



Apache user group could be frozen (not accepting new questions, if that's

possible) and redirect users to Stack Overflow (automatic reply?). Old

questions remain (and are searchable) on Nabble, new questions go to Stack

Exchange, so no need for migration. That's the idea, at least, as I'm not

sure if that's technically doable... Is it?

dev mailing list could perhaps stay on Nabble (it's not that busy), or have

a special tag on Stack Exchange.



Other thing, about new Stack Exchange site I proposed earlier. If a new site

is created, there is no problem with guidelines, I think, because Spark

community can apply different guidelines for the new site.



There is a FAQ about creating new sites: http://area51.stackexchange.com/faq

It says: Stack Exchange sites are free to create and free to use. All we

ask is that you have an enthusiastic, committed group of expert users who

check in regularly, asking and answering questions.

I think this requirement is satisfied...

Someone expressed a concern that they won't allow creating a

project-specific site, but there already exist some project-specific sites,

like Tor, Drupal, Ubuntu...



Later, though, the FAQ also says:

If Y already exists, it already has a tag for X, and nobody is complaining

(then you should not create a new site). But we could complain :)



The advantage of having a separate site is that users, who should have more

privileges, would need to earn them through Spark 

Re: Confused why I'm losing workers/executors when writing a large file to S3

2015-01-21 Thread Tsai Li Ming
I’m getting the same issue on Spark 1.2.0. Despite having set 
“spark.core.connection.ack.wait.timeout” in spark-defaults.conf and verified in 
the job UI (port 4040) environment tab, I still get the “no heartbeat in 60 
seconds” error. 

spark.core.connection.ack.wait.timeout=3600

15/01/22 07:29:36 WARN master.Master: Removing 
worker-20150121231529-numaq1-4-34948 because we got no heartbeat in 60 seconds


On 14 Nov, 2014, at 3:04 pm, Reynold Xin r...@databricks.com wrote:

 Darin,
 
 You might want to increase these config options also:
 
 spark.akka.timeout 300
 spark.storage.blockManagerSlaveTimeoutMs 30
 
 On Thu, Nov 13, 2014 at 11:31 AM, Darin McBeath ddmcbe...@yahoo.com.invalid 
 wrote:
 For one of my Spark jobs, my workers/executors are dying and leaving the 
 cluster.
 
 On the master, I see something like the following in the log file.  I'm 
 surprised to see the '60' seconds in the master log below because I 
 explicitly set it to '600' (or so I thought) in my spark job (see below).   
 This is happening at the end of my job when I'm trying to persist a large RDD 
 (probably around 300+GB) back to S3 (in 256 partitions).  My cluster consists 
 of 6 r3.8xlarge machines.  The job successfully works when I'm outputting 
 100GB or 200GB.
 
 If  you have any thoughts/insights, it would be appreciated. 
 
 Thanks.
 
 Darin.
 
 Here is where I'm setting the 'timeout' in my spark job.
 
 SparkConf conf = new SparkConf()
   .setAppName("SparkSync Application")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.rdd.compress", "true")
   .set("spark.core.connection.ack.wait.timeout", "600");
 ​
 On the master, I see the following in the log file.
 
 4/11/13 17:20:39 WARN master.Master: Removing 
 worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 because we got no 
 heartbeat in 60 seconds
 14/11/13 17:20:39 INFO master.Master: Removing worker 
 worker-20141113134801-ip-10-35-184-232.ec2.internal-51877 on 
 ip-10-35-184-232.ec2.internal:51877
 14/11/13 17:20:39 INFO master.Master: Telling app of lost executor: 2
 
 On a worker, I see something like the following in the log file.
 
 14/11/13 17:20:58 WARN util.AkkaUtils: Error sending message in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
   at 
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
   at scala.concurrent.Await$.result(package.scala:107)
   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:362)
 14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: I/O exception 
 (java.net.SocketException) caught when processing request: Broken pipe
 14/11/13 17:21:11 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:32 INFO httpclient.HttpMethodDirector: I/O exception 
 (java.net.SocketException) caught when processing request: Broken pipe
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception 
 (java.io.IOException) caught when processing request: Resetting to invalid 
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception 
 (java.io.IOException) caught when processing request: Resetting to invalid 
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception 
 (java.io.IOException) caught when processing request: Resetting to invalid 
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception 
 (java.io.IOException) caught when processing request: Resetting to invalid 
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception 
 (java.io.IOException) caught when processing request: Resetting to invalid 
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception 
 (java.io.IOException) caught when processing request: Resetting to invalid 
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception 
 (java.io.IOException) caught when processing request: Resetting to invalid 
 mark
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: Retrying request
 14/11/13 17:21:34 INFO httpclient.HttpMethodDirector: I/O exception 
 (java.io.IOException) caught when processing request: Resetting to invalid 
 mark
 14/11/13 17:21:34 INFO 

Re: Exception in connection from worker to worker

2015-01-21 Thread Akhil Das
Can you try the following:

- Use Kryo Serializer
- Enable RDD Compression
- Repartition the data (use hash partitioning, so that records with the same
key go to the same partition)
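
A rough sketch of those settings (Spark 1.x; the partition count, path and app
name are placeholders):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair RDD functions in Spark 1.x

val conf = new SparkConf()
  .setAppName("shuffle-tuning-test")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // Kryo
  .set("spark.rdd.compress", "true")                                      // RDD compression
val sc = new SparkContext(conf)

// Hash-partition a keyed RDD so records with the same key land in the same
// partition before the aggregation.
val pairs = sc.textFile("hdfs:///data/sample.txt").map(line => (line, 1))
val partitioned = pairs.partitionBy(new HashPartitioner(100)).persist()
val counts = partitioned.reduceByKey(_ + _)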


Thanks
Best Regards

On Thu, Jan 22, 2015 at 4:05 AM, vantoniuk vita...@jetlore.com wrote:

 I have temporary fix for my case. My sample file was 2G / 50M lines in
 size.
 My initial configuration was 1000 splits.

 Based on my understanding of distributed algorithms, number of splits can
 affect the memory pressure in operations such as distinct and reduceByKey.
 So i tried to reduce the number of splits from 1000 to 100. Now I can run
 distinct and reduceByKey on files that are 2G / 50M lines.

 Unfortunately it still doesn't scale well.

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Exception-in-connection-from-worker-to-worker-tp20983p21302.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark job for demoing Spark metrics monitoring?

2015-01-21 Thread Akhil Das
I think you can easily run the Twitter popular hashtags example:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
You can also save the data into a db and visualize it.

Thanks
Best Regards

On Thu, Jan 22, 2015 at 12:37 AM, Otis Gospodnetic 
otis.gospodne...@gmail.com wrote:

 Hi,

 I'll be showing our Spark monitoring
 http://blog.sematext.com/2014/10/07/apache-spark-monitoring/ at the
 upcoming Spark Summit in NYC.  I'd like to run some/any Spark job that
 really exercises Spark and makes it emit all its various metrics (so the
 metrics charts are full of data and not blank or flat and boring).

 Since we don't use Spark at Sematext yet, I was wondering if anyone could
 recommend some Spark app/job that's easy to run, just to get some Spark job
 to start emitting various Spark metrics?

 Thanks,
 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr  Elasticsearch Support * http://sematext.com/




Re: Is there a way to delete hdfs file/directory using spark API?

2015-01-21 Thread Akhil Das
There is no direct way of doing it, but you can do something like this:

val hadoopConf = ssc.sparkContext.hadoopConfiguration

val hdfs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)

// each line of this stream is an hdfs location to be deleted
val tmp_stream = ssc.textFileStream("/akhld/sigmoid/")

tmp_stream.foreachRDD(rdd => {
  // bring the paths to the driver, then delete each one
  rdd.collect().foreach(path =>
    try {
      hdfs.delete(new org.apache.hadoop.fs.Path(path), true)
    } catch { case e: Exception =>
      println("w00t!! Exception!! HDFS = " + e)
    }
  )
})


Thanks
Best Regards

On Thu, Jan 22, 2015 at 12:15 PM, LinQili lin_q...@outlook.com wrote:

 Hi, all
 I wonder how to delete hdfs file/directory using spark API?



RE: spark 1.1.0 save data to hdfs failed

2015-01-21 Thread ey-chih chow
The hdfs release should be hadoop 1.0.4.
Ey-Chih Chow 

Date: Wed, 21 Jan 2015 16:56:25 -0800
Subject: Re: spark 1.1.0 save data to hdfs failed
From: yuzhih...@gmail.com
To: eyc...@hotmail.com
CC: user@spark.apache.org

What hdfs release are you using ?
Can you check namenode log around time of error below to see if there is some 
clue ?
Cheers
On Wed, Jan 21, 2015 at 4:51 PM, ey-chih chow eyc...@hotmail.com wrote:
Hi,



I used the following fragment of a scala program to save data to hdfs:



contextAwareEvents

.map(e = (new AvroKey(e), null))

.saveAsNewAPIHadoopFile(hdfs:// + masterHostname + :9000/ETL/output/

+ dateDir,

classOf[AvroKey[GenericRecord]],

classOf[NullWritable],

classOf[AvroKeyOutputFormat[GenericRecord]],

job.getConfiguration)



But it failed with the following error messages.  Is there any people who

can help?  Thanks.



Ey-Chih Chow



=



Exception in thread main java.lang.reflect.InvocationTargetException

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at

org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)

at 
org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)

Caused by: java.io.IOException: Failed on local exception:

java.io.EOFException; Host Details : local host is:

ip-10-33-140-157/10.33.140.157; destination host is:

ec2-54-203-58-2.us-west-2.compute.amazonaws.com:9000;

at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)

at org.apache.hadoop.ipc.Client.call(Client.java:1415)

at org.apache.hadoop.ipc.Client.call(Client.java:1364)

at

org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)

at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source)

at

org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:744)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at

org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)

at

org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)

at com.sun.proxy.$Proxy15.getFileInfo(Unknown Source)

at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1925)

at

org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1079)

at

org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1075)

at

org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)

at

org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1075)

at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)

at

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:145)

at

org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:900)

at

org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:832)

at com.crowdstar.etl.ParseAndClean$.main(ParseAndClean.scala:101)

at com.crowdstar.etl.ParseAndClean.main(ParseAndClean.scala)

... 6 more

Caused by: java.io.EOFException

at java.io.DataInputStream.readInt(DataInputStream.java:392)

at

org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1055)

at org.apache.hadoop.ipc.Client$Connection.run(Client.java:950)



===











--

View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-1-0-save-data-to-hdfs-failed-tp21305.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.



-

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org




  

Re: Is there a way to delete hdfs file/directory using spark API?

2015-01-21 Thread Raghavendra Pandey
You can use the Hadoop client API to remove files:
https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#delete(org.apache.hadoop.fs.Path,
boolean). I don't think Spark has any wrapper around the Hadoop FileSystem APIs.
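
For example, something along these lines (an untested sketch; the path is just
a placeholder):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
// the second argument makes the delete recursive, so non-empty directories go too
fs.delete(new Path("hdfs:///tmp/some/dir"), true)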

On Thu, Jan 22, 2015 at 12:15 PM, LinQili lin_q...@outlook.com wrote:

 Hi, all
 I wonder how to delete hdfs file/directory using spark API?



loading utf16le file with sc.textFile

2015-01-21 Thread Nathan Stott
How can I load a utf16le file with BOM using sc.textFile? Right now I
get a String that is garbled. I can't find documentation on using a
different encoding when loading a text file. Any help is appreciated.
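
sc.textFile goes through Hadoop's TextInputFormat, which assumes UTF-8, so the
garbling is expected. One possible workaround (a sketch only, assuming Spark
1.2's sc.binaryFiles and files small enough to decode whole; the path is a
placeholder) is to decode the bytes yourself:

import java.nio.charset.StandardCharsets

// the "UTF-16" charset honours the leading BOM, so a UTF-16LE file with BOM decodes correctly
val lines = sc.binaryFiles("/data/utf16le/")
  .map { case (_, stream) => new String(stream.toArray, StandardCharsets.UTF_16) }
  .flatMap(_.split("\r?\n"))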

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: dynamically change receiver for a spark stream

2015-01-21 Thread Tamas Jambor
thanks for the replies.

is this something we can get around? Tried to hack into the code without
much success.

On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com wrote:

 Hi,

 I don't think current Spark Streaming support this feature, all the
 DStream lineage is fixed after the context is started.

 Also stopping a stream is not supported, instead currently we need to stop
 the whole streaming context to meet what you want.

 Thanks
 Saisai

 -Original Message-
 From: jamborta [mailto:jambo...@gmail.com]
 Sent: Wednesday, January 21, 2015 3:09 AM
 To: user@spark.apache.org
 Subject: dynamically change receiver for a spark stream

 Hi all,

 we have been trying to setup a stream using a custom receiver that would
 pick up data from sql databases. we'd like to keep that stream context
 running and dynamically change the streams on demand, adding and removing
 streams based on demand. alternativel, if a stream is fixed, is it possible
 to stop a stream, change to config and start again?

 thanks,



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org




Are these numbers abnormal for spark streaming?

2015-01-21 Thread Ashic Mahtab
Hi Guys,
I've got Spark Streaming set up for a low data rate system (using spark's 
features for analysis, rather than high throughput). Messages are coming in 
throughout the day, at around 1-20 per second (finger in the air estimate...not 
analysed yet).  In the spark streaming UI for the application, I'm getting the 
following after 17 hours.

Streaming
  Started at: Tue Jan 20 16:58:43 GMT 2015
  Time since start: 18 hours 24 minutes 34 seconds
  Network receivers: 2
  Batch interval: 2 seconds
  Processed batches: 16482
  Waiting batches: 1

Statistics over last 100 processed batches

Receiver Statistics
  Receiver        Status   Location   Records in last batch [2015/01/21 11:23:18] / Min / Median / Max rate [records/sec]   Last Error
  RmqReceiver-0   ACTIVE   F          144727                                                                                -
  RmqReceiver-1   ACTIVE   BR         124726                                                                                -

Batch Processing Statistics
  Metric             Last batch         Minimum           25th percentile    Median             75th percentile    Maximum
  Processing Time    3 seconds 994 ms   157 ms            4 seconds 16 ms    4 seconds 961 ms   5 seconds 3 ms     5 seconds 171 ms
  Scheduling Delay   9 h 15 min 4 s     9 h 10 min 54 s   9 h 11 min 56 s    9 h 12 min 57 s    9 h 14 min 5 s     9 h 15 min 4 s
  Total Delay        9 h 15 min 8 s     9 h 10 min 58 s   9 h 12 min         9 h 13 min 2 s     9 h 14 min 10 s    9 h 15 min 8 s
Are these normal? I was wondering what the scheduling delay and total delay
terms are, and if it's normal for them to be 9 hours.

I've got a standalone spark master and 4 spark nodes. The streaming app has 
been given 4 cores, and it's using 1 core per worker node. The streaming app is 
submitted from a 5th machine, and that machine has nothing but the driver 
running. The worker nodes are running alongside Cassandra (and reading and 
writing to it).

Any insights would be appreciated.

Regards,
Ashic.
  

Re: dynamically change receiver for a spark stream

2015-01-21 Thread Tamas Jambor
we were thinking along the same line, that is to fix the number of streams
and change the input and output channels dynamically.

But could not make it work (seems that the receiver is not allowing any
change in the config after it started).

thanks,

On Wed, Jan 21, 2015 at 10:49 AM, Gerard Maas gerard.m...@gmail.com wrote:

 One possible workaround could be to orchestrate launch/stopping of
 Streaming jobs on demand as long as the number of jobs/streams stay
 within the boundaries of the resources (cores) you've available.
 e.g. if you're using Mesos, Marathon offers a REST interface to manage job
 lifecycle. You will still need to solve the dynamic configuration through
 some alternative channel.

 On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor jambo...@gmail.com wrote:

 thanks for the replies.

 is this something we can get around? Tried to hack into the code without
 much success.

 On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

 Hi,

 I don't think current Spark Streaming support this feature, all the
 DStream lineage is fixed after the context is started.

 Also stopping a stream is not supported, instead currently we need to
 stop the whole streaming context to meet what you want.

 Thanks
 Saisai

 -Original Message-
 From: jamborta [mailto:jambo...@gmail.com]
 Sent: Wednesday, January 21, 2015 3:09 AM
 To: user@spark.apache.org
 Subject: dynamically change receiver for a spark stream

 Hi all,

 we have been trying to setup a stream using a custom receiver that would
 pick up data from sql databases. we'd like to keep that stream context
 running and dynamically change the streams on demand, adding and removing
 streams based on demand. alternativel, if a stream is fixed, is it possible
 to stop a stream, change to config and start again?

 thanks,



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
 additional commands, e-mail: user-h...@spark.apache.org






Re: dynamically change receiver for a spark stream

2015-01-21 Thread Gerard Maas
Hi Tamas,

I meant not changing the receivers, but starting/stopping the Streaming
jobs. So you would have a 'small' Streaming job for a subset of streams
that you'd configure-start-stop  on demand.
I haven't tried myself yet, but I think it should also be possible to
create a Streaming Job from the Spark Job Server (
https://github.com/spark-jobserver/spark-jobserver). Then you would have a
REST interface that even gives you the possibility of passing a
configuration.

-kr, Gerard.

On Wed, Jan 21, 2015 at 11:54 AM, Tamas Jambor jambo...@gmail.com wrote:

 we were thinking along the same line, that is to fix the number of streams
 and change the input and output channels dynamically.

 But could not make it work (seems that the receiver is not allowing any
 change in the config after it started).

 thanks,

 On Wed, Jan 21, 2015 at 10:49 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 One possible workaround could be to orchestrate launch/stopping of
 Streaming jobs on demand as long as the number of jobs/streams stay
 within the boundaries of the resources (cores) you've available.
 e.g. if you're using Mesos, Marathon offers a REST interface to manage
 job lifecycle. You will still need to solve the dynamic configuration
 through some alternative channel.

 On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor jambo...@gmail.com
 wrote:

 thanks for the replies.

 is this something we can get around? Tried to hack into the code without
 much success.

 On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

 Hi,

 I don't think current Spark Streaming support this feature, all the
 DStream lineage is fixed after the context is started.

 Also stopping a stream is not supported, instead currently we need to
 stop the whole streaming context to meet what you want.

 Thanks
 Saisai

 -Original Message-
 From: jamborta [mailto:jambo...@gmail.com]
 Sent: Wednesday, January 21, 2015 3:09 AM
 To: user@spark.apache.org
 Subject: dynamically change receiver for a spark stream

 Hi all,

 we have been trying to setup a stream using a custom receiver that
 would pick up data from sql databases. we'd like to keep that stream
 context running and dynamically change the streams on demand, adding and
 removing streams based on demand. alternativel, if a stream is fixed, is it
 possible to stop a stream, change to config and start again?

 thanks,



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
 additional commands, e-mail: user-h...@spark.apache.org







Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-21 Thread critikaled
I'm also facing the same issue.
is this a bug?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278p21283.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: dynamically change receiver for a spark stream

2015-01-21 Thread Gerard Maas
One possible workaround could be to orchestrate launch/stopping of
Streaming jobs on demand as long as the number of jobs/streams stay within
the boundaries of the resources (cores) you've available.
e.g. if you're using Mesos, Marathon offers a REST interface to manage job
lifecycle. You will still need to solve the dynamic configuration through
some alternative channel.

On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor jambo...@gmail.com wrote:

 thanks for the replies.

 is this something we can get around? Tried to hack into the code without
 much success.

 On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

 Hi,

 I don't think current Spark Streaming support this feature, all the
 DStream lineage is fixed after the context is started.

 Also stopping a stream is not supported, instead currently we need to
 stop the whole streaming context to meet what you want.

 Thanks
 Saisai

 -Original Message-
 From: jamborta [mailto:jambo...@gmail.com]
 Sent: Wednesday, January 21, 2015 3:09 AM
 To: user@spark.apache.org
 Subject: dynamically change receiver for a spark stream

 Hi all,

 we have been trying to setup a stream using a custom receiver that would
 pick up data from sql databases. we'd like to keep that stream context
 running and dynamically change the streams on demand, adding and removing
 streams based on demand. alternativel, if a stream is fixed, is it possible
 to stop a stream, change to config and start again?

 thanks,



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org





Re: dynamically change receiver for a spark stream

2015-01-21 Thread Tamas Jambor
Hi Gerard,

thanks, that makes sense. I'll try that out.

Tamas

On Wed, Jan 21, 2015 at 11:14 AM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi Tamas,

 I meant not changing the receivers, but starting/stopping the Streaming
 jobs. So you would have a 'small' Streaming job for a subset of streams
 that you'd configure-start-stop  on demand.
 I haven't tried myself yet, but I think it should also be possible to
 create a Streaming Job from the Spark Job Server (
 https://github.com/spark-jobserver/spark-jobserver). Then you would have
 a REST interface that even gives you the possibility of passing a
 configuration.

 -kr, Gerard.

 On Wed, Jan 21, 2015 at 11:54 AM, Tamas Jambor jambo...@gmail.com wrote:

 we were thinking along the same line, that is to fix the number of
 streams and change the input and output channels dynamically.

 But could not make it work (seems that the receiver is not allowing any
 change in the config after it started).

 thanks,

 On Wed, Jan 21, 2015 at 10:49 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 One possible workaround could be to orchestrate launch/stopping of
 Streaming jobs on demand as long as the number of jobs/streams stay
 within the boundaries of the resources (cores) you've available.
 e.g. if you're using Mesos, Marathon offers a REST interface to manage
 job lifecycle. You will still need to solve the dynamic configuration
 through some alternative channel.

 On Wed, Jan 21, 2015 at 11:30 AM, Tamas Jambor jambo...@gmail.com
 wrote:

 thanks for the replies.

 is this something we can get around? Tried to hack into the code
 without much success.

 On Wed, Jan 21, 2015 at 3:15 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

 Hi,

 I don't think current Spark Streaming support this feature, all the
 DStream lineage is fixed after the context is started.

 Also stopping a stream is not supported, instead currently we need to
 stop the whole streaming context to meet what you want.

 Thanks
 Saisai

 -Original Message-
 From: jamborta [mailto:jambo...@gmail.com]
 Sent: Wednesday, January 21, 2015 3:09 AM
 To: user@spark.apache.org
 Subject: dynamically change receiver for a spark stream

 Hi all,

 we have been trying to setup a stream using a custom receiver that
 would pick up data from sql databases. we'd like to keep that stream
 context running and dynamically change the streams on demand, adding and
 removing streams based on demand. alternativel, if a stream is fixed, is 
 it
 possible to stop a stream, change to config and start again?

 thanks,



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
 additional commands, e-mail: user-h...@spark.apache.org








Re: RangePartitioner

2015-01-21 Thread Sandy Ryza
Hi Rishi,

If you look in the Spark UI, have any executors registered?

Are you able to collect a jstack of the driver process?

-Sandy

On Tue, Jan 20, 2015 at 9:07 PM, Rishi Yadav ri...@infoobjects.com wrote:

  I am joining two tables as below, the program stalls at below log line
 and never proceeds.
 What might be the issue and possible solution?

  INFO SparkContext: Starting job: RangePartitioner at Exchange.scala:79

 Table 1 has  450 columns
 Table2 has  100 columns

 Both tables have few million rows


 val table1 = myTable1.as('table1)
 val table2 = myTable2.as('table2)
 val results =
   table1.join(table2, LeftOuter, Some("table1.Id".attr === "table2.id".attr))

 println(results.count())

 Thanks and Regards,
 Rishi
 @meditativesoul



Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
thanks, Sean.

I don't quite understand "you have more partitions across more workers".

It's within the same cluster, and the same data, thus I think the same
partitions and the same workers.

We switched from Spark 1.1 to 1.2, and then it's 3x slower.

(We upgraded from CDH 5.2.1 to CDH 5.3, hence Spark 1.1 to 1.2, and found
the problem.
Then we installed a standalone Spark 1.1, stopped 1.2, ran the same
script, and it was 3x faster.
Stopped 1.1, started 1.2: 3x slower again.)


2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com:

 I don't know of any reason to think the singleton pattern doesn't work or
 works differently. I wonder if, for example, task scheduling is different
 in 1.2 and you have more partitions across more workers and so are loading
 more copies more slowly into your singletons.
 On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote:

 the LogParser instance is not serializable, and thus cannot be a
 broadcast,

 what’s worse, it contains an LRU cache, which is essential to the
 performance, and we would like to share among all the tasks on the same
 node.

 If it is the case, what’s the recommended way to share a variable among
 all the tasks within the same executor.
 ​

 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:

 Maybe some change related to serialize the closure cause LogParser is
 not a singleton any more, then it is initialized for every task.

 Could you change it to a Broadcast?

 On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com
 wrote:
  Currently we are migrating from spark 1.1 to spark 1.2, but found the
  program 3x slower, with nothing else changed.
  note: our program in spark 1.1 has successfully processed a whole year
 data,
  quite stable.
 
  the main script is as below
 
  sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})
 
  where LogParser is a singleton which may take some time to initialized
 and
  is shared across the execuator.
 
  the flatMap stage is 3x slower.
 
  We tried to change spark.shuffle.manager back to hash, and
  spark.shuffle.blockTransferService back to nio, but didn’t help.
 
  May somebody explain possible causes, or what should we test or change
 to
  find it out





Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
maybe you mean different spark-submit script?

we also use the same spark-submit script, thus the same memory, cores, etc
configuration.
​

2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com:

 I don't know of any reason to think the singleton pattern doesn't work or
 works differently. I wonder if, for example, task scheduling is different
 in 1.2 and you have more partitions across more workers and so are loading
 more copies more slowly into your singletons.
 On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote:

 the LogParser instance is not serializable, and thus cannot be a
 broadcast,

 what’s worse, it contains an LRU cache, which is essential to the
 performance, and we would like to share among all the tasks on the same
 node.

 If it is the case, what’s the recommended way to share a variable among
 all the tasks within the same executor.
 ​

 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:

 Maybe some change related to serialize the closure cause LogParser is
 not a singleton any more, then it is initialized for every task.

 Could you change it to a Broadcast?

 On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com
 wrote:
  Currently we are migrating from spark 1.1 to spark 1.2, but found the
  program 3x slower, with nothing else changed.
  note: our program in spark 1.1 has successfully processed a whole year
 data,
  quite stable.
 
  the main script is as below
 
  sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})
 
  where LogParser is a singleton which may take some time to initialized
 and
  is shared across the execuator.
 
  the flatMap stage is 3x slower.
 
  We tried to change spark.shuffle.manager back to hash, and
  spark.shuffle.blockTransferService back to nio, but didn’t help.
 
  May somebody explain possible causes, or what should we test or change
 to
  find it out





Re: How to share a NonSerializable variable among tasks in the same worker node?

2015-01-21 Thread Sean Owen
Singletons aren't hacks; it can be an entirely appropriate pattern for
this. What exception do you get? From Spark or your code? I think this
pattern is orthogonal to using Spark.
On Jan 21, 2015 8:11 AM, octavian.ganea octavian.ga...@inf.ethz.ch
wrote:

 In case someone has the same problem:

 The singleton hack works for me sometimes, sometimes it doesn't in spark
 1.2.0, that is, sometimes I get nullpointerexception. Anyway, if you really
 need to work with big indexes and you want to have the smallest amount of
 communication between master and nodes, as well as if you have RAM
 available
 just for one instance of the indexes data per machine, than I suggest you
 use spark with memcached .



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tp11048p21282.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread JaeBoo Jung


I was recently faced with a similar issue, but unfortunately I could not find out why it happened.
Here's the jira ticket https://issues.apache.org/jira/browse/SPARK-5081 of my previous post.
Please check your shuffle I/O differences between the two in the Spark web UI because it can possibly be related to my case.

Thanks
Kevin

--- Original Message ---
Sender : Fengyun RAOraofeng...@gmail.com
Date : 2015-01-21 17:41 (GMT+09:00)
Title : Re: spark 1.2 three times slower than spark 1.1, why?



maybe you mean different spark-submit script?
we also use the same spark-submit script, thus the same memory, cores, etc configuration.
​

2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com: 

I don't know of any reason to think the singleton pattern doesn't work or works differently. I wonder if, for example, task scheduling is different in 1.2 and you have more partitions across more workers and so are loading more copies more slowly into your singletons. 


On Jan 21, 2015 7:13 AM, "Fengyun RAO" raofeng...@gmail.com wrote: 



the LogParser instance is not serializable, and thus cannot be a broadcast, 
what’s worse, it contains an LRU cache, which is essential to the performance, and we would like to share among all the tasks on the same node.
If it is the case, what’s the recommended way to share a variable among all the tasks within the same executor.
​

2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com: 
Maybe some change related to serialize the closure cause LogParser is not a singleton any more, then it is initialized for every task. Could you change it to a Broadcast? 

On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com wrote:
 Currently we are migrating from spark 1.1 to spark 1.2, but found the
 program 3x slower, with nothing else changed.
 note: our program in spark 1.1 has successfully processed a whole year data,
 quite stable.

 the main script is as below

 sc.textFile(inputPath)
 .flatMap(line => LogParser.parseLine(line))
 .groupByKey(new HashPartitioner(numPartitions))
 .mapPartitionsWithIndex(...)
 .foreach(_ => {})

 where LogParser is a singleton which may take some time to initialize and
 is shared across the executor.

 the flatMap stage is 3x slower.

 We tried to change spark.shuffle.manager back to hash, and
 spark.shuffle.blockTransferService back to nio, but didn't help.

 May somebody explain possible causes, or what should we test or change to
 find it out


Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
I don't know how to debug a distributed application; any tools or suggestions?

but from spark web UI,

the GC time (~0.1 s), Shuffle Write(11 GB) are similar for spark 1.1 and
1.2.
there are no Shuffle Read and Spill.
The only difference is Duration:

  Duration    Min   25th percentile   Median   75th percentile   Max
  spark 1.2   4 s   37 s              45 s     53 s              1.9 min
  spark 1.1   2 s   17 s              18 s     18 s              34 s

2015-01-21 16:56 GMT+08:00 Sean Owen so...@cloudera.com:

 I mean that if you had tasks running on 10 machines now instead of 3 for
 some reason you would have more than 3 times the read load on your source
 of data all at once. Same if you made more executors per machine. But from
 your additional info it does not sound like this is the case. I think you
 need more debugging to pinpoint what is slower.
 On Jan 21, 2015 9:30 AM, Fengyun RAO raofeng...@gmail.com wrote:

 thanks, Sean.

 I don't quite understand you have *more *partitions across *more *
 workers.

 It's within the same cluster, and the same data, thus I think the same
 partition, the same workers.

 we switched from spark 1.1 to 1.2, then it's 3x slower.

 (We upgrade from CDH 5.2.1 to CDH 5.3, hence spark 1.1 to 1.2, and found
 the problem.
 then we installed a standalone spark 1.1, stop the 1.2, run the same
 script, it's 3x faster.
 stop 1.1, start 1.2, 3x slower again)


 2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com:

 I don't know of any reason to think the singleton pattern doesn't work
 or works differently. I wonder if, for example, task scheduling is
 different in 1.2 and you have more partitions across more workers and so
 are loading more copies more slowly into your singletons.
 On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote:

 the LogParser instance is not serializable, and thus cannot be a
 broadcast,

 what’s worse, it contains an LRU cache, which is essential to the
 performance, and we would like to share among all the tasks on the same
 node.

 If it is the case, what’s the recommended way to share a variable among
 all the tasks within the same executor.
 ​

 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:

 Maybe some change related to serialize the closure cause LogParser is
 not a singleton any more, then it is initialized for every task.

 Could you change it to a Broadcast?

 On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com
 wrote:
  Currently we are migrating from spark 1.1 to spark 1.2, but found the
  program 3x slower, with nothing else changed.
  note: our program in spark 1.1 has successfully processed a whole
 year data,
  quite stable.
 
  the main script is as below
 
  sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})
 
  where LogParser is a singleton which may take some time to
 initialized and
  is shared across the execuator.
 
  the flatMap stage is 3x slower.
 
  We tried to change spark.shuffle.manager back to hash, and
  spark.shuffle.blockTransferService back to nio, but didn’t help.
 
  May somebody explain possible causes, or what should we test or
 change to
  find it out






Re: Support for SQL on unions of tables (merge tables?)

2015-01-21 Thread Paul Wais
Thanks Cheng!

For the list, I talked with Michael Armbrust at a recent Spark meetup
and his comments were:
 * For a union of tables, use a view and the Hive metastore
 * SQLContext might have the directory-traversing logic I need in it already
 * The union() of sequence files I saw was slow because Spark was
probably trying to shuffle the whole union.  A similar Spark SQL join
will also be slow (or break) unless one runs statistics so that the
smaller table can be broadcasted (e.g. see
https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
)

I have never used Hive, so I'll have to investigate further.
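
For anyone who wants the naive per-day union without Hive, a minimal Spark
1.2-era sketch looks roughly like this (the paths, dates and the 'bytes' field
are made up for illustration):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val days = Seq("2015-01-19", "2015-01-20", "2015-01-21")
val perDay = days.map(d => sqlContext.jsonFile(s"hdfs:///logs/dt=$d"))  // one SchemaRDD per day
val merged = perDay.reduce(_ unionAll _)                                // the 'merge table'
merged.registerTempTable("logs")
sqlContext.sql("SELECT SUM(bytes) FROM logs").collect()

The temp table only lives for the session, though, which is why the Hive
metastore view suggested above is the persistent option.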


On Tue, Jan 20, 2015 at 1:15 PM, Cheng Lian lian.cs@gmail.com wrote:
 I think you can resort to a Hive table partitioned by date
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables


 On 1/11/15 9:51 PM, Paul Wais wrote:


 Dear List,

 What are common approaches for addressing over a union of tables / RDDs?
 E.g. suppose I have a collection of log files in HDFS, one log file per day,
 and I want to compute the sum of some field over a date range in SQL.  Using
 log schema, I can read each as a distinct SchemaRDD, but I want to union
 them all and query against one 'table'.

 If this data were in MySQL, I could have a table for each day of data and
 use a MyISAM merge table to union these tables together and just query
 against the merge table.  What's nice here is that MySQL persists the merge
 table, and the merge table is r/w, so one can just update the merge table
 once per day.  (What's not nice is that merge tables scale poorly, backup
 admin is a pain, and oh hey I'd like to use Spark not MySQL).

 One naive and untested idea (that achieves implicit persistence): scan an
 HDFS directory for log files, create one RDD per file, union() the RDDs,
 then create a Schema RDD from that union().

 A few specific questions:
  * Any good approaches to a merge / union table? (Other than the naive
 idea above).  Preferably with some way to persist that table / RDD between
 Spark runs.  (How does Impala approach this problem?)

  * Has anybody tried joining against such a union of tables / RDDs on a
 very large amount of data?  When I've tried (non-spark-sql) union()ing
 Sequence Files, and then join()ing them against another RDD, Spark seems to
 try to compute the full union before doing any join() computation (and
 eventually OOMs the cluster because the union of Sequence Files is so big).
 I haven't tried something similar with Spark SQL.

  * Are there any plans related to this in the Spark roadmap?  (This
 feature would be a nice compliment to, say, persistent RDD indices for
 interactive querying).

  * Related question: are there plans to use Parquet Index Pages to make
 Spark SQL faster?  E.g. log indices over date ranges would be relevant here.

 All the best,
 -Paul




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming with Kafka

2015-01-21 Thread Dibyendu Bhattacharya
You can probably try the Low Level Consumer option with Spark 1.2

https://github.com/dibbhatt/kafka-spark-consumer

This Consumer can recover from any underlying failure of Spark Platform or
Kafka and either retry or restart the receiver. This is being working
nicely for us.

Regards,
Dibyendu


On Wed, Jan 21, 2015 at 7:46 AM, firemonk9 dhiraj.peech...@gmail.com
wrote:

 Hi,

I am having similar issues. Have you found any resolution ?

 Thank you



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Kafka-tp21222p21276.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2015-01-21 Thread Mukesh Jha
Hello Guys,

I've re partitioned my kafkaStream so that it gets evenly distributed among
the executors and the results are better.
Still from the executors page it seems that only 1 executors all 8 cores
are getting used and other executors are using just 1 core.

Is this the correct interpretation based on the below data? If so how can
we fix this?

[image: Inline image 1]

On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Thats is kind of expected due to data locality. Though you should see
 some tasks running on the executors as the data gets replicated to
 other nodes and can therefore run tasks based on locality. You have
 two solutions

 1. kafkaStream.repartition() to explicitly repartition the received
 data across the cluster.
 2. Create multiple kafka streams and union them together.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch
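
(A bare sketch of those two options with the receiver-based Kafka API; the
ZooKeeper quorum, group, topic and counts below are placeholders:)

import org.apache.spark.streaming.kafka.KafkaUtils

// Option 1: one receiver, then spread the received blocks across the cluster
val kafkaStream = KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("my-topic" -> 1))
val spread = kafkaStream.repartition(8)

// Option 2: several receivers, unioned into a single DStream
val streams = (1 to 4).map(_ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("my-topic" -> 1)))
val unioned = ssc.union(streams)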

 On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:
  Thanks Sandy, It was the issue with the no of cores.
 
  Another issue I was facing is that tasks are not getting distributed
 evenly
  among all executors and are running on the NODE_LOCAL locality level i.e.
  all the tasks are running on the same executor where my kafkareceiver(s)
 are
  running even though other executors are idle.
 
  I configured spark.locality.wait=50 instead of the default 3000 ms, which
  forced the task rebalancing among nodes, let me know if there is a better
  way to deal with this.
 
 
  On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Makes sense, I've also tries it in standalone mode where all 3 workers 
  driver were running on the same 8 core box and the results were similar.
 
  Anyways I will share the results in YARN mode with 8 core yarn
 containers.
 
  On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com
  wrote:
 
  When running in standalone mode, each executor will be able to use all
 8
  cores on the box.  When running on YARN, each executor will only have
 access
  to 2 cores.  So the comparison doesn't seem fair, no?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Nope, I am setting 5 executors with 2  cores each. Below is the
 command
  that I'm using to submit in YARN mode. This starts up 5 executor
 nodes and a
  drives as per the spark  application master UI.
 
  spark-submit --master yarn-cluster --num-executors 5 --driver-memory
  1024m --executor-memory 1024m --executor-cores 2 --class
  com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka
  spark-yarn avro 1 5000
 
  On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com
 
  wrote:
 
  *oops, I mean are you setting --executor-cores to 8
 
  On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza 
 sandy.r...@cloudera.com
  wrote:
 
  Are you setting --num-executors to 8?
 
  On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha 
 me.mukesh@gmail.com
  wrote:
 
  Sorry Sandy, The command is just for reference but I can confirm
 that
  there are 4 executors and a driver as shown in the spark UI page.
 
  Each of these machines is a 8 core box with ~15G of ram.
 
  On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza
  sandy.r...@cloudera.com wrote:
 
  Hi Mukesh,
 
  Based on your spark-submit command, it looks like you're only
  running with 2 executors on YARN.  Also, how many cores does each
 machine
  have?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha
  me.mukesh@gmail.com wrote:
 
  Hello Experts,
  I'm bench-marking Spark on YARN
  (https://spark.apache.org/docs/latest/running-on-yarn.html) vs
 a standalone
  spark cluster (
 https://spark.apache.org/docs/latest/spark-standalone.html).
  I have a standalone cluster with 3 executors, and a spark app
  running on yarn with 4 executors as shown below.
 
  The spark job running inside yarn is 10x slower than the one
  running on the standalone cluster (even though the yarn has more
 number of
  workers), also in both the case all the executors are in the
 same datacenter
  so there shouldn't be any latency. On YARN each 5sec batch is
 reading data
  from kafka and processing it in 5sec  on the standalone cluster
 each 5sec
  batch is getting processed in 0.4sec.
  Also, In YARN mode all the executors are not getting used up
 evenly
  as vm-13  vm-14 are running most of the tasks whereas in the
 standalone
  mode all the executors are running the tasks.
 
  Do I need to set up some configuration to evenly distribute the
  tasks? Also do you have any pointers on the reasons the yarn job
 is 10x
  slower than the standalone job?
  Any suggestion is greatly appreciated, Thanks in advance.
 
  YARN(5 workers + driver)
  
  Executor ID Address RDD Blocks Memory Used DU AT FT CT TT TT
 Input
  ShuffleRead ShuffleWrite Thread Dump
  1 vm-18.cloud.com:51796 0 0.0B/530.3MB 0.0 B 1 0 16 17 634 ms
 

Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2015-01-21 Thread Gerard Maas
Hi Mukesh,

How are you creating your receivers? Could you post the (relevant) code?

-kr, Gerard.

On Wed, Jan 21, 2015 at 9:42 AM, Mukesh Jha me.mukesh@gmail.com wrote:

 Hello Guys,

 I've re partitioned my kafkaStream so that it gets evenly distributed
 among the executors and the results are better.
 Still from the executors page it seems that only 1 executors all 8 cores
 are getting used and other executors are using just 1 core.

 Is this the correct interpretation based on the below data? If so how can
 we fix this?

 [image: Inline image 1]

 On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Thats is kind of expected due to data locality. Though you should see
 some tasks running on the executors as the data gets replicated to
 other nodes and can therefore run tasks based on locality. You have
 two solutions

 1. kafkaStream.repartition() to explicitly repartition the received
 data across the cluster.
 2. Create multiple kafka streams and union them together.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch

 On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:
  Thanks Sandy, It was the issue with the no of cores.
 
  Another issue I was facing is that tasks are not getting distributed
 evenly
  among all executors and are running on the NODE_LOCAL locality level
 i.e.
  all the tasks are running on the same executor where my
 kafkareceiver(s) are
  running even though other executors are idle.
 
  I configured spark.locality.wait=50 instead of the default 3000 ms,
 which
  forced the task rebalancing among nodes, let me know if there is a
 better
  way to deal with this.
 
 
  On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Makes sense, I've also tries it in standalone mode where all 3 workers
 
  driver were running on the same 8 core box and the results were
 similar.
 
  Anyways I will share the results in YARN mode with 8 core yarn
 containers.
 
  On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com
  wrote:
 
  When running in standalone mode, each executor will be able to use
 all 8
  cores on the box.  When running on YARN, each executor will only have
 access
  to 2 cores.  So the comparison doesn't seem fair, no?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com
 
  wrote:
 
  Nope, I am setting 5 executors with 2  cores each. Below is the
 command
  that I'm using to submit in YARN mode. This starts up 5 executor
 nodes and a
  drives as per the spark  application master UI.
 
  spark-submit --master yarn-cluster --num-executors 5 --driver-memory
  1024m --executor-memory 1024m --executor-cores 2 --class
  com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka
  spark-yarn avro 1 5000
 
  On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza 
 sandy.r...@cloudera.com
  wrote:
 
  *oops, I mean are you setting --executor-cores to 8
 
  On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza 
 sandy.r...@cloudera.com
  wrote:
 
  Are you setting --num-executors to 8?
 
  On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha 
 me.mukesh@gmail.com
  wrote:
 
  Sorry Sandy, The command is just for reference but I can confirm
 that
  there are 4 executors and a driver as shown in the spark UI page.
 
  Each of these machines is a 8 core box with ~15G of ram.
 
  On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza
  sandy.r...@cloudera.com wrote:
 
  Hi Mukesh,
 
  Based on your spark-submit command, it looks like you're only
  running with 2 executors on YARN.  Also, how many cores does
 each machine
  have?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha
  me.mukesh@gmail.com wrote:
 
  Hello Experts,
  I'm bench-marking Spark on YARN
  (https://spark.apache.org/docs/latest/running-on-yarn.html) vs
 a standalone
  spark cluster (
 https://spark.apache.org/docs/latest/spark-standalone.html).
  I have a standalone cluster with 3 executors, and a spark app
  running on yarn with 4 executors as shown below.
 
  The spark job running inside yarn is 10x slower than the one
  running on the standalone cluster (even though the yarn has
 more number of
  workers), also in both the case all the executors are in the
 same datacenter
  so there shouldn't be any latency. On YARN each 5sec batch is
 reading data
  from kafka and processing it in 5sec  on the standalone
 cluster each 5sec
  batch is getting processed in 0.4sec.
  Also, In YARN mode all the executors are not getting used up
 evenly
  as vm-13  vm-14 are running most of the tasks whereas in the
 standalone
  mode all the executors are running the tasks.
 
  Do I need to set up some configuration to evenly distribute the
  tasks? Also do you have any pointers on the reasons the yarn
 job is 10x
  slower than the standalone job?
  Any suggestion is greatly appreciated, Thanks in advance.
 
  YARN(5 workers + driver)
  

Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
thanks JaeBoo, in our case, the shuffle write are similar.

2015-01-21 17:01 GMT+08:00 JaeBoo Jung itsjb.j...@samsung.com:

  I was recently faced with a similar issue, but unfortunately I could
 not find out why it happened.

 Here's jira ticket https://issues.apache.org/jira/browse/SPARK-5081 of my
 previous post.

 Please check your shuffle I/O differences between the two in spark web UI
 because it can be possibly related to my case.



 Thanks

 Kevin



 --- *Original Message* ---

 *Sender* : Fengyun RAOraofeng...@gmail.com

 *Date* : 2015-01-21 17:41 (GMT+09:00)

 *Title* : Re: spark 1.2 three times slower than spark 1.1, why?



 maybe you mean different spark-submit script?

 we also use the same spark-submit script, thus the same memory, cores,
 etc configuration.
 ​

 2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com:

 I don't know of any reason to think the singleton pattern doesn't work or
 works differently. I wonder if, for example, task scheduling is different
 in 1.2 and you have more partitions across more workers and so are loading
 more copies more slowly into your singletons.
  On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote:

  the LogParser instance is not serializable, and thus cannot be a
 broadcast,

 what’s worse, it contains an LRU cache, which is essential to the
 performance, and we would like to share among all the tasks on the same
 node.

 If it is the case, what’s the recommended way to share a variable among
 all the tasks within the same executor.
 ​

 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:

 Maybe some change related to serialize the closure cause LogParser is
 not a singleton any more, then it is initialized for every task.

 Could you change it to a Broadcast?

 On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com
 wrote:
  Currently we are migrating from spark 1.1 to spark 1.2, but found the
  program 3x slower, with nothing else changed.
  note: our program in spark 1.1 has successfully processed a whole
 year data,
  quite stable.
 
  the main script is as below
 
  sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})
 
  where LogParser is a singleton which may take some time to
 initialized and
  is shared across the execuator.
 
  the flatMap stage is 3x slower.
 
  We tried to change spark.shuffle.manager back to hash, and
  spark.shuffle.blockTransferService back to nio, but didn’t help.
 
  May somebody explain possible causes, or what should we test or
 change to
  find it out






Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
btw: Shuffle Write (11 GB) means 11 GB per executor; for each task, it's ~40 MB

2015-01-21 17:53 GMT+08:00 Fengyun RAO raofeng...@gmail.com:

 I don't know how to debug distributed application, any tools or suggestion?

 but from spark web UI,

 the GC time (~0.1 s), Shuffle Write(11 GB) are similar for spark 1.1 and
 1.2.
 there are no Shuffle Read and Spill.
 The only difference is Duration:

   Duration    Min   25th percentile   Median   75th percentile   Max
   spark 1.2   4 s   37 s              45 s     53 s              1.9 min
   spark 1.1   2 s   17 s              18 s     18 s              34 s

 2015-01-21 16:56 GMT+08:00 Sean Owen so...@cloudera.com:

 I mean that if you had tasks running on 10 machines now instead of 3 for
 some reason you would have more than 3 times the read load on your source
 of data all at once. Same if you made more executors per machine. But from
 your additional info it does not sound like this is the case. I think you
 need more debugging to pinpoint what is slower.
 On Jan 21, 2015 9:30 AM, Fengyun RAO raofeng...@gmail.com wrote:

 thanks, Sean.

 I don't quite understand you have *more *partitions across *more *
 workers.

 It's within the same cluster, and the same data, thus I think the same
 partition, the same workers.

 we switched from spark 1.1 to 1.2, then it's 3x slower.

 (We upgrade from CDH 5.2.1 to CDH 5.3, hence spark 1.1 to 1.2, and found
 the problem.
 then we installed a standalone spark 1.1, stop the 1.2, run the same
 script, it's 3x faster.
 stop 1.1, start 1.2, 3x slower again)


 2015-01-21 15:45 GMT+08:00 Sean Owen so...@cloudera.com:

 I don't know of any reason to think the singleton pattern doesn't work
 or works differently. I wonder if, for example, task scheduling is
 different in 1.2 and you have more partitions across more workers and so
 are loading more copies more slowly into your singletons.
 On Jan 21, 2015 7:13 AM, Fengyun RAO raofeng...@gmail.com wrote:

 the LogParser instance is not serializable, and thus cannot be a
 broadcast,

 what’s worse, it contains an LRU cache, which is essential to the
 performance, and we would like to share among all the tasks on the same
 node.

 If it is the case, what’s the recommended way to share a variable
 among all the tasks within the same executor.
 ​

 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:

 Maybe some change related to serialize the closure cause LogParser is
 not a singleton any more, then it is initialized for every task.

 Could you change it to a Broadcast?

 On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com
 wrote:
  Currently we are migrating from spark 1.1 to spark 1.2, but found
 the
  program 3x slower, with nothing else changed.
  note: our program in spark 1.1 has successfully processed a whole
 year data,
  quite stable.
 
  the main script is as below
 
  sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})
 
  where LogParser is a singleton which may take some time to
 initialized and
  is shared across the execuator.
 
  the flatMap stage is 3x slower.
 
  We tried to change spark.shuffle.manager back to hash, and
  spark.shuffle.blockTransferService back to nio, but didn’t help.
 
  May somebody explain possible causes, or what should we test or
 change to
  find it out







Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Paul Wais
To force one instance per executor, you could explicitly subclass
FlatMapFunction and have it lazy-create your parser in the subclass
constructor.  You might also want to try RDD#mapPartitions() (instead of
RDD#flatMap() if you want one instance per partition.  This approach worked
well for me when I had a flat map function that used non-serializable
native code / objects.

FWIW RDD#flatMap() does not appear to have changed 1.1 - 1.2 (tho master
has a slight refactor).  Agree it's worth checking the number of partitions
in your 1.1 vs 1.2 test.
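
A rough sketch of that idea (ParserHolder, MyLogParser and parseLine are
stand-ins for the non-serializable parser being discussed); since a Scala
object is a JVM-level singleton, the lazy val is created at most once per
executor, not once per partition:

object ParserHolder {
  // created lazily, the first time a task on this executor touches it
  lazy val parser = new MyLogParser()   // stand-in for the non-serializable parser class
}

val parsed = sc.textFile(inputPath).mapPartitions { lines =>
  lines.flatMap(line => ParserHolder.parser.parseLine(line))
}

Only the parsed records get shuffled; the parser (and its LRU cache) stays on
the executor.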



On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com wrote:

 the LogParser instance is not serializable, and thus cannot be a
 broadcast,

 what’s worse, it contains an LRU cache, which is essential to the
 performance, and we would like to share among all the tasks on the same
 node.

 If it is the case, what’s the recommended way to share a variable among
 all the tasks within the same executor.
 ​

 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:

 Maybe some change related to serialize the closure cause LogParser is
 not a singleton any more, then it is initialized for every task.

 Could you change it to a Broadcast?

 On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com
 wrote:
  Currently we are migrating from spark 1.1 to spark 1.2, but found the
  program 3x slower, with nothing else changed.
  note: our program in spark 1.1 has successfully processed a whole year
 data,
  quite stable.
 
  the main script is as below
 
  sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})
 
  where LogParser is a singleton which may take some time to initialized
 and
  is shared across the execuator.
 
  the flatMap stage is 3x slower.
 
  We tried to change spark.shuffle.manager back to hash, and
  spark.shuffle.blockTransferService back to nio, but didn’t help.
 
  May somebody explain possible causes, or what should we test or change
 to
  find it out





Re: How to share a NonSerializable variable among tasks in the same worker node?

2015-01-21 Thread octavian.ganea
In case someone has the same problem:

The singleton hack works for me sometimes, sometimes it doesn't in spark
1.2.0, that is, sometimes I get nullpointerexception. Anyway, if you really
need to work with big indexes and you want to have the smallest amount of
communication between master and nodes, as well as if you have RAM available
just for one instance of the indexes data per machine, than I suggest you
use spark with memcached . 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tp11048p21282.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Fengyun RAO
Thanks, Paul,

I don’t understand how subclassing FlatMapFunction helps; could you show some
sample code?

We need one instance per executor, not per partition, thus mapPartitions()
doesn’t help.
​

2015-01-21 16:07 GMT+08:00 Paul Wais paulw...@gmail.com:

 To force one instance per executor, you could explicitly subclass
 FlatMapFunction and have it lazy-create your parser in the subclass
 constructor.  You might also want to try RDD#mapPartitions() (instead of
 RDD#flatMap() if you want one instance per partition.  This approach worked
 well for me when I had a flat map function that used non-serializable
 native code / objects.

 FWIW RDD#flatMap() does not appear to have changed 1.1 - 1.2 (tho master
 has a slight refactor).  Agree it's worth checking the number of partitions
 in your 1.1 vs 1.2 test.



 On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com
 wrote:

 the LogParser instance is not serializable, and thus cannot be a
 broadcast,

 what’s worse, it contains an LRU cache, which is essential to the
 performance, and we would like to share among all the tasks on the same
 node.

 If it is the case, what’s the recommended way to share a variable among
 all the tasks within the same executor.
 ​

 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:

 Maybe some change related to serialize the closure cause LogParser is
 not a singleton any more, then it is initialized for every task.

 Could you change it to a Broadcast?

 On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com
 wrote:
  Currently we are migrating from spark 1.1 to spark 1.2, but found the
  program 3x slower, with nothing else changed.
  note: our program in spark 1.1 has successfully processed a whole year
 data,
  quite stable.
 
  the main script is as below
 
  sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})
 
  where LogParser is a singleton which may take some time to initialized
 and
  is shared across the execuator.
 
  the flatMap stage is 3x slower.
 
  We tried to change spark.shuffle.manager back to hash, and
  spark.shuffle.blockTransferService back to nio, but didn’t help.
 
  May somebody explain possible causes, or what should we test or change
 to
  find it out






Re: spark-submit --py-files remote: Only local additional python files are supported

2015-01-21 Thread Vladimir Grigor
Thank you Andrew for you reply!

I am very intested in having this feature. It is possible to run PySpark on
AWS EMR in client mode(https://aws.amazon.com/articles/4926593393724923),
but that kills the whole idea of running batch jobs in EMR on PySpark.

Could you please (help to) create a task(with some details of possible
implementation) for this feature? I'd like to implement that but I'm too
new to Spark to know how to do it in a good way...

-Vladimir

On Tue, Jan 20, 2015 at 8:40 PM, Andrew Or and...@databricks.com wrote:

 Hi Vladimir,

 Yes, as the error messages suggests, PySpark currently only supports local
 files. This does not mean it only runs in local mode, however; you can
 still run PySpark on any cluster manager (though only in client mode). All
 this means is that your python files must be on your local file system.
 Until this is supported, the straightforward workaround then is to just
 copy the files to your local machine.

 -Andrew

 2015-01-20 7:38 GMT-08:00 Vladimir Grigor vladi...@kiosked.com:

 Hi all!

 I found this problem when I tried running python application on Amazon's
 EMR yarn cluster.

 It is possible to run bundled example applications on EMR but I cannot
 figure out how to run a slightly more complex Python application which
 depends on some other Python scripts. I tried adding those files with
 '--py-files'; it works fine in local mode but fails with the following
 message when run in EMR:
 Error: Only local python files are supported:
 s3://pathtomybucket/mylibrary.py.

 Simplest way to reproduce in local:
 bin/spark-submit --py-files s3://whatever.path.com/library.py main.py

 Actual commands to run it in EMR
 #launch cluster
 aws emr create-cluster --name SparkCluster --ami-version 3.3.1
 --instance-type m1.medium --instance-count 2  --ec2-attributes
 KeyName=key20141114 --log-uri s3://pathtomybucket/cluster_logs
 --enable-debugging --use-default-roles  --bootstrap-action
 Name=Spark,Path=s3://pathtomybucket/bootstrap-actions/spark/install-spark,Args=[-s,
 http://pathtomybucket/bootstrap-actions/spark
 ,-l,WARN,-v,1.2,-b,2014121700,-x]
 #{
 #   ClusterId: j-2Y58DME79MPQJ
 #}

 #run application
 aws emr add-steps --cluster-id j-2Y58DME79MPQJ --steps
 ActionOnFailure=CONTINUE,Name=SparkPy,Jar=s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--deploy-mode,cluster,--master,yarn-cluster,--py-files,s3://pathtomybucket/tasks/demo/main.py,main.py]
 #{
 #StepIds: [
 #s-2UP4PP75YX0KU
 #]
 #}
 And in stderr of that step I get Error: Only local python files are
 supported: s3://pathtomybucket/tasks/demo/main.py.

 What is the workaround or correct way to do it? Using hadoop's distcp to
 copy dependency files from s3 to nodes as another pre-step?

 Regards, Vladimir





Re: Closing over a var with changing value in Streaming application

2015-01-21 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 21, 2015 at 9:13 PM, Bob Tiernay btier...@hotmail.com wrote:

 Maybe I'm misunderstanding something here, but couldn't this be done with
 broadcast variables? I think there is the following caveat from the docs:

 In addition, the object v should not be modified after it is broadcast
 in order to ensure that all nodes get the same value of the broadcast
 variable (e.g. if the variable is shipped to a new node later)


Well, I think I need a modifiable state (modifiable = changes once per
interval) that stores the number of total items seen so far in the
lifetime of my application, and I need this number on each executor. Since
this number changes after every interval processed, I think broadcast
variables are probably not appropriate in this case.

Thanks
Tobias


Possible to restart (or stop and create) a StreamingContext

2015-01-21 Thread jamborta
hi all,

I have been experimenting with creating a sparkcontext -> streamingcontext
-> a few streams -> starting -> stopping -> creating new streams -> starting
a new (or the existing) streamingcontext with the new streams

(I need to keep the existing sparkcontext alive as it would run other spark
jobs)

I ran into a few problems:

- I cannot seem to create a new streaming context after another one was shut
down. I get this error:
15/01/21 12:43:16 INFO MetricsSystem: Metrics already registered
java.lang.IllegalArgumentException: A metric named
app-20150121123832-0008.driver.Spark
shell.StreamingMetrics.streaming.lastCompletedBatch_processStartTime already
exists

- Or if I try to start the one that was stopped I get this:
org.apache.spark.SparkException: StreamingContext has already been started

- It seems even after the streaming context is stopped, it still shows up in
the job info (spark web UI).

is there a better way to do this?

thanks,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Possible-to-restart-or-stop-and-create-a-StreamingContext-tp21291.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Discourse: A proposed alternative to the Spark User list

2015-01-21 Thread btiernay
I think this is a really great idea for really opening up the discussions
that happen here. Also, it would be nice to know why there doesn't seem to
be much interest. Maybe I'm misunderstanding some nuance of Apache projects.

Cheers



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Discourse-A-proposed-alternative-to-the-Spark-User-list-tp20851p21288.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NullPointer when access rdd.sparkContext (Spark 1.1.1)

2015-01-21 Thread Luis Ángel Vicente Sánchez
The SparkContext is lost when I call the persist function from the sink
function; just before the function call everything works as intended, so
I guess it's the FunctionN class serialisation that is causing the problem.
I will try to embed the functionality in the sink method to verify that.

2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez 
langel.gro...@gmail.com:

 The following functions,

 def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
   data.foreachRDD { rdd =>
     rdd.cache()
     val (minTime, maxTime): (Long, Long) =
       rdd.map {
         case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
       }.fold((Long.MaxValue, Long.MinValue)) {
         case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
       }
     if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
       rdd.map(_._1).distinct().foreach {
         case (game, category) => persist(game, category, minTime, maxTime, rdd)
       }
     }
     rdd.unpersist(blocking = false)
   }
 }

 def persist(game: GameID, category: Category, min: Long, max: Long,
             data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
   val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
   val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
     data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family)
   val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
     cas
       .where("time >= ?", new Date(min))
       .where("time <= ?", new Date(max))
       .map {
         case (date, time, platform, array) =>
           ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
       }
   data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
     case ((key, platform), (value, maybe)) =>
       (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
   }.saveToCassandra(parameters.table.keyspace, family)
 }

 are causing this exception at runtime:

 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID
 126)
 java.lang.NullPointerException
 at com.datastax.spark.connector.SparkContextFunctions.
 cassandraTable$default$3(SparkContextFunctions.scala:47)
 at com.mindcandy.services.mako.concurrentusers.
 ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
 at com.mindcandy.services.mako.concurrentusers.
 ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
 ActiveUsersJobImpl.scala:41)
 at com.mindcandy.services.mako.concurrentusers.
 ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
 ActiveUsersJobImpl.scala:40)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.
 scala:759)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.
 scala:759)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
 SparkContext.scala:1143)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
 SparkContext.scala:1143)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
 scala:62)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at org.apache.spark.executor.Executor$TaskRunner.run(
 Executor.scala:178)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 I'm using spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
 SparkContextFunctions.scala is the implicit CassandraConnector that uses
 the underlying spark context to retrieve the SparkConf.

 After a few hours debugging the code, the source of the problem is that,

 data.sparkContext

 is returning null. It seems that the RDD is serialised and the
 SparkContext is lost. Is this the expected behaviour? Is a known bug?

 I have run out of ideas on how to make this work, so I'm open to
 suggestions.

 Kind regards,

 Luis



Re: ClosureCleaner should use ClassLoader created by SparkContext

2015-01-21 Thread Aniket Bhatnagar
Here is the stack trace for reference. Notice that this happens when the
job spawns a new thread.

java.lang.ClassNotFoundException: com.myclass$$anonfun$8$$anonfun$9
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
~[na:1.7.0_71]
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
~[na:1.7.0_71]
at java.security.AccessController.doPrivileged(Native Method)
~[na:1.7.0_71]
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
~[na:1.7.0_71]
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
~[na:1.7.0_71]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
~[na:1.7.0_71]
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
~[na:1.7.0_71]
at java.lang.Class.forName0(Native Method) ~[na:1.7.0_71]
at java.lang.Class.forName(Class.java:274) ~[na:1.7.0_71]
at
org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:260)
~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
Source) ~[com.esotericsoftware.reflectasm.reflectasm-1.07-shaded.jar:na]
at
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
Source) ~[com.esotericsoftware.reflectasm.reflectasm-1.07-shaded.jar:na]
at
org.apache.spark.util.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:87)
~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:107)
~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at org.apache.spark.rdd.RDD.map(RDD.scala:271)
~[org.apache.spark.spark-core_2.11-1.2.0.jar:1.2.0]
at com.myclass.com$myclass$$load(myclass.scala:375) ~[na:na]
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
~[org.scala-lang.scala-library-2.11.5.jar:na]
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
~[org.scala-lang.scala-library-2.11.5.jar:na]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[na:1.7.0_71]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[na:1.7.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]


On Wed Jan 21 2015 at 17:34:34 Aniket Bhatnagar aniket.bhatna...@gmail.com
wrote:

 While implementing a spark server, I realized that Thread's context loader
 must be set to any dynamically loaded classloader so that ClosureCleaner
 can do its thing. Should the ClosureCleaner not use the classloader created by
 SparkContext (that has all dynamically added jars via SparkContext.addJar)
 instead of using Thread.currentThread.getContextClassLoader while looking
 up class in InnerClosureFinder?

 Thanks,
 Aniket



NullPointer when access rdd.sparkContext (Spark 1.1.1)

2015-01-21 Thread Luis Ángel Vicente Sánchez
The following functions,

def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
  data.foreachRDD { rdd =>
    rdd.cache()
    val (minTime, maxTime): (Long, Long) =
      rdd.map {
        case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
      }.fold((Long.MaxValue, Long.MinValue)) {
        case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
      }
    if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
      rdd.map(_._1).distinct().foreach {
        case (game, category) => persist(game, category, minTime, maxTime, rdd)
      }
    }
    rdd.unpersist(blocking = false)
  }
}

def persist(game: GameID, category: Category, min: Long, max: Long,
            data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
  val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
  val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
    data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family)
  val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
    cas
      .where("time >= ?", new Date(min))
      .where("time <= ?", new Date(max))
      .map {
        case (date, time, platform, array) =>
          ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
      }
  data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
    case ((key, platform), (value, maybe)) =>
      (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
  }.saveToCassandra(parameters.table.keyspace, family)
}

are causing this exception at runtime:

15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0 (TID
126)
java.lang.NullPointerException
at com.datastax.spark.connector.SparkContextFunctions.
cassandraTable$default$3(SparkContextFunctions.scala:47)
at com.mindcandy.services.mako.concurrentusers.
ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
at com.mindcandy.services.mako.concurrentusers.
ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
ActiveUsersJobImpl.scala:41)
at com.mindcandy.services.mako.concurrentusers.
ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
ActiveUsersJobImpl.scala:40)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:759)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
SparkContext.scala:1143)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
SparkContext.scala:1143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(
Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I'm using spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
SparkContextFunctions.scala is the implicit CassandraConnector that uses
the underlying spark context to retrieve the SparkConf.

After a few hours debugging the code, the source of the problem is that,

data.sparkContext

is returning null. It seems that the RDD is serialised and the SparkContext
is lost. Is this the expected behaviour? Is a known bug?

I have run out of ideas on how to make this work, so I'm open to
suggestions.

Kind regards,

Luis


Re: Error for first run from iPython Notebook

2015-01-21 Thread Dave
Is this the wrong list to be asking this question? I'm not even sure where
to start troubleshooting.



On Tue, Jan 20, 2015 at 9:48 AM, Dave dla...@gmail.com wrote:

 Not sure if anyone who can help has seen this. Any suggestions would be
 appreciated, thanks!


 On Mon Jan 19 2015 at 1:50:43 PM Dave dla...@gmail.com wrote:

 Hi,

 I've setup my first spark cluster (1 master, 2 workers) and an iPython
 notebook server that I'm trying to setup to access the cluster. I'm running
 the workers from Anaconda to make sure the python setup is correct on each
 box. The iPy notebook server appears to have everything setup correctly,
 and I'm able to initialize Spark and push a job out. However, the job is
 failing, and I'm not sure how to troubleshoot. Here's the code:

 from pyspark import SparkContext
 CLUSTER_URL = 'spark://192.168.1.20:7077'
 sc = SparkContext( CLUSTER_URL, 'pyspark')
 def sample(p):
     x, y = random(), random()
     return 1 if x*x + y*y < 1 else 0

 count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a + b)
 print "Pi is roughly %f" % (4.0 * count / 20)


 And here's the error:

 Py4JJavaError                             Traceback (most recent call last)
 <ipython-input-4-e8dce94b43bb> in <module>()
       3     return 1 if x*x + y*y < 1 else 0
       4
 ----> 5 count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a + b)
       6 print "Pi is roughly %f" % (4.0 * count / 20)

 /opt/spark-1.2.0/python/pyspark/rdd.pyc in reduce(self, f)
     713             yield reduce(f, iterator, initial)
     714
 --> 715         vals = self.mapPartitions(func).collect()
     716         if vals:
     717             return reduce(f, vals)

 /opt/spark-1.2.0/python/pyspark/rdd.pyc in collect(self)
     674
     675         with SCCallSiteSync(self.context) as css:
 --> 676             bytesInJava = self._jrdd.collect().iterator()
     677         return list(self._collect_iterator_through_file(bytesInJava))
     678

 /opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
     536         answer = self.gateway_client.send_command(command)
     537         return_value = get_return_value(answer, self.gateway_client,
 --> 538             self.target_id, self.name)
     539
     540         for temp_arg in temp_args:

 /opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
     298                 raise Py4JJavaError(
     299                     'An error occurred while calling {0}{1}{2}.\n'.
 --> 300                     format(target_id, '.', name), value)
     301             else:
     302                 raise Py4JError(

 Py4JJavaError: An error occurred while calling o28.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 
 in stage 0.0 failed 4 times, most recent failure: Lost task 31.3 in stage 
 0.0 (TID 72, 192.168.1.21): org.apache.spark.api.python.PythonException: 
 Traceback (most recent call last):
   File /opt/spark-1.2.0/python/pyspark/worker.py, line 107, in main
 process()
   File /opt/spark-1.2.0/python/pyspark/worker.py, line 98, in process
 serializer.dump_stream(func(split_index, iterator), outfile)
   File /opt/spark-1.2.0/python/pyspark/serializers.py, line 227, in 
 dump_stream
 vs = list(itertools.islice(iterator, batch))
   File /opt/spark-1.2.0/python/pyspark/rdd.py, line 710, in func
 initial = next(iterator)
   File ipython-input-4-e8dce94b43bb, line 2, in sample
 TypeError: 'module' object is not callable

  at 
 org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
  at 
 org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
  at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
  at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)

 Driver stacktrace:
  at 
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
  at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
  at 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
  at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at 
 

Re: Possible to restart (or stop and create) a StreamingContext

2015-01-21 Thread jamborta
Just found this in the documentation:

A SparkContext can be re-used to create multiple StreamingContexts, as long
as the previous StreamingContext is stopped (without stopping the
SparkContext) before the next StreamingContext is created.

in this case, I assume the error I reported above is a bug.
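
For reference, a minimal sketch of that documented pattern (the batch interval
and stream setup are illustrative):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // stop only the streaming side, keep the shared SparkContext alive
    ssc.stop(stopSparkContext = false)

    // build a fresh StreamingContext on the same SparkContext
    val ssc2 = new StreamingContext(sc, Seconds(10))
    // ... register the new streams on ssc2 ...
    ssc2.start()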



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Possible-to-restart-or-stop-and-create-a-StreamingContext-tp21291p21294.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Finding most occurrences in a JSON Nested Array

2015-01-21 Thread Pankaj Narang
send me the current code here. I will fix and send back to you



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Finding-most-occurrences-in-a-JSON-Nested-Array-tp20971p21295.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [SparkSQL] Try2: Parquet predicate pushdown troubles

2015-01-21 Thread Yana Kadiyska
Thanks for looking Cheng. Just to clarify in case other people need this
sooner, setting sc.hadoopConfiguration.set("parquet.task.side.metadata",
"false") did work well in terms of dropping rowgroups/showing small input
size. What was odd about that is that the overall time wasn't much
better...but maybe that was overhead from sending the metadata clientside.

Thanks again and looking forward to your fix

On Tue, Jan 20, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Yana,

 Sorry for the late reply, missed this important thread somehow. And many
 thanks for reporting this. It turned out to be a bug — filter pushdown is
 only enabled when using client side metadata, which is not expected,
 because task side metadata code path is more performant. And I guess that
 the reason why setting parquet.task.side.metadata to false didn’t reduce
 input size for you is because you set the configuration with Spark API, or
 put it into spark-defaults.conf. This configuration goes to Hadoop
 Configuration, and Spark only merges properties whose names start with
 spark.hadoop into Hadoop Configuration instances. You may try to put
 parquet.task.side.metadata config into Hadoop core-site.xml, and then
 re-run the query. I can see significant differences by doing so.

 I’ll open a JIRA and deliver a fix for this ASAP. Thanks again for
 reporting all the details!

 Cheng
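
A small sketch of the spark.hadoop.* prefix mentioned above, as an alternative to
editing core-site.xml (whether it changes the row-group dropping behaviour is
exactly what this thread is still investigating):

    import org.apache.spark.SparkConf

    // properties prefixed with "spark.hadoop." are merged into the Hadoop
    // Configuration that the Parquet input format sees
    val conf = new SparkConf()
      .set("spark.hadoop.parquet.task.side.metadata", "false")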

 On 1/13/15 12:56 PM, Yana Kadiyska wrote:

   Attempting to bump this up in case someone can help out after all. I
 spent a few good hours stepping through the code today, so I'll summarize
 my observations both in hope I get some help and to help others that might
 be looking into this:

 1. I am setting spark.sql.parquet.filterPushdown=true
 2. I can see by stepping through the driver debugger that
 ParquetTableOperations.execute sets the filters via
 ParquetInputFormat.setFilterPredicate (I checked the conf object, things
 appear OK there)
 3. In FilteringParquetRowInputFormat, I get through the codepath for
 getTaskSideSplits. It seems that the codepath for getClientSideSplits would
 try to drop rowGroups but I don't see similar in getTaskSideSplit.

  Does anyone have pointers on where to look after this? Where is rowgroup
 filtering happening in the case of getTaskSideSplits? I can attach to the
 executor but am not quite sure what code related to Parquet gets called
 executor side...also don't see any messages in the executor logs related to
 Filtering predicates.

 For comparison, I went through the getClientSideSplits and can see that
 predicate pushdown works OK:


 sc.hadoopConfiguration.set("parquet.task.side.metadata", "false")

 15/01/13 20:04:49 INFO FilteringParquetRowInputFormat: Using Client Side 
 Metadata Split Strategy
 15/01/13 20:05:13 INFO FilterCompat: Filtering using predicate: eq(epoch, 
 1417384800)
 15/01/13 20:06:45 INFO FilteringParquetRowInputFormat: Dropping 572 row 
 groups that do not pass filter predicate (28 %) !


  Is it possible that this is just a UI bug? I can see Input=4G when using
 (parquet.task.side.metadata,false) and Input=140G when using
 (parquet.task.side.metadata,true) but the runtimes are very comparable?

  [image: Inline image 1]


  JobId 4 is the ClientSide split, JobId 5 is the TaskSide split.



  On Fri, Jan 9, 2015 at 2:56 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 I am running the following (connecting to an external Hive Metastore)

   /a/shark/spark/bin/spark-shell --master spark://ip:7077  --conf
 spark.sql.parquet.filterPushdown=true

  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

  and then ran two queries:

 sqlContext.sql("select count(*) from table where partition='blah'")
 and
 sqlContext.sql("select count(*) from table where partition='blah' and epoch=1415561604")


  According to the Input tab in the UI both scan about 140G of data which
 is the size of my whole partition. So I have two questions --

  1. is there a way to tell from the plan if a predicate pushdown is
 supposed to happen?
 I see this for the second query

 res0: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[0] at RDD at SchemaRDD.scala:108
 == Query Plan ==
 == Physical Plan ==
 Aggregate false, [], [Coalesce(SUM(PartialCount#49L),0) AS _c0#0L]
  Exchange SinglePartition
   Aggregate true, [], [COUNT(1) AS PartialCount#49L]
OutputFaker []
 Project []
  ParquetTableScan [epoch#139L], (ParquetRelation list of hdfs files

  2. Am I doing something obviously wrong that this is not working? (I'm
 guessing it's not working because the input size for the second query shows
 unchanged and the execution time is almost 2x as long.)

  thanks in advance for any insights





ClosureCleaner should use ClassLoader created by SparkContext

2015-01-21 Thread Aniket Bhatnagar
While implementing a spark server, I realized that Thread's context loader
must be set to any dynamically loaded classloader so that ClosureCleaner
can do its thing. Should the ClosureCleaner not use the classloader created by
SparkContext (that has all dynamically added jars via SparkContext.addJar)
instead of using Thread.currentThread.getContextClassLoader while looking
up class in InnerClosureFinder?

Thanks,
Aniket


RE: Closing over a var with changing value in Streaming application

2015-01-21 Thread Bob Tiernay
Maybe I'm misunderstanding something here, but couldn't this be done with 
broadcast variables? I think there is the following caveat from the docs:
In addition, the object v should not be modified after it is broadcast in 
order to ensure that all nodes get the same value of the broadcast variable 
(e.g. if the variable is shipped to a new node later)
But isn't this exactly the semantics you want (i.e. not the same value)?


Date: Wed, 21 Jan 2015 21:02:31 +0900
Subject: Re: Closing over a var with changing value in Streaming application
From: t...@preferred.jp
To: ak...@sigmoidanalytics.com
CC: user@spark.apache.org

Hi again,

On Wed, Jan 21, 2015 at 4:53 PM, Tobias Pfeiffer t...@preferred.jp wrote:
 On Wed, Jan 21, 2015 at 4:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
  How about using accumulators?
 As far as I understand, they solve the part of the problem that I am not
 worried about, namely increasing the counter. I was more worried about
 getting that counter/accumulator value back to the executors.

Uh, I may have been a bit quick here...

So I had this one working:

  var totalNumberOfItems = 0L
  // update the keys of the stream data
  val globallyIndexedItems = inputStream.map(keyVal =>
    (keyVal._1 + totalNumberOfItems, keyVal._2))
  // increase the number of total seen items
  inputStream.foreachRDD(rdd => {
    totalNumberOfItems += rdd.count
  })

and used the dstream.foreachRDD(rdd => someVar += rdd.count) pattern at a
number of places.

Then, however, I added a dstream.transformWith(otherDStream, func) call, which
somehow changed the order in which the DStreams are computed. In particular,
suddenly some of my DStream values were computed before the foreachRDD calls
that set the proper variables were executed, which led to completely
unpredictable behavior. So especially when looking at the existence of
spark.streaming.concurrentJobs, I suddenly feel like none of the DStream
computations done on executors should depend on the ordering of output
operations done on the driver. (And I am afraid this includes accumulator
updates.)

Thinking about this, I feel I don't even know how I can realize a globally
(over the lifetime of my stream) increasing ID in my DStream. Do I need
something like

  val counts: DStream[(Int, Long)] = stream.count().map((1, _)).updateStateByKey(...)

with a pseudo-key just to keep a tiny bit of state from one interval to the
next?

Really thankful for any insights,
Tobias
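
A minimal sketch of the pseudo-key / updateStateByKey idea from the quoted
message; checkpointing must be enabled on the StreamingContext, and the names
below are illustrative:

    // a single pseudo-key (1) so that one running total is carried across intervals
    val runningTotal: DStream[(Int, Long)] =
      inputStream.count().map(c => (1, c)).updateStateByKey[Long] {
        (newCounts: Seq[Long], state: Option[Long]) =>
          Some(state.getOrElse(0L) + newCounts.sum)
      }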

Re: NullPointer when access rdd.sparkContext (Spark 1.1.1)

2015-01-21 Thread Luis Ángel Vicente Sánchez
Yes, I have just found that. By replacing,

rdd.map(_._1).distinct().foreach {
  case (game, category) => persist(game, category, minTime, maxTime, rdd)
}

with,

rdd.map(_._1).distinct().collect().foreach {
  case (game, category) => persist(game, category, minTime, maxTime, rdd)
}

everything works as expected.

2015-01-21 14:18 GMT+00:00 Sean Owen so...@cloudera.com:

 It looks like you are trying to use the RDD in a distributed operation,
 which won't work. The context will be null.
 On Jan 21, 2015 1:50 PM, Luis Ángel Vicente Sánchez 
 langel.gro...@gmail.com wrote:

 The SparkContext is lost when I call the persist function from the sink
 function; just before the function call everything works as intended, so
 I guess it's the FunctionN class serialisation that is causing the problem.
 I will try to embed the functionality in the sink method to verify that.

 2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez 
 langel.gro...@gmail.com:

 The following functions,

 def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
   data.foreachRDD { rdd =>
     rdd.cache()
     val (minTime, maxTime): (Long, Long) =
       rdd.map {
         case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
       }.fold((Long.MaxValue, Long.MinValue)) {
         case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
       }
     if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
       rdd.map(_._1).distinct().foreach {
         case (game, category) => persist(game, category, minTime, maxTime, rdd)
       }
     }
     rdd.unpersist(blocking = false)
   }
 }

 def persist(game: GameID, category: Category, min: Long, max: Long,
             data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
   val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
   val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
     data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family)
   val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
     cas
       .where("time >= ?", new Date(min))
       .where("time <= ?", new Date(max))
       .map {
         case (date, time, platform, array) =>
           ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
       }
   data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
     case ((key, platform), (value, maybe)) =>
       (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
   }.saveToCassandra(parameters.table.keyspace, family)
 }

 are causing this exception at runtime:

 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0
 (TID 126)
 java.lang.NullPointerException
 at com.datastax.spark.connector.SparkContextFunctions.
 cassandraTable$default$3(SparkContextFunctions.scala:47)
 at com.mindcandy.services.mako.concurrentusers.
 ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
 at com.mindcandy.services.mako.concurrentusers.
 ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
 ActiveUsersJobImpl.scala:41)
 at com.mindcandy.services.mako.concurrentusers.
 ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
 ActiveUsersJobImpl.scala:40)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(
 Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.
 scala:759)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.
 scala:759)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
 SparkContext.scala:1143)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
 SparkContext.scala:1143)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
 scala:62)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at org.apache.spark.executor.Executor$TaskRunner.run(
 Executor.scala:178)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 I'm using spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
 SparkContextFunctions.scala is the implicit CassandraConnector that uses
 the underlying spark context to retrieve the SparkConf.

 After a few hours debugging the code, the source of the problem is that,

 data.sparkContext

 is returning null. It seems that the RDD is serialised and the
 SparkContext is lost. Is this the expected behaviour? Is a known bug?

 I have run out of ideas on how to make this work, so I'm open to
 suggestions.

 Kind regards,

 Luis





Re: Discourse: A proposed alternative to the Spark User list

2015-01-21 Thread Jay Vyas
It's a very valid idea indeed, but... it's a tricky subject since the entire
ASF is run on mailing lists , hence there are so many different but equally 
sound ways of looking at this idea, which conflict with one another.

 On Jan 21, 2015, at 7:03 AM, btiernay btier...@hotmail.com wrote:
 
 I think this is a really great idea for really opening up the discussions
 that happen here. Also, it would be nice to know why there doesn't seem to
 be much interest. Maybe I'm misunderstanding some nuance of Apache projects.
 
 Cheers
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Discourse-A-proposed-alternative-to-the-Spark-User-list-tp20851p21288.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Broadcast variable questions

2015-01-21 Thread Hao Ren
Hi,

Spark 1.2.0, standalone, local mode(for test)

Here are several questions on broadcast variable: 

1) Where is the broadcast variable cached on executors? In memory or on disk?

I read somewhere that these variables are stored in spark.local.dir, but I
cannot find any info in the Spark 1.2 documentation. I encountered a problem
with broadcast variables: I have a loop in which a broadcast variable is
created, and after 3 iterations the used memory grows quickly until it is full,
and Spark is blocked: no error message, no exception, just blocked. I would
like to know whether this is caused by too many broadcast variables, because
I did not call unpersist() on each broadcast variable.

2) I find that a broadcast variable has destroy() and unpersist() methods;
what's the difference between them? If a broadcast variable is destroyed, is
it removed from where it is stored?

Hao
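
A rough sketch of releasing broadcasts inside such a loop, in case it helps frame
the question (buildLookupTable, numIterations and the loop body are illustrative,
not a confirmed fix):

    for (i <- 0 until numIterations) {
      val bc = sc.broadcast(buildLookupTable(i))
      val result = rdd.map(x => bc.value.getOrElse(x, 0L)).reduce(_ + _)
      // drop the cached copies on the executors; the variable can still be
      // re-broadcast if bc.value is used again later
      bc.unpersist(blocking = false)
      // bc.destroy() would remove all copies, including the driver's, and the
      // broadcast could not be used again afterwards
    }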







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variable-questions-tp21292.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Confused about shuffle read and shuffle write

2015-01-21 Thread Darin McBeath
 I have the following code in a Spark Job.

// Get the baseline input file(s)
JavaPairRDD<Text, Text> hsfBaselinePairRDDReadable =
    sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class, Text.class, Text.class);
JavaPairRDD<String, String> hsfBaselinePairRDD =
    hsfBaselinePairRDDReadable.mapToPair(new ConvertFromWritableTypes())
        .partitionBy(new HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS))
        .persist(StorageLevel.MEMORY_ONLY_SER());

// Use 'substring' to extract epoch values.
JavaPairRDD<String, Long> baselinePairRDD =
    hsfBaselinePairRDD.mapValues(new ExtractEpoch(accumBadBaselineRecords))
        .persist(StorageLevel.MEMORY_ONLY_SER());

When looking at the STAGE information for my job, I notice the following:

To construct hsfBaselinePairRDD, it takes about 7.5 minutes, with 931GB of 
input (from S3) and 377GB of shuffle write (presumably because of the hash 
partitioning).  This all makes sense.

To construct the baselinePairRDD, it also takes about 7.5 minutes.  I thought 
that was a bit odd.  But what I thought was really odd is why there was also 
330GB of shuffle read in this stage.  I would have thought there should be 0 
shuffle read in this stage.  

What I'm confused about is why there is even any 'shuffle read' when 
constructing the baselinePairRDD.  If anyone could shed some light on this it 
would be appreciated.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NullPointer when access rdd.sparkContext (Spark 1.1.1)

2015-01-21 Thread Sean Owen
It looks like you are trying to use the RDD in a distributed operation,
which won't work. The context will be null.
On Jan 21, 2015 1:50 PM, Luis Ángel Vicente Sánchez 
langel.gro...@gmail.com wrote:

 The SparkContext is lost when I call the persist function from the sink
 function; just before the function call everything works as intended, so
 I guess it's the FunctionN class serialisation that is causing the problem.
 I will try to embed the functionality in the sink method to verify that.

 2015-01-21 12:35 GMT+00:00 Luis Ángel Vicente Sánchez 
 langel.gro...@gmail.com:

 The following functions,

 def sink(data: DStream[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
   data.foreachRDD { rdd =>
     rdd.cache()
     val (minTime, maxTime): (Long, Long) =
       rdd.map {
         case (_, ((TimeSeriesKey(_, time), _), _)) => (time, time)
       }.fold((Long.MaxValue, Long.MinValue)) {
         case ((min, max), (num, _)) => (math.min(min, num), math.max(max, num))
       }
     if (minTime != Long.MaxValue && maxTime != Long.MinValue) {
       rdd.map(_._1).distinct().foreach {
         case (game, category) => persist(game, category, minTime, maxTime, rdd)
       }
     }
     rdd.unpersist(blocking = false)
   }
 }

 def persist(game: GameID, category: Category, min: Long, max: Long,
             data: RDD[((GameID, Category), ((TimeSeriesKey, Platform), HLL))]): Unit = {
   val family: String = s"${parameters.table.family}_${game.repr}_${category.repr}"
   val cas: CassandraRDD[(Long, Long, String, Array[Byte])] =
     data.sparkContext.cassandraTable[(Long, Long, String, Array[Byte])](parameters.table.keyspace, family)
   val fil: RDD[((TimeSeriesKey, Platform), HLL)] =
     cas
       .where("time >= ?", new Date(min))
       .where("time <= ?", new Date(max))
       .map {
         case (date, time, platform, array) =>
           ((TimeSeriesKey(date, time), Platform(platform)), HyperLogLog.fromBytes(array))
       }
   data.filter(_._1 == ((game, category))).map(_._2).leftOuterJoin(fil).map {
     case ((key, platform), (value, maybe)) =>
       (key.date, key.time, platform.repr, HyperLogLog.toBytes(maybe.fold(value)(array => value + array)))
   }.saveToCassandra(parameters.table.keyspace, family)
 }

 are causing this exception at runtime:

 15/01/20 18:54:52 ERROR Executor: Exception in task 1.3 in stage 23.0
 (TID 126)
 java.lang.NullPointerException
 at com.datastax.spark.connector.SparkContextFunctions.
 cassandraTable$default$3(SparkContextFunctions.scala:47)
 at com.mindcandy.services.mako.concurrentusers.
 ActiveUsersJobImpl.persist(ActiveUsersJobImpl.scala:51)
 at com.mindcandy.services.mako.concurrentusers.
 ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
 ActiveUsersJobImpl.scala:41)
 at com.mindcandy.services.mako.concurrentusers.
 ActiveUsersJobImpl$$anonfun$sink$1$$anonfun$apply$2.apply(
 ActiveUsersJobImpl.scala:40)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(
 Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.
 scala:759)
 at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.
 scala:759)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
 SparkContext.scala:1143)
 at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(
 SparkContext.scala:1143)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
 scala:62)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)
 at org.apache.spark.executor.Executor$TaskRunner.run(
 Executor.scala:178)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(
 ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(
 ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 I'm using spark 1.1.1 and spark-cassandra-connector 1.1.1; line 47 of
 SparkContextFunctions.scala is the implicit CassandraConnector that uses
 the underlying spark context to retrieve the SparkConf.

 After a few hours debugging the code, the source of the problem is that,

 data.sparkContext

 is returning null. It seems that the RDD is serialised and the
 SparkContext is lost. Is this the expected behaviour? Is a known bug?

 I have run out of ideas on how to make this work, so I'm open to
 suggestions.

 Kind regards,

 Luis





sparkcontext.objectFile return thousands of partitions

2015-01-21 Thread Wang, Ningjun (LNG-NPV)
Why does sc.objectFile(...) return an RDD with thousands of partitions?

I save a rdd to file system using

rdd.saveAsObjectFile("file:///tmp/mydir")

Note that the rdd contains 7 million objects. I checked the directory
/tmp/mydir/; it contains 8 partitions:

part-0  part-2  part-4  part-6  _SUCCESS
part-1  part-3  part-5  part-7

I then load the rdd back using

val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)

I expect rdd2 to have 8 partitions. But from the master UI, I see that rdd2 has 
over 1000 partitions. This is very inefficient. How can I limit it to 8 
partitions just like what is stored on the file system?

Regards,

Ningjun Wang
Consulting Software Engineer
LexisNexis
121 Chanlon Road
New Providence, NJ 07974-1541