Problem reading Parquet from 1.2 to 1.3

2015-06-03 Thread Don Drake
As part of upgrading a cluster from CDH 5.3.x to CDH 5.4.x I noticed that
Spark is behaving differently when reading Parquet directories that contain
a .metadata directory.

It seems that in Spark 1.2.x, it would just ignore the .metadata directory,
but now that I'm using Spark 1.3, reading these files causes the following
exceptions:

scala> val d = sqlContext.parquetFile("/user/ddrak/parq_dir")

SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder.

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.

scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown
during a parallel computation: java.lang.RuntimeException:
hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schema.avsc is not a
Parquet file. expected magic number at tail [80, 65, 82, 49] but found
[116, 34, 10, 125]

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

.

.

.



java.lang.RuntimeException:
hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schemas/1.avsc is not a
Parquet file. expected magic number at tail [80, 65, 82, 49] but found
[116, 34, 10, 125]

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

.

.

.



java.lang.RuntimeException:
hdfs://nameservice1/user/ddrak/parq_dir/.metadata/descriptor.properties is
not a Parquet file. expected magic number at tail [80, 65, 82, 49] but
found [117, 101, 116, 10]

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)

org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

.

.

.

at
scala.collection.parallel.package$$anon$1.alongWith(package.scala:87)

at
scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86)

at
scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650)

at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72)

at
scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650)

at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190)

at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)

at
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)

at
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)

at
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)

at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at

Managing spark processes via supervisord

2015-06-03 Thread Mike Trienis
Hi All,

I am curious to know if anyone has successfully deployed a spark cluster
using supervisord?

   - http://supervisord.org/

Currently I am using the cluster launch scripts, which are working great;
however, every time I reboot my VM or development environment I need to
re-launch the cluster.

I am considering using supervisord to control all the processes (worker,
master, etc.) in order to have the cluster up and running after boot-up,
although I'd like to understand if it will cause more issues than it
solves.

Thanks, Mike.


Re: Managing spark processes via supervisord

2015-06-03 Thread Igor Berman
Assuming you are talking about a standalone cluster:
IMHO, with workers you won't get any problems and it's straightforward,
since they are usually foreground processes.
With the master it's a bit more complicated: ./sbin/start-master.sh goes to the
background, which is not good for supervisord, but anyway I think it's
doable (I'm going to set it up too in a few days).
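For reference, a minimal supervisord stanza for the master along those lines (just a sketch; it assumes a standalone master and that Spark lives under /opt/spark, which is a placeholder path). Running the Master class via spark-class keeps it in the foreground, which is what supervisord expects:

[program:spark-master]
; run the standalone master in the foreground instead of via start-master.sh
command=/opt/spark/bin/spark-class org.apache.spark.deploy.master.Master
autostart=true
autorestart=true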

On 3 June 2015 at 21:46, Mike Trienis mike.trie...@orcsol.com wrote:

 Hi All,

 I am curious to know if anyone has successfully deployed a spark cluster
 using supervisord?

- http://supervisord.org/

 Currently I am using the cluster launch scripts which are working greater,
 however, every time I reboot my VM or development environment I need to
 re-launch the cluster.

 I am considering using supervisord to control all the processes (worker,
 master, ect.. ) in order to have the cluster up an running after boot-up;
 although I'd like to understand if it will cause more issues than it
 solves.

 Thanks, Mike.




Python Image Library and Spark

2015-06-03 Thread Justin Spargur
Hi all,

 I'm playing around with manipulating images via Python and want to
utilize Spark for scalability. That said, I'm just learning Spark and my
Python is a bit rusty (been doing PHP coding for the last few years). I
think I have most of the process figured out. However, the script fails on
larger images and Spark is sending out the following warning for smaller
images:

Stage 0 contains a task of very large size (1151 KB). The maximum
recommended task size is 100 KB.

My code is as follows:

import Image
from pyspark import SparkContext

if __name__ == "__main__":

    imageFile = "sample.jpg"
    outFile   = "sample.gray.jpg"

    sc = SparkContext(appName="Grayscale")
    im = Image.open(imageFile)

    # Create an RDD for the data from the image file
    img_data = sc.parallelize( list(im.getdata()) )

    # Create an RDD for the grayscale value
    gValue = img_data.map( lambda x: int(x[0]*0.21 + x[1]*0.72 + x[2]*0.07) )

    # Put our grayscale value into the RGB channels
    grayscale = gValue.map( lambda x: (x,x,x) )

    # Save the output in a new image.
    im.putdata( grayscale.collect() )

    im.save(outFile)

Obviously, something is amiss. However, I can't figure out where I'm off
track with this. Any help is appreciated! Thanks in advance!!!


Re: Problem reading Parquet from 1.2 to 1.3

2015-06-03 Thread Marcelo Vanzin
(bcc: user@spark, cc:cdh-user@cloudera)

If you're using CDH, Spark SQL is currently unsupported and mostly
untested. I'd recommend against trying to use it in CDH. You could try an upstream
version of Spark instead.

On Wed, Jun 3, 2015 at 1:39 PM, Don Drake dondr...@gmail.com wrote:

 As part of upgrading a cluster from CDH 5.3.x to CDH 5.4.x I noticed that
 Spark is behaving differently when reading Parquet directories that contain
 a .metadata directory.

 It seems that in spark 1.2.x, it would just ignore the .metadata
 directory, but now that I'm using Spark 1.3, reading these files causes the
 following exceptions:

 scala val d = sqlContext.parquetFile(/user/ddrak/parq_dir)

 SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder.

 SLF4J: Defaulting to no-operation (NOP) logger implementation

 SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
 details.

 scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown
 during a parallel computation: java.lang.RuntimeException:
 hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schema.avsc is not a
 Parquet file. expected magic number at tail [80, 65, 82, 49] but found
 [116, 34, 10, 125]

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

 scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)


 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

 scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

 .

 .

 .



 java.lang.RuntimeException:
 hdfs://nameservice1/user/ddrak/parq_dir/.metadata/schemas/1.avsc is not a
 Parquet file. expected magic number at tail [80, 65, 82, 49] but found
 [116, 34, 10, 125]

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

 scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)


 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

 scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

 .

 .

 .



 java.lang.RuntimeException:
 hdfs://nameservice1/user/ddrak/parq_dir/.metadata/descriptor.properties
 is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but
 found [117, 101, 116, 10]

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:427)

 parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:398)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:276)


 org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$5.apply(newParquet.scala:275)

 scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)


 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)

 scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)

 scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)

 .

 .

 .

 at
 scala.collection.parallel.package$$anon$1.alongWith(package.scala:87)

 at
 scala.collection.parallel.Task$class.mergeThrowables(Tasks.scala:86)

 at
 scala.collection.parallel.mutable.ParArray$Map.mergeThrowables(ParArray.scala:650)

 at scala.collection.parallel.Task$class.tryMerge(Tasks.scala:72)

 at
 scala.collection.parallel.mutable.ParArray$Map.tryMerge(ParArray.scala:650)

 at
 scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:190)

 at
 scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:514)

 at
 scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:162)

 at
 scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)

 at
 

Re: Python Image Library and Spark

2015-06-03 Thread ayan guha
Try with a larger number of partitions in parallelize.
On 4 Jun 2015 06:28, Justin Spargur jmspar...@gmail.com wrote:

 Hi all,

  I'm playing around with manipulating images via Python and want to
 utilize Spark for scalability. That said, I'm just learing Spark and my
 Python is a bit rusty (been doing PHP coding for the last few years). I
 think I have most of the process figured out. However, the script fails on
 larger images and Spark is sending out the following warning for smaller
 images:

 Stage 0 contains a task of very large size (1151 KB). The maximum
 recommended task size is 100 KB.

 My code is as follows:

 import Image
 from pyspark import SparkContext

 if __name__ == __main__:

 imageFile = sample.jpg
 outFile   = sample.gray.jpg

 sc = SparkContext(appName=Grayscale)
 im = Image.open(imageFile)

 # Create an RDD for the data from the image file
 img_data = sc.parallelize( list(im.getdata()) )

 # Create an RDD for the grayscale value
 gValue = img_data.map( lambda x: int(x[0]*0.21 + x[1]*0.72 +
 x[2]*0.07) )

 # Put our grayscale value into the RGR channels
 grayscale = gValue.map( lambda x: (x,x,x)  )

 # Save the output in a new image.
 im.putdata( grayscale.collect() )

 im.save(outFile)

 Obviously, something is amiss. However, I can't figure out where I'm off
 track with this. Any help is appreciated! Thanks in advance!!!



[ANNOUNCE] YARN support in Spark EC2

2015-06-03 Thread Shivaram Venkataraman
Hi all

We recently merged support for launching YARN clusters using the Spark EC2
scripts as part of
https://issues.apache.org/jira/browse/SPARK-3674. To use this you can pass
in hadoop-major-version as yarn to the spark-ec2 script, and this will
set up Hadoop 2.4 HDFS, YARN and Spark built for YARN on the EC2 cluster.
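For example, a launch might look like the following (a sketch only; the key pair, identity file and cluster name are placeholders):

./ec2/spark-ec2 -k <key-pair> -i <key-file> --hadoop-major-version=yarn launch my-yarn-cluster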

Developers who work on features related to YARN might find this useful for
testing / benchmarking Spark with YARN. If anyone has questions or feedback
please let me know.

Thanks
Shivaram


Standard Scaler taking 1.5hrs

2015-06-03 Thread Piero Cinquegrana
Hello User group,

I have an RDD of LabeledPoint composed of sparse vectors, as shown below. In 
the next step, I am standardizing the columns with the StandardScaler. The 
data has 2450 columns and ~110M rows. It took 1.5 hrs to complete the 
standardization with 10 nodes and 80 executors. spark.executor.memory was 
set to 2g and the driver memory to 5g.

scala> val parsedData = stack_sorted.mapPartitions( partition =>
         partition.map{ row =>
           LabeledPoint(row._2._1.getDouble(4), sparseVectorCat(row._2, CategoriesIdx, InteractionIds, tupleMap, vecLength))
         },
         preservesPartitioning = true).cache()

CategoriesIdx: Array[Int] = Array(3, 8, 12)
InteractionIds: Array[(Int, Int)] = Array((13,12))
vecLength: Int = 2450
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[93] at mapPartitions at <console>:111
(1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
(0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
(2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
(1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
(0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))


My suspicion is that, because the data is partitioned using a custom 
partitioner, the StandardScaler is causing a major shuffle operation. Any 
suggestion on how to improve the performance of this step, and of 
LinearRegressionWithSGD(), which is also taking a very long time?

scala> parsedData.partitioner
res72: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@d2)

scala> val scaler = new StandardScaler(withMean = false, withStd = true).fit(parsedData.map( row => row.features ))
scala> val scaledData = parsedData.mapPartitions(partition => partition.map{ row => LabeledPoint(row.label, scaler.transform(row.features)) }).cache()

scala> val numIterations = 100
scala> val stepSize = 0.1
scala> val miniBatchFraction = 0.1
scala> val algorithm = new LinearRegressionWithSGD()

scala> algorithm.setIntercept(false)
scala> algorithm.optimizer.setNumIterations(numIterations)
scala> algorithm.optimizer.setStepSize(stepSize)
scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)

scala> val model = algorithm.run(scaledData)
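One quick way to check the shuffle suspicion is to look at the lineage of these RDDs and at the per-stage shuffle read/write metrics in the web UI, for example:

scala> println(parsedData.toDebugString)
scala> println(scaledData.toDebugString)

A ShuffledRDD or an extra stage boundary in the output would point at a shuffle; a map-only lineage would suggest the time is going elsewhere (for example into the aggregation inside fit() or the SGD iterations).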

Best,

Piero Cinquegrana
Marketing Scientist | MarketShare
11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
P: 310.914.5677 x242 M: 323.377.9197
www.marketshare.com http://www.marketsharepartners.com/
twitter.com/marketsharep



Re: data localisation in spark

2015-06-03 Thread Sandy Ryza
Tasks are scheduled on executors based on data locality.  Things work as
you would expect in the example you brought up.

Through dynamic allocation, the number of executors can change throughout
the life time of an application.  10 executors (or 5 executors with 2 cores
each) are not needed for a reduceByKey with parallelism = 10.  If there are
fewer slots to run tasks than tasks, the tasks will just be run serially.

-Sandy
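For reference, a minimal sketch of the dynamic allocation settings mentioned above (assuming Spark 1.3+ on YARN, with the external shuffle service installed on the NodeManagers; the executor counts are placeholders):

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")        // required by dynamic allocation
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "20")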

On Tue, Jun 2, 2015 at 11:24 AM, Shushant Arora shushantaror...@gmail.com
wrote:

  So in Spark, after acquiring executors from the cluster manager, are tasks
 scheduled on executors based on data locality? I mean, if in an
 application there are 2 jobs and the output of job 1 is used as input of
 another job,
 and in job 1 I did persist on some RDD, then while running job 2 will it use
 the same executor where job 1's output was persisted, or will it acquire executors
 again so that data movement happens?

 And is it true that the number of executors in an application is fixed, acquired at
 the start of the application, and remains the same throughout the application? If yes, how
 does it take care of an explicit number of reducers in some of the APIs, say
 rdd.reduceByKey(func, 10)?

 And when converting the DAG to stages, does it calculate the executors required and then
 acquire executors/worker nodes?


 On Tue, Jun 2, 2015 at 11:06 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 It is not possible with JavaSparkContext either.  The API mentioned below
 currently does not have any effect (we should document this).

 The primary difference between MR and Spark here is that MR runs each
 task in its own YARN container, while Spark runs multiple tasks within an
 executor, which needs to be requested before Spark knows what tasks it will
 run.  Although dynamic allocation improves that last part.

 -Sandy

 On Tue, Jun 2, 2015 at 9:55 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Is it possible in JavaSparkContext ?

 JavaSparkContext jsc = new JavaSparkContext(conf);
 JavaRDD<String> lines = jsc.textFile(args[0]);

 If yes, is it the programmer's responsibility to first calculate split
 locations and then instantiate the Spark context with preferred locations?

 How is this achieved in MR2 with YARN? Is it that the Application Master
 specifies split locations to the ResourceManager before acquiring the node
 managers?



 On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote:

 Take a look at the following SparkContext constructor variant that
 tries to honor the data locality in YARN mode.

   /**
    * :: DeveloperApi ::
    * Alternative constructor for setting preferred locations where Spark will create executors.
    *
    * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
    *   Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
    *   from a list of input files or InputFormats for the application.
    */
   @DeveloperApi
   def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
     this(config)
     this.preferredNodeLocationData = preferredNodeLocationData
   }

 --
 bit1...@163.com


 *From:* Shushant Arora shushantaror...@gmail.com
 *Date:* 2015-05-31 22:54
 *To:* user user@spark.apache.org
 *Subject:* data localisation in spark

 I want to understand how Spark takes care of data localisation in
 cluster mode when run on YARN.

 1. The driver program asks the ResourceManager for executors. Does it tell
 YARN's RM to check the HDFS blocks of the input data and then allocate executors for
 them?
 And do executors remain fixed throughout the application, or does the driver program
 ask for new executors when it submits another job in the same application,
 since in Spark a new job is created for each action? If executors are fixed,
 then is achieving data localisation for the second job impossible?



 2. When executors are done with their processing, are they marked
 as free in the ResourceManager's resource queue, and do executors tell
 this to the RM directly instead of via the driver?

 Thanks
 Shushant







Re: Spark 1.4.0-rc4 HiveContext.table(db.tbl) NoSuchTableException

2015-06-03 Thread Yin Huai
Hi Doug,

Actually, sqlContext.table does not support database names in either Spark 1.3
or Spark 1.4. We will support it in a future version.

Thanks,

Yin



On Wed, Jun 3, 2015 at 10:45 AM, Doug Balog doug.sparku...@dugos.com
wrote:

 Hi,

 sqlContext.table(“db.tbl”) isn’t working for me, I get a
 NoSuchTableException.

 But I can access the table via

 sqlContext.sql(“select * from db.tbl”)

 So I know it has the table info from the metastore.

 Anyone else see this ?

 I’ll keep digging.
 I compiled via make-distribution  -Pyarn -phadoop-2.4 -Phive
 -Phive-thriftserver
 It worked for me in 1.3.1

 Cheers,

 Doug


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: ALS Rating Object

2015-06-03 Thread Joseph Bradley
Hi Yasemin,

If you can convert your user IDs to Integers in pre-processing (if you have
fewer than a couple billion users), that would work.  Otherwise...
In Spark 1.3: You may need to modify ALS to use Long instead of Int.
In Spark 1.4: spark.ml.recommendation.ALS (in the Pipeline API) exposes
ALS.train as a DeveloperApi to allow users to use Long instead of Int.
We're also thinking about better ways to permit Long IDs.

Joseph
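A minimal sketch of that pre-processing idea, assuming the raw ratings arrive as (userIdString, productIdInt, rating) triples in an RDD called rawRatings (a placeholder name) and that the number of distinct users fits in an Int:

import org.apache.spark.mllib.recommendation.Rating

// build a String -> Int dictionary for the user IDs, then translate the ratings
val userIdToInt = rawRatings.map(_._1).distinct().zipWithUniqueId().mapValues(_.toInt)
val ratings = rawRatings
  .map { case (uid, pid, r) => (uid, (pid, r)) }
  .join(userIdToInt)
  .map { case (_, ((pid, r), intUid)) => Rating(intUid, pid, r) }

Keep the dictionary around if you need to map predictions back to the original IDs.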

On Wed, Jun 3, 2015 at 5:04 AM, Yasemin Kaya godo...@gmail.com wrote:

 Hi,

 I want to use Spark's ALS in my project. I have user IDs
 like 30011397223227125563254, and the Rating object of ALS
 wants an Integer as the user ID, so the ID field does not fit into a 32-bit
 Integer. How can I solve that? Thanks.

 Best,
 yasemin
 --
 hiç ender hiç



RE: Make HTTP requests from within Spark

2015-06-03 Thread Mohammed Guller
The short answer is yes.

How you do it depends on a number of factors. Assuming you want to build an RDD 
from the responses and then analyze the responses using Spark core (not Spark 
Streaming), here is one simple way to do it:
1) Implement a class or function that connects to a web service and returns a 
list of responses. This code has no dependency on Spark. It will be the same 
whether you are using Spark or not. Obviously, you have to take into account 
memory and latency requirements.
2) Call sc.parallelize on the list obtained in step 1. 

This is not the most efficient way of doing it, but hopefully gives you an idea.

Mohammed
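A minimal sketch of those two steps in Scala, assuming scala.io.Source is good enough for the HTTP calls (the URLs and the filter are placeholders):

// step 1: plain HTTP fetch, no Spark involved
def fetchResponses(urls: Seq[String]): Seq[String] =
  urls.map(url => scala.io.Source.fromURL(url).mkString)

// step 2: turn the responses into an RDD and analyse them with the usual primitives
val urls = Seq("http://example.com/a", "http://example.com/b")
val responses = sc.parallelize(fetchResponses(urls))
val interesting = responses.filter(_.nonEmpty)

For millions of requests, fetching everything on the driver may not be practical; a common alternative is to parallelize the URLs instead and do the fetching inside mapPartitions on the executors, so the requests themselves are distributed.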

-Original Message-
From: kasparfischer [mailto:kaspar.fisc...@dreizak.com] 
Sent: Wednesday, June 3, 2015 12:49 AM
To: user@spark.apache.org
Subject: Make HTTP requests from within Spark

Hi everybody,

I'm new to Spark, apologies if my question is very basic. 

I have a need to send millions of requests to a web service and analyse and 
store the responses in an RDD. I can easily express the analysing part using 
Spark's filter/map/etc. primitives but I don't know how to make the requests. 
Is that something I can do from within Spark? Or Spark Streaming?
Or does it conflict with the way Spark works?

I've found a similar question but am not sure whether the answer applies
here:

  
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Streaming-from-an-HTTP-api-tp12330.html

Any clarifications or pointers would be super helpful!

Thanks,
Kaspar 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Make-HTTP-requests-from-within-Spark-tp23129.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RandomForest - subsamplingRate parameter

2015-06-03 Thread Andrew Leverentz
When training a RandomForest model, the Strategy class (in 
mllib.tree.configuration) provides a subsamplingRate parameter.  I was hoping 
to use this to cut down on processing time for large datasets (more than 2MM 
rows and 9K predictors), but I've found that the runtime stays approximately 
constant (and sometimes noticeably increases) when I try lowering the value of 
subsamplingRate.

Is this the expected behavior?  (And, if so, what is the intended purpose of 
this parameter?)

Of course, I could always just subsample the input dataset prior to running RF, 
but I was hoping that the subsamplingRate (which ostensibly affects the 
sampling used during RF bagging) would decrease the amount of data processing 
without requiring me to entirely ignore large subsets of the data.

Thanks,

~ Andrew


This email and any files transmitted with it are confidential, proprietary and 
intended solely for the individual or entity to whom they are addressed. If you 
have received this email in error please delete it immediately.


How to pass system properties in spark ?

2015-06-03 Thread Ashwin Shankar
Hi,
I'm trying to use property substitution in my log4j.properties, so that
I can choose where to write Spark logs at runtime.
The problem is that the system property passed to spark-shell
doesn't seem to be getting propagated to log4j.

*Here is log4j.properties (partial) with a parameter 'spark.log.path':*
log4j.appender.logFile=org.apache.log4j.FileAppender
log4j.appender.logFile.File=*${spark.log.path}*
log4j.appender.logFile.layout=org.apache.log4j.PatternLayout
log4j.appender.logFile.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
%c{1}: %m%n

*Here is how I pass the 'spark.log.path' variable on the command line:*
$spark-shell --conf spark.driver.extraJavaOptions=-Dspark.log.path=/tmp/spark.log

I also tried:
$spark-shell -Dspark.log.path=/tmp/spark.log

*Result:* /tmp/spark.log is not getting created when I run Spark.

Any ideas why this is happening ?

*When I enable log4j debug I see that following :*
log4j: Setting property [file] to [].
log4j: setFile called: , true
log4j:ERROR setFile(null,true) call failed.
java.io.FileNotFoundException:  (No such file or directory)
at java.io.FileOutputStream.open(Native Method)

-- 
Thanks,
Ashwin


Re: Spark Client

2015-06-03 Thread Akhil Das
Did you try this?

Create an sbt project like:

 // Create your context
 val sconf = new SparkConf().setAppName("Sigmoid").setMaster("spark://sigmoid:7077")
 val sc = new SparkContext(sconf)

 // Do some computations
 sc.parallelize(1 to 1).take(10).foreach(println)

 // Now return the exit status
 System.exit(0)   // or a non-zero status on failure

 Now, make your workflow manager trigger *sbt run* on the project instead of using spark-submit.
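If you are on Spark 1.4 or later, another option worth looking at is the org.apache.spark.launcher.SparkLauncher API, which starts spark-submit as a child process and hands back a java.lang.Process whose exit code you can inspect, rather than calling System.exit in your own JVM. A minimal sketch (the jar path, main class and master URL are placeholders):

import org.apache.spark.launcher.SparkLauncher

val process = new SparkLauncher()
  .setAppResource("/path/to/your-app.jar")
  .setMainClass("com.example.YourApp")
  .setMaster("spark://sigmoid:7077")
  .launch()
val exitCode = process.waitFor()   // report this back to the workflow manager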



Thanks
Best Regards

On Wed, Jun 3, 2015 at 2:18 PM, pavan kumar Kolamuri 
pavan.kolam...@gmail.com wrote:

 Hi akhil , sorry i may not conveying the question properly .  Actually we
 are looking to Launch a spark job from a long running workflow manager,
 which invokes spark client via SparkSubmit. Unfortunately the client upon
 successful completion of the application exits with a System.exit(0) or
 System.exit(NON_ZERO) when there is a failure. Question is, Is there an
 alternate  api though which a spark application can be launched which can
 return a exit status back to the caller as opposed to initiating JVM halt.

 On Wed, Jun 3, 2015 at 12:58 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Run it as a standalone application. Create an sbt project and do sbt run?

 Thanks
 Best Regards

 On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri 
 pavan.kolam...@gmail.com wrote:

 Hi guys , i am new to spark . I am using sparksubmit to submit spark
 jobs. But for my use case i don't want it to be exit with System.exit . Is
 there any other spark client which is api friendly other than SparkSubmit
 which shouldn't exit with system.exit. Please correct me if i am missing
 something.

 Thanks in advance




 --
 Regards
 Pavan Kumar Kolamuri





 --
 Regards
 Pavan Kumar Kolamuri




run spark submit on cloudera cluster

2015-06-03 Thread Pa Rö
Hi,

I want to run my Spark app on a cluster;
I use the Cloudera Live single-node VM.


How must I build the job for the spark-submit script?
And must I upload the job to HDFS for spark-submit?

Best regards,
Paul


Re: Application is always in process when I check out logs of completed application

2015-06-03 Thread ayan guha
Have you done sc.stop() ? :)
On 3 Jun 2015 14:05, amghost zhengweita...@outlook.com wrote:

 I run spark application in spark standalone cluster with client deploy
 mode.
 I want to check out the logs of my finished application, but I always get
 a
 page telling me Application history not found - Application xxx is still
 in
 process.
 I am pretty sure that the application has indeed completed because I can
 see
 it in the Completed Applications list shown by the Spark WebUI, and I have also
 found the log file with the suffix .inprocess in the directory set by
 spark.eventLog.dir in my spark-defaults.conf

 Oh, BTW, I am using spark 1.3.0

 So, is there anything I missed?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Application-is-always-in-process-when-I-check-out-logs-of-completed-application-tp23123.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Filter operation to return two RDDs at once.

2015-06-03 Thread Jeff Zhang
As far as I know, Spark doesn't support multiple outputs.

On Wed, Jun 3, 2015 at 2:15 PM, ayan guha guha.a...@gmail.com wrote:

 Why do you need to do that if filter and content of the resulting rdd are
 exactly same? You may as well declare them as 1 RDD.
 On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I want to do this

 val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId
 != NULL_VALUE)

 val guidUidMapSessions = rawQtSession.filter(_._2.
 qualifiedTreatmentId == NULL_VALUE)

 This will run two different stages can this be done in one stage ?

 val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.
 *magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE)




 --
 Deepak




-- 
Best Regards

Jeff Zhang


MetaException(message:java.security.AccessControlException: Permission denied

2015-06-03 Thread patcharee

Hi,

I was running a Spark job to insert overwrite a Hive table and got 
Permission denied. My question is: why did the Spark job do the insert as 
user 'hive', not as myself who ran the job? How can I fix the problem?


val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
hiveContext.sql("INSERT OVERWRITE table 4dim ...")


Caused by: MetaException(message:java.security.AccessControlException: 
Permission denied: user=hive, access=WRITE, 
inode=/apps/hive/warehouse/wrf_tables/4dim/zone=2/z=1/year=2009/month=1:patcharee:hdfs:drwxr-xr-x
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)
at 
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:185)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6795)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6777)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6702)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAccess(FSNamesystem.java:9529)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.checkAccess(NameNodeRpcServer.java:1516)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.checkAccess(ClientNamenodeProtocolServerSideTranslatorPB.java:1433)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)

at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)

at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)
)
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_partition_result$alter_partition_resultStandardScheme.read(ThriftHiveMetastore.java)
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_partition_result$alter_partition_resultStandardScheme.read(ThriftHiveMetastore.java)
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_partition_result.read(ThriftHiveMetastore.java)

at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_partition(ThriftHiveMetastore.java:2033)
at 
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_partition(ThriftHiveMetastore.java:2018)
at 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.alter_partition(HiveMetaStoreClient.java:1091)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)

at com.sun.proxy.$Proxy37.alter_partition(Unknown Source)
at 
org.apache.hadoop.hive.ql.metadata.Hive.alterPartition(Hive.java:469)

... 26 more


BR,
Patcharee

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ERROR cluster.YarnScheduler: Lost executor

2015-06-03 Thread Akhil Das
You need to look into your executor/worker logs to see what's going on.

Thanks
Best Regards

On Wed, Jun 3, 2015 at 12:01 PM, patcharee patcharee.thong...@uni.no
wrote:

 Hi,

 What can be the cause of this ERROR cluster.YarnScheduler: Lost executor?
 How can I fix it?

 Best,
 Patcharee

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Filter operation to return two RDDs at once.

2015-06-03 Thread Jeff Zhang
I checked RDD#randomSplit; it is much more like multiple one-to-one
transformations rather than a one-to-multiple transformation.

I wrote one sample as follows; it would generate 3 stages. Although
we can use cache here to make it better, if Spark could support multiple
outputs, only 2 stages would be needed. (This would be useful for Pig's
multi-query execution and Hive's self-joins.)


val data = sc.textFile("/Users/jzhang/a.log").flatMap(line => line.split("\\s")).map(w => (w, 1))
val parts = data.randomSplit(Array(0.2,0.8))
val joinResult = parts(0).join(parts(1))
println(joinResult.toDebugString)


(1) MapPartitionsRDD[8] at join at WordCount.scala:22 []
 |  MapPartitionsRDD[7] at join at WordCount.scala:22 []
 |  CoGroupedRDD[6] at join at WordCount.scala:22 []
 +-(1) PartitionwiseSampledRDD[4] at randomSplit at WordCount.scala:21 []
 |  |  MapPartitionsRDD[3] at map at WordCount.scala:20 []
 |  |  MapPartitionsRDD[2] at flatMap at WordCount.scala:20 []
 |  |  /Users/jzhang/a.log MapPartitionsRDD[1] at textFile at
WordCount.scala:20 []
 |  |  /Users/jzhang/a.log HadoopRDD[0] at textFile at WordCount.scala:20 []
 +-(1) PartitionwiseSampledRDD[5] at randomSplit at WordCount.scala:21 []
|  MapPartitionsRDD[3] at map at WordCount.scala:20 []
|  MapPartitionsRDD[2] at flatMap at WordCount.scala:20 []
|  /Users/jzhang/a.log MapPartitionsRDD[1] at textFile at
WordCount.scala:20 []
|  /Users/jzhang/a.log HadoopRDD[0] at textFile at WordCount.scala:20 []
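Until something like multiple outputs exists, a common pattern for the original question is to cache the parent RDD and filter it twice, so the source is only computed once even though two jobs run over it. A minimal sketch using the names from the question:

val cached = rawQtSession.cache()
val qtSessionsWithQt   = cached.filter(_._2.qualifiedTreatmentId != NULL_VALUE)
val guidUidMapSessions = cached.filter(_._2.qualifiedTreatmentId == NULL_VALUE)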


On Wed, Jun 3, 2015 at 2:45 PM, Sean Owen so...@cloudera.com wrote:

 In the sense here, Spark actually does have operations that make multiple
 RDDs like randomSplit. However there is not an equivalent of the partition
 operation which gives the elements that matched and did not match at once.

 On Wed, Jun 3, 2015, 8:32 AM Jeff Zhang zjf...@gmail.com wrote:

 As far as I know, spark don't support multiple outputs

 On Wed, Jun 3, 2015 at 2:15 PM, ayan guha guha.a...@gmail.com wrote:

 Why do you need to do that if filter and content of the resulting rdd
 are exactly same? You may as well declare them as 1 RDD.
 On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I want to do this

 val qtSessionsWithQt = rawQtSession.filter(_._2.
 qualifiedTreatmentId != NULL_VALUE)

 val guidUidMapSessions = rawQtSession.filter(_._2.
 qualifiedTreatmentId == NULL_VALUE)

 This will run two different stages can this be done in one stage ?

 val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.
 *magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE)




 --
 Deepak




 --
 Best Regards

 Jeff Zhang




-- 
Best Regards

Jeff Zhang


Re: ERROR cluster.YarnScheduler: Lost executor

2015-06-03 Thread Jeff Zhang
Node down or container preempted? You need to check the executor log /
node manager log for more info.

On Wed, Jun 3, 2015 at 2:31 PM, patcharee patcharee.thong...@uni.no wrote:

 Hi,

 What can be the cause of this ERROR cluster.YarnScheduler: Lost executor?
 How can I fix it?

 Best,
 Patcharee

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Best Regards

Jeff Zhang


Re: DataFrames coming in SparkR in Apache Spark 1.4.0

2015-06-03 Thread Emaasit
You can build Spark from the 1.4 release branch yourself:
https://github.com/apache/spark/tree/branch-1.4



-
Daniel Emaasit, 
Ph.D. Research Assistant
Transportation Research Center (TRC)
University of Nevada, Las Vegas
Las Vegas, NV 89154-4015
Cell: 615-649-2489
www.danielemaasit.com 
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrames-coming-in-SparkR-in-Apache-Spark-1-4-0-tp23116p23131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Error: Building Spark 1.4.0 from Github-1.4 release branch

2015-06-03 Thread Emaasit
I ran into errors while trying to build Spark from the 1.4 release branch
myself: https://github.com/apache/spark/tree/branch-1.4. Any help will be
much appreciated. Here is the log. (FYI, I installed all the
dependencies, like Java 7 and Maven 3.2.5.)


C:\Program Files\Apache Software Foundation\spark-branch-1.4>mvn -Psparkr -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
[INFO] Scanning for projects...
[INFO]

[INFO] Reactor Build Order:
[INFO]
[INFO] Spark Project Parent POM
[INFO] Spark Launcher Project
[INFO] Spark Project Networking
[INFO] Spark Project Shuffle Streaming Service
[INFO] Spark Project Unsafe
[INFO] Spark Project Core
[INFO] Spark Project Bagel
[INFO] Spark Project GraphX
[INFO] Spark Project Streaming
[INFO] Spark Project Catalyst
[INFO] Spark Project SQL
[INFO] Spark Project ML Library
[INFO] Spark Project Tools
[INFO] Spark Project Hive
[INFO] Spark Project REPL
[INFO] Spark Project YARN
[INFO] Spark Project Assembly
[INFO] Spark Project External Twitter
[INFO] Spark Project External Flume Sink
[INFO] Spark Project External Flume
[INFO] Spark Project External MQTT
[INFO] Spark Project External ZeroMQ
[INFO] Spark Project External Kafka
[INFO] Spark Project Examples
[INFO] Spark Project External Kafka Assembly
[INFO] Spark Project YARN Shuffle Service
[INFO]
[INFO]

[INFO] Building Spark Project Parent POM 1.4.0-SNAPSHOT
[INFO]

[INFO]
[INFO] --- maven-clean-plugin:2.6.1:clean (default-clean) @
spark-parent_2.10 --
-
[INFO]
[INFO] --- maven-enforcer-plugin:1.4:enforce (enforce-versions) @
spark-parent_2
.10 ---
[INFO]
[INFO] --- scala-maven-plugin:3.2.0:add-source (eclipse-add-source) @
spark-pare
nt_2.10 ---
[INFO] Add Source directory: C:\Program Files\Apache Software
Foundation\spark-b
ranch-1.4\src\main\scala
[INFO] Add Test Source directory: C:\Program Files\Apache Software
Foundation\sp
ark-branch-1.4\src\test\scala
[INFO]
[INFO] --- build-helper-maven-plugin:1.9.1:add-source (add-scala-sources) @
spar
k-parent_2.10 ---
[INFO] Source directory: C:\Program Files\Apache Software
Foundation\spark-branc
h-1.4\src\main\scala added.
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (default) @
spark-parent_2.
10 ---
[INFO]

[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ... FAILURE [ 23.528
s]
[INFO] Spark Launcher Project . SKIPPED
[INFO] Spark Project Networking ... SKIPPED
[INFO] Spark Project Shuffle Streaming Service  SKIPPED
[INFO] Spark Project Unsafe ... SKIPPED
[INFO] Spark Project Core . SKIPPED
[INFO] Spark Project Bagel  SKIPPED
[INFO] Spark Project GraphX ... SKIPPED
[INFO] Spark Project Streaming  SKIPPED
[INFO] Spark Project Catalyst . SKIPPED
[INFO] Spark Project SQL .. SKIPPED
[INFO] Spark Project ML Library ... SKIPPED
[INFO] Spark Project Tools  SKIPPED
[INFO] Spark Project Hive . SKIPPED
[INFO] Spark Project REPL . SKIPPED
[INFO] Spark Project YARN . SKIPPED
[INFO] Spark Project Assembly . SKIPPED
[INFO] Spark Project External Twitter . SKIPPED
[INFO] Spark Project External Flume Sink .. SKIPPED
[INFO] Spark Project External Flume ... SKIPPED
[INFO] Spark Project External MQTT  SKIPPED
[INFO] Spark Project External ZeroMQ .. SKIPPED
[INFO] Spark Project External Kafka ... SKIPPED
[INFO] Spark Project Examples . SKIPPED
[INFO] Spark Project External Kafka Assembly .. SKIPPED
[INFO] Spark Project YARN Shuffle Service . SKIPPED
[INFO]

[INFO] BUILD FAILURE
[INFO]

[INFO] Total time: 24.680 s
[INFO] Finished at: 2015-06-03T02:11:35-07:00
[INFO] Final Memory: 27M/224M
[INFO]

[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-remote-resources-p
lugin:1.5:process (default) on project spark-parent_2.10: Error finding
remote r
esources manifests: C:\Program Files\Apache Software
Foundation\spark-branch-1.4
\target\maven-shared-archive-resources\META-INF\NOTICE (The system 

Re: ERROR cluster.YarnScheduler: Lost executor

2015-06-03 Thread patcharee

Hi again,

Below is the log from executor

FetchFailed(BlockManagerId(4, compute-10-0.local, 38594), shuffleId=0, 
mapId=117, reduceId=117, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
compute-10-0.local/10.10.255.241:38594
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)

at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.IOException: Failed to connect to 
compute-10-0.local/10.10.255.241:38594
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)

at java.util.concurrent.FutureTask.run(FutureTask.java:262)
... 3 more
Caused by: java.net.ConnectException: Connection refused: 
compute-10-0.local/10.10.255.241:38594

at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at 

Spark Client

2015-06-03 Thread pavan kumar Kolamuri
Hi guys, I am new to Spark. I am using SparkSubmit to submit Spark jobs.
But for my use case I don't want it to exit with System.exit. Is there
any other Spark client, more API-friendly than SparkSubmit, which
doesn't exit with System.exit? Please correct me if I am missing
something.

Thanks in advance




-- 
Regards
Pavan Kumar Kolamuri


Re: Filter operation to return two RDDs at once.

2015-06-03 Thread Sean Owen
In the sense here, Spark actually does have operations that make multiple
RDDs like randomSplit. However there is not an equivalent of the partition
operation which gives the elements that matched and did not match at once.

On Wed, Jun 3, 2015, 8:32 AM Jeff Zhang zjf...@gmail.com wrote:

 As far as I know, spark don't support multiple outputs

 On Wed, Jun 3, 2015 at 2:15 PM, ayan guha guha.a...@gmail.com wrote:

 Why do you need to do that if filter and content of the resulting rdd are
 exactly same? You may as well declare them as 1 RDD.
 On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I want to do this

 val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId
 != NULL_VALUE)

 val guidUidMapSessions = rawQtSession.filter(_._2.
 qualifiedTreatmentId == NULL_VALUE)

 This will run two different stages can this be done in one stage ?

 val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.
 *magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE)




 --
 Deepak




 --
 Best Regards

 Jeff Zhang



Re: Scripting with groovy

2015-06-03 Thread Akhil Das
I think when you do an ssc.stop() it will stop your entire application, and by
updating a transformation function you mean modifying the driver program?
In that case, even if you checkpoint your application, it won't be able to
recover from its previous state.

A simpler approach would be to add certain conditions inside your
transformation function and switch on them accordingly, instead of modifying
the transformation itself.
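A minimal sketch of that idea, assuming a Scala Streaming job where stream is your input DStream; readModeFromConfig, flattenJson and enrichJson are hypothetical placeholders for your own config lookup and JSON handling:

val transformed = stream.transform { rdd =>
  // transform's closure runs on the driver for every batch, so the flag can be re-read each time
  val mode = readModeFromConfig()
  if (mode == "flatten") rdd.map(flattenJson) else rdd.map(enrichJson)
}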

Thanks
Best Regards

On Wed, Jun 3, 2015 at 4:27 AM, Paolo Platter paolo.plat...@agilelab.it
wrote:

  Hi all,

 Has anyone tried to add scripting capabilities to Spark Streaming using
 Groovy?
 I would like to stop the streaming context, update a transformation
 function written in Groovy (for example to manipulate JSON), restart the
 streaming context and obtain a new behavior without re-submitting the
 application.

 Is it possible? Do you think it makes sense or there is a smarter way to
 accomplish that?

 Thanks
 Paolo

 Inviata dal mio Windows Phone



Re: Spark Client

2015-06-03 Thread pavan kumar Kolamuri
Hi Akhil, sorry, I may not be conveying the question properly. Actually we
are looking to launch a Spark job from a long-running workflow manager,
which invokes the Spark client via SparkSubmit. Unfortunately the client, upon
successful completion of the application, exits with a System.exit(0), or
System.exit(NON_ZERO) when there is a failure. The question is: is there an
alternate API through which a Spark application can be launched, which can
return an exit status back to the caller as opposed to initiating a JVM halt?

On Wed, Jun 3, 2015 at 12:58 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Run it as a standalone application. Create an sbt project and do sbt run?

 Thanks
 Best Regards

 On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri 
 pavan.kolam...@gmail.com wrote:

 Hi guys , i am new to spark . I am using sparksubmit to submit spark
 jobs. But for my use case i don't want it to be exit with System.exit . Is
 there any other spark client which is api friendly other than SparkSubmit
 which shouldn't exit with system.exit. Please correct me if i am missing
 something.

 Thanks in advance




 --
 Regards
 Pavan Kumar Kolamuri





-- 
Regards
Pavan Kumar Kolamuri


Re: ERROR cluster.YarnScheduler: Lost executor

2015-06-03 Thread patcharee

This is the log I can get:

15/06/02 16:37:31 INFO shuffle.RetryingBlockFetcher: Retrying fetch 
(2/3) for 4 outstanding blocks after 5000 ms
15/06/02 16:37:36 INFO client.TransportClientFactory: Found inactive 
connection to compute-10-3.local/10.10.255.238:33671, creating a new one.
15/06/02 16:37:36 WARN server.TransportChannelHandler: Exception in 
connection from /10.10.255.238:35430

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at 
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
at 
io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at 
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)

at java.lang.Thread.run(Thread.java:744)
15/06/02 16:37:36 ERROR server.TransportRequestHandler: Error sending 
result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1033433133943, 
chunkIndex=1}, 
buffer=FileSegmentManagedBuffer{file=/hdisk3/hadoop/yarn/local/usercache/patcharee/appcache/application_1432633634512_0213/blockmgr-12d59e6b-0895-4a0e-9d06-152d2f7ee855/09/shuffle_0_56_0.data, 
offset=896, length=1132499356}} to /10.10.255.238:35430; closing connection

java.nio.channels.ClosedChannelException
15/06/02 16:37:38 ERROR shuffle.RetryingBlockFetcher: Exception while 
beginning fetch of 4 outstanding blocks (after 2 retries)
java.io.IOException: Failed to connect to 
compute-10-3.local/10.10.255.238:33671
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)

at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:744)
Caused by: java.net.ConnectException: Connection refused: 
compute-10-3.local/10.10.255.238:33671

at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)

... 1 more



Best,
Patcharee

On 03. juni 2015 09:21, Akhil Das wrote:

You need to look into your executor/worker logs to see whats going on.

Thanks
Best Regards

On Wed, Jun 3, 2015 at 12:01 PM, patcharee patcharee.thong...@uni.no 
mailto:patcharee.thong...@uni.no wrote:


Hi,

What can be the cause of this ERROR cluster.YarnScheduler: Lost
executor? How can I fix it?

Best,
Patcharee

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
mailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Spark Client

2015-06-03 Thread Akhil Das
Run it as a standalone application. Create an sbt project and do sbt run?

Thanks
Best Regards

On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri 
pavan.kolam...@gmail.com wrote:

 Hi guys, I am new to Spark. I am using SparkSubmit to submit Spark jobs.
 But for my use case I don't want it to exit with System.exit. Is there
 any other Spark client that is API-friendly, other than SparkSubmit, and
 does not exit with System.exit? Please correct me if I am missing
 something.

 Thanks in advance




 --
 Regards
 Pavan Kumar Kolamuri




Make HTTP requests from within Spark

2015-06-03 Thread kasparfischer
Hi everybody,

I'm new to Spark, apologies if my question is very basic. 

I have a need to send millions of requests to a web service and analyse and
store the responses in an RDD. I can easily express the analysing part using
Spark's filter/map/etc. primitives but I don't know how to make the
requests. Is that something I can do from within Spark? Or Spark Streaming?
Or does it conflict with the way Spark works?

I've found a similar question but am not sure whether the answer applies
here:

  
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Streaming-from-an-HTTP-api-tp12330.html

Any clarifications or pointers would be super helpful!

Thanks,
Kaspar 
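
For illustration only (this is not an answer from the thread): one common pattern is to build the request URLs as an RDD and issue the HTTP calls inside mapPartitions, so the work is spread across the executors. The endpoint, URLs and names below are made up for the sketch.

import org.apache.spark.{SparkConf, SparkContext}
import scala.io.Source
import scala.util.Try

val sc = new SparkContext(new SparkConf().setAppName("http-fetch-sketch"))

// Hypothetical input: one URL per request.
val urls = sc.parallelize(Seq(
  "http://example.com/api?id=1",
  "http://example.com/api?id=2"))

// Requests are made on the executors; Try keeps one failed call from failing the whole task.
val responses = urls.mapPartitions { iter =>
  iter.map(url => (url, Try(Source.fromURL(url).mkString).toOption))
}

// The usual filter/map analysis can then run on `responses`.
println(responses.filter(_._2.isDefined).count())

The same idea applies inside Spark Streaming (e.g. within foreachRDD); the main thing to watch is the aggregate request rate generated by many concurrent tasks.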



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Make-HTTP-requests-from-within-Spark-tp23129.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.4.0-rc3: Actor not found

2015-06-03 Thread Anders Arpteg
Tried on some other data sources as well, and it actually works for some
parquet sources. Potentially some specific problems with that first parquet
source that I tried with, and not a Spark 1.4 problem. I'll get back with
more info if I find any new information.

Thanks,
Anders

On Tue, Jun 2, 2015 at 8:45 PM, Yin Huai yh...@databricks.com wrote:

 Does it happen every time you read a parquet source?

 On Tue, Jun 2, 2015 at 3:42 AM, Anders Arpteg arp...@spotify.com wrote:

 The log is from the log aggregation tool (hortonworks, yarn logs ...),
 so both executors and driver. I'll send a private mail to you with the full
 logs. Also, tried another job as you suggested, and it actually worked
 fine. The first job was reading from a parquet source, and the second from
 an avro source. Could there be some issues with the parquet reader?

 Thanks,
 Anders

 On Tue, Jun 2, 2015 at 11:53 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 How about other jobs? Is it an executor log, or a driver log? Could you
 post other logs near this error, please? Thank you.

 Best Regards,
 Shixiong Zhu

 2015-06-02 17:11 GMT+08:00 Anders Arpteg arp...@spotify.com:

 Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that
 worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster
 mode), initial stage starts, but the job fails before any task succeeds
 with the following error. Any hints?

 [ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0]
 [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler]
 swallowing exception during message send
 (akka.remote.RemoteTransportExceptionNoStackTrace)
 Exception in thread main akka.actor.ActorNotFound: Actor not found
 for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/),
 Path(/user/OutputCommitCoordinator)]
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at
 akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at
 akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
 at
 akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
 at
 akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
 at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
 at
 akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)







Re: ERROR cluster.YarnScheduler: Lost executor

2015-06-03 Thread Akhil Das
Which version of spark? Looks like you are hitting this one
https://issues.apache.org/jira/browse/SPARK-4516

Thanks
Best Regards

On Wed, Jun 3, 2015 at 1:06 PM, patcharee patcharee.thong...@uni.no wrote:

  This is log I can get

 15/06/02 16:37:31 INFO shuffle.RetryingBlockFetcher: Retrying fetch (2/3)
 for 4 outstanding blocks after 5000 ms
 15/06/02 16:37:36 INFO client.TransportClientFactory: Found inactive
 connection to compute-10-3.local/10.10.255.238:33671, creating a new one.
 15/06/02 16:37:36 WARN server.TransportChannelHandler: Exception in
 connection from /10.10.255.238:35430
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at
 io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
 at
 io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
 at
 io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
 at
 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
 at
 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
 at java.lang.Thread.run(Thread.java:744)
 15/06/02 16:37:36 ERROR server.TransportRequestHandler: Error sending
 result
 ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1033433133943,
 chunkIndex=1},
 buffer=FileSegmentManagedBuffer{file=/hdisk3/hadoop/yarn/local/usercache/patcharee/appcache/application_1432633634512_0213/blockmgr-12d59e6b-0895-4a0e-9d06-152d2f7ee855/09/shuffle_0_56_0.data,
 offset=896, length=1132499356}} to /10.10.255.238:35430; closing
 connection
 java.nio.channels.ClosedChannelException
 15/06/02 16:37:38 ERROR shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 4 outstanding blocks (after 2 retries)
 java.io.IOException: Failed to connect to compute-10-3.local/
 10.10.255.238:33671
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.net.ConnectException: Connection refused:
 compute-10-3.local/10.10.255.238:33671
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at
 sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
 at
 io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
 at
 io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
 at
 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
 ... 1 more



 Best,
 Patcharee


 On 03. juni 2015 09:21, Akhil Das wrote:

  You need to look into your executor/worker logs to see whats going on.

  Thanks
 Best Regards

 On Wed, Jun 3, 2015 at 12:01 PM, patcharee patcharee.thong...@uni.no
 wrote:

 Hi,

 What can be the cause of this ERROR cluster.YarnScheduler: Lost executor?
 How can I fix it?

 Best,
 Patcharee

 

Re: Spark 1.4.0 build Error on Windows

2015-06-03 Thread Daniel Emaasit
I run into errors while trying to build Spark from the 1.4 release branch:
https://github.com/apache/spark/tree/branch-1.4. Any help will be much
appreciated. Here is the log file from my windows 8.1 PC. (F.Y.I, I
installed all the dependencies like Java 7, Maven 3.2.5 and set
the environment variables)


C:\Program Files\Apache Software Foundation\spark-branch-1.4>mvn -Psparkr -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
[INFO] Scanning for projects...
[INFO]

[INFO] Reactor Build Order:
[INFO]
[INFO] Spark Project Parent POM
[INFO] Spark Launcher Project
[INFO] Spark Project Networking
[INFO] Spark Project Shuffle Streaming Service
[INFO] Spark Project Unsafe
[INFO] Spark Project Core
[INFO] Spark Project Bagel
[INFO] Spark Project GraphX
[INFO] Spark Project Streaming
[INFO] Spark Project Catalyst
[INFO] Spark Project SQL
[INFO] Spark Project ML Library
[INFO] Spark Project Tools
[INFO] Spark Project Hive
[INFO] Spark Project REPL
[INFO] Spark Project YARN
[INFO] Spark Project Assembly
[INFO] Spark Project External Twitter
[INFO] Spark Project External Flume Sink
[INFO] Spark Project External Flume
[INFO] Spark Project External MQTT
[INFO] Spark Project External ZeroMQ
[INFO] Spark Project External Kafka
[INFO] Spark Project Examples
[INFO] Spark Project External Kafka Assembly
[INFO] Spark Project YARN Shuffle Service
[INFO]
[INFO]

[INFO] Building Spark Project Parent POM 1.4.0-SNAPSHOT
[INFO]

[INFO]
[INFO] --- maven-clean-plugin:2.6.1:clean (default-clean) @ spark-parent_2.10 ---
[INFO]
[INFO] --- maven-enforcer-plugin:1.4:enforce (enforce-versions) @ spark-parent_2.10 ---
[INFO]
[INFO] --- scala-maven-plugin:3.2.0:add-source (eclipse-add-source) @ spark-parent_2.10 ---
[INFO] Add Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala
[INFO] Add Test Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\test\scala
[INFO]
[INFO] --- build-helper-maven-plugin:1.9.1:add-source (add-scala-sources) @ spark-parent_2.10 ---
[INFO] Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala added.
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (default) @ spark-parent_2.10 ---
[INFO]

[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ... FAILURE [
23.528 s]
[INFO] Spark Launcher Project . SKIPPED
[INFO] Spark Project Networking ... SKIPPED
[INFO] Spark Project Shuffle Streaming Service  SKIPPED
[INFO] Spark Project Unsafe ... SKIPPED
[INFO] Spark Project Core . SKIPPED
[INFO] Spark Project Bagel  SKIPPED
[INFO] Spark Project GraphX ... SKIPPED
[INFO] Spark Project Streaming  SKIPPED
[INFO] Spark Project Catalyst . SKIPPED
[INFO] Spark Project SQL .. SKIPPED
[INFO] Spark Project ML Library ... SKIPPED
[INFO] Spark Project Tools  SKIPPED
[INFO] Spark Project Hive . SKIPPED
[INFO] Spark Project REPL . SKIPPED
[INFO] Spark Project YARN . SKIPPED
[INFO] Spark Project Assembly . SKIPPED
[INFO] Spark Project External Twitter . SKIPPED
[INFO] Spark Project External Flume Sink .. SKIPPED
[INFO] Spark Project External Flume ... SKIPPED
[INFO] Spark Project External MQTT  SKIPPED
[INFO] Spark Project External ZeroMQ .. SKIPPED
[INFO] Spark Project External Kafka ... SKIPPED
[INFO] Spark Project Examples . SKIPPED
[INFO] Spark Project External Kafka Assembly .. SKIPPED
[INFO] Spark Project YARN Shuffle Service . SKIPPED
[INFO]

[INFO] BUILD FAILURE
[INFO]

[INFO] Total time: 24.680 s
[INFO] Finished at: 2015-06-03T02:11:35-07:00
[INFO] Final Memory: 27M/224M
[INFO]

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-remote-resources-plugin:1.5:process (default) on project spark-parent_2.10: Error finding remote resources manifests: C:\Program Files\Apache Software Foundation\spark-branch-1.4

Re: Filter operation to return two RDDs at once.

2015-06-03 Thread ayan guha
Why do you need to do that if filter and content of the resulting rdd are
exactly same? You may as well declare them as 1 RDD.
On 3 Jun 2015 15:28, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I want to do this

 val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId
 != NULL_VALUE)

 val guidUidMapSessions = rawQtSession.filter(_._2.qualifiedTreatmentId
 == NULL_VALUE)

 This will run two different stages. Can this be done in one stage?

 val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.
 *magicFilter*(_._2.qualifiedTreatmentId != NULL_VALUE)




 --
 Deepak
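
As far as I know there is no single-pass split like this in the public RDD API. A minimal sketch of the usual workaround (reusing the names from the question above) is to cache the parent so the second filter does not recompute it:

val cached = rawQtSession.cache()   // or persist(StorageLevel.MEMORY_AND_DISK)

val qtSessionsWithQt   = cached.filter(_._2.qualifiedTreatmentId != NULL_VALUE)
val guidUidMapSessions = cached.filter(_._2.qualifiedTreatmentId == NULL_VALUE)

Both filters still run as separate jobs when actions are called on them, but the input is only materialized once.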




ERROR cluster.YarnScheduler: Lost executor

2015-06-03 Thread patcharee

Hi,

What can be the cause of this ERROR cluster.YarnScheduler: Lost 
executor? How can I fix it?


Best,
Patcharee

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ERROR cluster.YarnScheduler: Lost executor

2015-06-03 Thread Saisai Shao
I think you could check the YARN NodeManager log or other Spark executor
logs to see the details. The exception stacks you listed above are just the
symptom, not the cause. Normally one of these situations leads to a lost
executor:

1. Killed by YARN because it exceeded its memory limits, or was preempted.
2. Killed by Spark itself when dynamic allocation is enabled.
3. The executor ran into unexpected behavior and lost its connection to the driver.

You need to check the executor logs as well as yarn logs to find any clues.

Thanks
Saisai

2015-06-03 17:17 GMT+08:00 patcharee patcharee.thong...@uni.no:

  Hi again,

 Below is the log from executor

 FetchFailed(BlockManagerId(4, compute-10-0.local, 38594), shuffleId=0,
 mapId=117, reduceId=117, message=
 org.apache.spark.shuffle.FetchFailedException: Failed to connect to
 compute-10-0.local/10.10.255.241:38594
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
 at
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.io.IOException: Failed to connect to compute-10-0.local/
 10.10.255.241:38594
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
 at
 

RE: Objects serialized before foreachRDD/foreachPartition ?

2015-06-03 Thread Evo Eftimov
Dmitry was concerned about the “serialization cost” NOT the “memory footprint” – 
hence option a) is still viable since a Broadcast is performed only ONCE for 
the lifetime of the Driver instance 

 

From: Ted Yu [mailto:yuzhih...@gmail.com] 
Sent: Wednesday, June 3, 2015 2:44 PM
To: Evo Eftimov
Cc: dgoldenberg; user
Subject: Re: Objects serialized before foreachRDD/foreachPartition ?

 

Considering memory footprint of param as mentioned by Dmitry, option b seems 
better.

 

Cheers

 

On Wed, Jun 3, 2015 at 6:27 AM, Evo Eftimov evo.efti...@isecc.com wrote:

Hmmm a spark streaming app code doesn't execute in the linear fashion
assumed in your previous code snippet - to achieve your objectives you
should do something like the following

in terms of your second objective - saving the initialization and
serialization of the params you can:

a) broadcast them
b) have them as a Singleton (initialized from e.g. params in a file on HDFS)
on each Executor

messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {

Param param = new Param();
param.initialize();

  @Override
  public Void call(JavaRDD<String> rdd) throws Exception {
ProcessPartitionFunction func = new ProcessPartitionFunction(param);
rdd.foreachPartition(func);
return null;
  }

});

//put this in e.g. the object destructor
param.deinitialize();


-Original Message-
From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
Sent: Wednesday, June 3, 2015 1:56 PM
To: user@spark.apache.org
Subject: Objects serialized before foreachRDD/foreachPartition ?

I'm looking at https://spark.apache.org/docs/latest/tuning.html.  Basically
the takeaway is that all objects passed into the code processing RDD's must
be serializable. So if I've got a few objects that I'd rather initialize
once and deinitialize once outside of the logic processing the RDD's, I'd
need to think twice about the costs of serializing such objects, it would
seem.

In the below, does the Spark serialization happen before calling foreachRDD
or before calling foreachPartition?

Param param = new Param();
param.initialize();
messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> rdd) throws Exception {
ProcessPartitionFunction func = new ProcessPartitionFunction(param);
rdd.foreachPartition(func);
return null;
  }
});
param.deinitialize();

If param gets initialized to a significant memory footprint, are we better
off creating/initializing it before calling new ProcessPartitionFunction()
or perhaps in the 'call' method within that function?

I'm trying to avoid calling expensive init()/deinit() methods while
balancing against the serialization costs. Thanks.



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



columnar structure of RDDs from Parquet or ORC files

2015-06-03 Thread kiran lonikar
When spark reads parquet files (sqlContext.parquetFile), it creates a
DataFrame RDD. I would like to know if the resulting DataFrame has columnar
structure (many rows of a column coalesced together in memory) or its a row
wise structure that a spark RDD has. The section Spark SQL and DataFrames
http://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory
says
you need to call sqlContext.cacheTable(tableName) or df.cache() to make
it columnar. What exactly is this columnar structure?

To be precise: What does the row represent in the expression
df.cache().map{row => ...}?

Is it a logical row which maintains an array of columns and each column in
turn is an array of values for batchSize rows?

-Kiran
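
Not an answer from the thread, just a small sketch of where the columnar caching shows up in the API (the path and table name are made up). As far as the public API is concerned, the row handed to your function in df.cache().map{row => ...} is still an ordinary Row object; the column batches only exist inside the in-memory cache.

val df = sqlContext.parquetFile("/user/example/parq_dir")   // illustrative path
df.registerTempTable("events")

// Columnar, compressed in-memory caching of the table (built lazily on first scan).
sqlContext.cacheTable("events")

// Rows per column batch in that cache (documented Spark SQL option).
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")

// User code still sees regular Row objects, whatever the cache layout is
// (assumes the first column is a string).
df.map(row => row.getString(0)).take(5)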


Re: Application is always in process when I check out logs of completed application

2015-06-03 Thread Richard Marscher
I had the same issue a couple days ago. It's a bug in 1.3.0 that is fixed
in 1.3.1 and up.

https://issues.apache.org/jira/browse/SPARK-6036

The event log is only renamed from in-progress to complete after the Master
tries to build the history page for it. The only reason it even works on
occasion in 1.3.0 is that the Master part runs asynchronously while the event
log status change is synchronous, so by a race condition the Master part is
sometimes executed after the rename.

On Wed, Jun 3, 2015 at 2:17 AM, ayan guha guha.a...@gmail.com wrote:

 Have you done sc.stop() ? :)
 On 3 Jun 2015 14:05, amghost zhengweita...@outlook.com wrote:

 I run spark application in spark standalone cluster with client deploy
 mode.
 I want to check out the logs of my finished application, but I always
 get  a
 page telling me Application history not found - Application xxx is still
 in
 process.
 I am pretty sure that the application has indeed completed because I can
 see
 it in the Completed Applications list show by Spark WebUI, and I have also
 found the log file with suffix .inprocess in the directory set by
 spark.eventLog.dir in my spark-default.conf

 Oh, BTW, I am using spark 1.3.0

 So, is there anything I missed?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Application-is-always-in-process-when-I-check-out-logs-of-completed-application-tp23123.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Client

2015-06-03 Thread Richard Marscher
I think the short answer to the question is, no, there is no alternate API
that will not use the System.exit calls. You can craft a workaround like is
being suggested in this thread. For comparison, we are doing programmatic
submission of applications in a long-running client application. To get
around these issues we make a shadowed version of some of the Spark code in
our application to remove the System.exit calls so instead exceptions
bubble up to our application.

On Wed, Jun 3, 2015 at 7:19 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Did you try this?

 Create an sbt project like:

  // Create your context
  val sconf = new SparkConf().setAppName("Sigmoid").setMaster("spark://sigmoid:7077")
  val sc = new SparkContext(sconf)

  // Do some computations
  sc.parallelize(1 to 1).take(10).foreach(println)

  //Now return the exit status
  System.exit(Some number)

  Now, make your workflow manager trigger *sbt run* on the project
 instead of using spark-submit.



 Thanks
 Best Regards

 On Wed, Jun 3, 2015 at 2:18 PM, pavan kumar Kolamuri 
 pavan.kolam...@gmail.com wrote:

 Hi Akhil, sorry, I may not be conveying the question properly. Actually we
 are looking to launch a Spark job from a long-running workflow manager,
 which invokes the Spark client via SparkSubmit. Unfortunately the client,
 upon successful completion of the application, exits with System.exit(0),
 or System.exit(NON_ZERO) when there is a failure. The question is: is there
 an alternate API through which a Spark application can be launched that can
 return an exit status back to the caller, as opposed to initiating a JVM halt?

 On Wed, Jun 3, 2015 at 12:58 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Run it as a standalone application. Create an sbt project and do sbt run?

 Thanks
 Best Regards

 On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri 
 pavan.kolam...@gmail.com wrote:

 Hi guys , i am new to spark . I am using sparksubmit to submit spark
 jobs. But for my use case i don't want it to be exit with System.exit . Is
 there any other spark client which is api friendly other than SparkSubmit
 which shouldn't exit with system.exit. Please correct me if i am missing
 something.

 Thanks in advance




 --
 Regards
 Pavan Kumar Kolamuri





 --
 Regards
 Pavan Kumar Kolamuri





Does Apache Spark maintain a columnar structure when creating RDDs from Parquet or ORC files?

2015-06-03 Thread lonikar
When spark reads parquet files (sqlContext.parquetFile), it creates a
DataFrame RDD. I would like to know if the resulting DataFrame has columnar
structure (many rows of a column coalesced together in memory) or its a row
wise structure that a spark RDD has. The section  Spark SQL and DataFrames
http://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory
  
says you need to call sqlContext.cacheTable(tableName) or df.cache() to
make it columnar. What exactly is this columnar structure?

To be precise: What does the row represent in the expression
df.cache().map{row => ...}?

Is it a logical row which maintains an array of columns and each column in
turn is an array of values for batchSize rows?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-Apache-Spark-maintain-a-columnar-structure-when-creating-RDDs-from-Parquet-or-ORC-files-tp23139.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Equivalent to Storm's 'field grouping' in Spark.

2015-06-03 Thread allonsy
Hi everybody,
is there in Spark anything sharing the philosophy of Storm's field grouping?

I'd like to manage data partitioning across the workers by sending tuples
sharing the same key to the very same worker in the cluster, but I did not
find any method to do that.

Suggestions?

:)
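
Not from the thread, but a minimal sketch of the closest RDD-level analogue I know of: partitionBy with a HashPartitioner on a key-value RDD routes all tuples with the same key to the same partition (and hence the same executor for stages that reuse that partitioning). The sample data is made up.

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("user-1", 1), ("user-2", 2), ("user-1", 3)))

// Same key -> same partition, similar in spirit to Storm's field grouping.
val byKey = pairs.partitionBy(new HashPartitioner(8)).cache()

// Later keyed operations can reuse that partitioning without another shuffle.
byKey.reduceByKey(_ + _).collect().foreach(println)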



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Equivalent-to-Storm-s-field-grouping-in-Spark-tp23135.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Objects serialized before foreachRDD/foreachPartition ?

2015-06-03 Thread Evo Eftimov
Hmmm a spark streaming app code doesn't execute in the linear fashion
assumed in your previous code snippet - to achieve your objectives you
should do something like the following 

in terms of your second objective - saving the initialization and
serialization of the params you can:

a) broadcast them
b) have them as a Singleton (initialized from e.g. params in a file on HDFS)
on each Executor  

messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {

Param param = new Param();
param.initialize();

  @Override
  public Void call(JavaRDD<String> rdd) throws Exception {
ProcessPartitionFunction func = new ProcessPartitionFunction(param);
rdd.foreachPartition(func);
return null;
  }

});

//put this in e.g. the object destructor 
param.deinitialize();
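
For option b), a minimal sketch of the per-executor singleton in Scala (the snippet above is Java; Param and messageBodies stand in for the objects from this thread and are assumed, not defined here):

object ParamHolder {
  // Initialized at most once per executor JVM, the first time a task touches it.
  lazy val param: Param = {
    val p = new Param()
    p.initialize()          // e.g. load its settings from a file on HDFS
    p
  }
}

messageBodies.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val param = ParamHolder.param   // resolved on the executor, nothing serialized from the driver
    records.foreach { record =>
      // process record with param ...
    }
  }
}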

-Original Message-
From: dgoldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 1:56 PM
To: user@spark.apache.org
Subject: Objects serialized before foreachRDD/foreachPartition ?

I'm looking at https://spark.apache.org/docs/latest/tuning.html.  Basically
the takeaway is that all objects passed into the code processing RDD's must
be serializable. So if I've got a few objects that I'd rather initialize
once and deinitialize once outside of the logic processing the RDD's, I'd
need to think twice about the costs of serializing such objects, it would
seem.

In the below, does the Spark serialization happen before calling foreachRDD
or before calling foreachPartition?

Param param = new Param();
param.initialize();
messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> rdd) throws Exception {
ProcessPartitionFunction func = new ProcessPartitionFunction(param);
rdd.foreachPartition(func);
return null;
  }
});
param.deinitialize();

If param gets initialized to a significant memory footprint, are we better
off creating/initializing it before calling new ProcessPartitionFunction()
or perhaps in the 'call' method within that function?

I'm trying to avoid calling expensive init()/deinit() methods while
balancing against the serialization costs. Thanks.



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



ALS Rating Object

2015-06-03 Thread Yasemin Kaya
Hi,

I want to use Spark's ALS in my project. I have userids
like 30011397223227125563254, but ALS's Rating object wants an Integer as
the userid, so the id field does not fit into a 32-bit Integer. How can I
solve that? Thanks.

Best,
yasemin
-- 
hiç ender hiç
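
One possible workaround (a sketch, not from the thread): map each external user id to a dense Int surrogate and keep the mapping around to translate recommendations back. `raw` below is a made-up RDD of (userIdString, productId, rating) triples, and this assumes the number of distinct users fits in an Int.

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// raw: RDD[(String, Int, Double)] -- (userIdString, productId, rating), illustrative.
val userIdToInt = raw.map(_._1).distinct().zipWithIndex().mapValues(_.toInt)

val ratings = raw.map { case (u, p, r) => (u, (p, r)) }
  .join(userIdToInt)
  .map { case (_, ((p, r), uInt)) => Rating(uInt, p, r) }

val model = ALS.train(ratings, 10, 10, 0.01)
// userIdToInt (or its swapped inverse) maps ALS results back to the original ids.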


Re: in GraphX,program with Pregel runs slower and slower after several iterations

2015-06-03 Thread Cheuk Lam
I think you're exactly right.  I once had 100 iterations in a single Pregel
call, and got into the lineage problem right there.  I had to modify the
Pregel function and checkpoint both the graph and the newVerts RDD there to
cut off the lineage.  If you draw out the dependency graph among the g, the
newVerts RDD and the messages RDD inside the Pregel loop, then you will find
out we need to checkpoint two things to really cut off the lineage: the
graph itself and one of newVerts or messages.  This is how I did it inside
the Pregel loop:
...
prevG = g
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
g.cache()
if (i % 50 == 0) {
  g.checkpoint
  newVerts.checkpoint
}
...

Also note: checkpointing is only effective before the RDD is materialized. 
If you checkpoint outside of Pregel, which means the graph is already
materialized (by the mapReduceTriplets call), then nothing will happen.  You
can examine that by looking at the RDD.toDebugString.  Therefore, I had to
apply the following workaround:
  val clonedGraph = graph.mapVertices((vid, vd) => vd).mapEdges{edge => edge.attr}
  clonedGraph.checkpoint
  graph = clonedGraph
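
One small addition, not in the original post: checkpointing requires a checkpoint directory to be set on the SparkContext beforehand, e.g.

sc.setCheckpointDir("hdfs:///tmp/graphx-checkpoints")   // illustrative path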





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/in-GraphX-program-with-Pregel-runs-slower-and-slower-after-several-iterations-tp23121p23133.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Objects serialized before foreachRDD/foreachPartition ?

2015-06-03 Thread dgoldenberg
I'm looking at https://spark.apache.org/docs/latest/tuning.html.  Basically
the takeaway is that all objects passed into the code processing RDD's must
be serializable. So if I've got a few objects that I'd rather initialize
once and deinitialize once outside of the logic processing the RDD's, I'd
need to think twice about the costs of serializing such objects, it would
seem.

In the below, does the Spark serialization happen before calling foreachRDD
or before calling foreachPartition?

Param param = new Param();
param.initialize();
messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> rdd) throws Exception {
ProcessPartitionFunction func = new ProcessPartitionFunction(param);
rdd.foreachPartition(func);
return null;
  }
});
param.deinitialize();

If param gets initialized to a significant memory footprint, are we better
off creating/initializing it before calling new ProcessPartitionFunction()
or perhaps in the 'call' method within that function?

I'm trying to avoid calling expensive init()/deinit() methods while
balancing against the serialization costs. Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Objects serialized before foreachRDD/foreachPartition ?

2015-06-03 Thread Ted Yu
Considering memory footprint of param as mentioned by Dmitry, option b
seems better.

Cheers

On Wed, Jun 3, 2015 at 6:27 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 Hmmm a spark streaming app code doesn't execute in the linear fashion
 assumed in your previous code snippet - to achieve your objectives you
 should do something like the following

 in terms of your second objective - saving the initialization and
 serialization of the params you can:

 a) broadcast them
 b) have them as a Singleton (initialized from e.g. params in a file on
 HDFS)
 on each Executor

 messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {

 Param param = new Param();
 param.initialize();

   @Override
   public Void call(JavaRDD<String> rdd) throws Exception {
 ProcessPartitionFunction func = new
 ProcessPartitionFunction(param);
 rdd.foreachPartition(func);
 return null;
   }

 });

 //put this in e.g. the object destructor
 param.deinitialize();

 -Original Message-
 From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
 Sent: Wednesday, June 3, 2015 1:56 PM
 To: user@spark.apache.org
 Subject: Objects serialized before foreachRDD/foreachPartition ?

 I'm looking at https://spark.apache.org/docs/latest/tuning.html.
 Basically
 the takeaway is that all objects passed into the code processing RDD's must
 be serializable. So if I've got a few objects that I'd rather initialize
 once and deinitialize once outside of the logic processing the RDD's, I'd
 need to think twice about the costs of serializing such objects, it would
 seem.

 In the below, does the Spark serialization happen before calling foreachRDD
 or before calling foreachPartition?

 Param param = new Param();
 param.initialize();
 messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> rdd) throws Exception {
 ProcessPartitionFunction func = new
 ProcessPartitionFunction(param);
 rdd.foreachPartition(func);
 return null;
   }
 });
 param.deinitialize();

 If param gets initialized to a significant memory footprint, are we better
 off creating/initializing it before calling new ProcessPartitionFunction()
 or perhaps in the 'call' method within that function?

 I'm trying to avoid calling expensive init()/deinit() methods while
 balancing against the serialization costs. Thanks.



 --
 View this message in context:

 http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
Great.

You should monitor vital performance / job clogging stats of the Spark
Streaming Runtime not “kafka topics” -- anything specific you were thinking
of?

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 Makes sense especially if you have a cloud with “infinite” resources /
 nodes which allows you to double, triple etc in the background/parallel the
 resources of the currently running cluster



 I was thinking more about the scenario where you have e.g. 100 boxes and
 want to / can add e.g. 20 more



 *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
 *Sent:* Wednesday, June 3, 2015 4:46 PM
 *To:* Evo Eftimov
 *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
 *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
 sizes/rate of growth in Kafka or Spark's metrics?



 Evo,



 One of the ideas is to shadow the current cluster. This way there's no
 extra latency incurred due to shutting down of the consumers. If two sets
 of consumers are running, potentially processing the same data, that is OK.
 We phase out the older cluster and gradually flip over to the new one,
 insuring no downtime or extra latency.  Thoughts?



 On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 You should monitor vital performance / job clogging stats of the Spark
 Streaming Runtime not “kafka topics”



 You should be able to bring new worker nodes online and make them contact
 and register with the Master without bringing down the Master (or any of
 the currently running worker nodes)



 Then just shutdown your currently running spark streaming job/app and
 restart it with new params to take advantage of the larger cluster



 *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
 *Sent:* Wednesday, June 3, 2015 4:14 PM
 *To:* Cody Koeninger
 *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
 *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
 sizes/rate of growth in Kafka or Spark's metrics?



 Would it be possible to implement Spark autoscaling somewhat along these
 lines? --



 1. If we sense that a new machine is needed, by watching the data load in
 Kafka topic(s), then

 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
 and get a machine);

 3. Create a shadow/mirror Spark master running alongside the initial
 version which talks to N machines. The new mirror version is aware of N+1
 machines (or N+M if we had decided we needed M new boxes).

 4. The previous version of the Spark runtime is
 acquiesced/decommissioned.  We possibly get both clusters working on the
 same data which may actually be OK (at least for our specific use-cases).

 5. Now the new Spark cluster is running.



 Similarly, the decommissioning of M unused boxes would happen, via this
 notion of a mirror Spark runtime.  How feasible would it be for such a
 mirrorlike setup to be created, especially created programmatically?
 Especially point #3.



 The other idea we'd entertained was to bring in a new machine, acquiesce
 down all currently running workers by telling them to process their current
 batch then shut down, then restart the consumers now that Spark is aware of
 a modified cluster.  This has the drawback of a downtime that may not be
 tolerable in terms of latency, by the system's clients waiting for their
 responses in a synchronous fashion.



 Thanks.



 On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I'm not sure that points 1 and 2 really apply to the kafka direct stream.
 There are no receivers, and you know at the driver how big each of your
 batches is.



 On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote:

 Hi all,



 As the author of the dynamic allocation feature I can offer a few insights
 here.



 Gerard's explanation was both correct and concise: dynamic allocation is
 not intended to be used in Spark streaming at the moment (1.4 or before).
 This is because of two things:



 (1) Number of receivers is necessarily fixed, and these are started in
 executors. Since we need a receiver for each InputDStream, if we kill these
 receivers we essentially stop the stream, which is not what we want. It
 makes little sense to close and restart a stream the same way we kill and
 relaunch executors.



 (2) Records come in every batch, and when there is data to process your
 executors are not idle. If your idle timeout is less than the batch
 duration, then you'll end up having to constantly kill and restart
 executors. If your idle timeout is greater than the batch duration, then
 you'll never kill executors.



 Long answer short, with Spark streaming there is currently no
 straightforward way to scale the size of your cluster. I had a long
 discussion with TD (Spark streaming lead) about what needs to be done to
 provide some semblance of dynamic scaling to streaming applications, e.g.
 take into account the batch queue instead. We came up with a 

Re: Spark 1.4.0 build Error on Windows

2015-06-03 Thread pawan kumar
I got the same error message when using maven 3.3 .
On Jun 3, 2015 8:58 AM, Ted Yu yuzhih...@gmail.com wrote:

 I used the same command on Linux but didn't reproduce the error.

 Can you include -X switch on your command line ?

 Also consider upgrading maven to 3.3.x

 Cheers

 On Wed, Jun 3, 2015 at 2:36 AM, Daniel Emaasit daniel.emaa...@gmail.com
 wrote:

 I run into errors while trying to build Spark from the 1.4 release
 branch: https://github.com/apache/spark/tree/branch-1.4. Any help will
 be much appreciated. Here is the log file from my windows 8.1 PC. (F.Y.I, I
 installed all the dependencies like Java 7, Maven 3.2.5 and set
 the environment variables)


 C:\Program Files\Apache Software Foundation\spark-branch-1.4>mvn -Psparkr -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
 [INFO] Scanning for projects...
 [INFO]
 
 [INFO] Reactor Build Order:
 [INFO]
 [INFO] Spark Project Parent POM
 [INFO] Spark Launcher Project
 [INFO] Spark Project Networking
 [INFO] Spark Project Shuffle Streaming Service
 [INFO] Spark Project Unsafe
 [INFO] Spark Project Core
 [INFO] Spark Project Bagel
 [INFO] Spark Project GraphX
 [INFO] Spark Project Streaming
 [INFO] Spark Project Catalyst
 [INFO] Spark Project SQL
 [INFO] Spark Project ML Library
 [INFO] Spark Project Tools
 [INFO] Spark Project Hive
 [INFO] Spark Project REPL
 [INFO] Spark Project YARN
 [INFO] Spark Project Assembly
 [INFO] Spark Project External Twitter
 [INFO] Spark Project External Flume Sink
 [INFO] Spark Project External Flume
 [INFO] Spark Project External MQTT
 [INFO] Spark Project External ZeroMQ
 [INFO] Spark Project External Kafka
 [INFO] Spark Project Examples
 [INFO] Spark Project External Kafka Assembly
 [INFO] Spark Project YARN Shuffle Service
 [INFO]
 [INFO]
 
 [INFO] Building Spark Project Parent POM 1.4.0-SNAPSHOT
 [INFO]
 
 [INFO]
 [INFO] --- maven-clean-plugin:2.6.1:clean (default-clean) @ spark-parent_2.10 ---
 [INFO]
 [INFO] --- maven-enforcer-plugin:1.4:enforce (enforce-versions) @ spark-parent_2.10 ---
 [INFO]
 [INFO] --- scala-maven-plugin:3.2.0:add-source (eclipse-add-source) @ spark-parent_2.10 ---
 [INFO] Add Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala
 [INFO] Add Test Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\test\scala
 [INFO]
 [INFO] --- build-helper-maven-plugin:1.9.1:add-source (add-scala-sources) @ spark-parent_2.10 ---
 [INFO] Source directory: C:\Program Files\Apache Software Foundation\spark-branch-1.4\src\main\scala added.
 [INFO]
 [INFO] --- maven-remote-resources-plugin:1.5:process (default) @ spark-parent_2.10 ---
 [INFO]
 
 [INFO] Reactor Summary:
 [INFO]
 [INFO] Spark Project Parent POM ... FAILURE [
 23.528 s]
 [INFO] Spark Launcher Project . SKIPPED
 [INFO] Spark Project Networking ... SKIPPED
 [INFO] Spark Project Shuffle Streaming Service  SKIPPED
 [INFO] Spark Project Unsafe ... SKIPPED
 [INFO] Spark Project Core . SKIPPED
 [INFO] Spark Project Bagel  SKIPPED
 [INFO] Spark Project GraphX ... SKIPPED
 [INFO] Spark Project Streaming  SKIPPED
 [INFO] Spark Project Catalyst . SKIPPED
 [INFO] Spark Project SQL .. SKIPPED
 [INFO] Spark Project ML Library ... SKIPPED
 [INFO] Spark Project Tools  SKIPPED
 [INFO] Spark Project Hive . SKIPPED
 [INFO] Spark Project REPL . SKIPPED
 [INFO] Spark Project YARN . SKIPPED
 [INFO] Spark Project Assembly . SKIPPED
 [INFO] Spark Project External Twitter . SKIPPED
 [INFO] Spark Project External Flume Sink .. SKIPPED
 [INFO] Spark Project External Flume ... SKIPPED
 [INFO] Spark Project External MQTT  SKIPPED
 [INFO] Spark Project External ZeroMQ .. SKIPPED
 [INFO] Spark Project External Kafka ... SKIPPED
 [INFO] Spark Project Examples . SKIPPED
 [INFO] Spark Project External Kafka Assembly .. SKIPPED
 [INFO] Spark Project YARN Shuffle Service . SKIPPED
 [INFO]
 
 [INFO] BUILD FAILURE
 [INFO]
 

Re: Objects serialized before foreachRDD/foreachPartition ?

2015-06-03 Thread Dmitry Goldenberg
So Evo, option b is to singleton the Param, as in your modified snippet,
i.e. instantiate is once per an RDD.

But if I understand correctly the a) option is broadcast, meaning
instantiation is in the Driver once before any transformations and actions,
correct?  That's where my serialization costs concerns were.  There's the
Kryo serialization but Param might still be too heavy.  If some of its
member variables are lazy loaded we may be OK.  But it seems then on every
worker node the lazy initialization would have to happen to load these lazy
loaded resources into Param - ?

public class Param {
   // == potentially a very hefty resource to load
   private Map<String, String> dictionary = new HashMap<String, String>();
   ...
}

I'm groking that Spark will serialize Param right before the call to
foreachRDD, if we're to broadcast...



On Wed, Jun 3, 2015 at 9:58 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 Dmitry was concerned about the “serialization cost” NOT the “memory
 footprint – hence option a) is still viable since a Broadcast is performed
 only ONCE for the lifetime of Driver instance



 *From:* Ted Yu [mailto:yuzhih...@gmail.com]
 *Sent:* Wednesday, June 3, 2015 2:44 PM
 *To:* Evo Eftimov
 *Cc:* dgoldenberg; user
 *Subject:* Re: Objects serialized before foreachRDD/foreachPartition ?



 Considering memory footprint of param as mentioned by Dmitry, option b
 seems better.



 Cheers



 On Wed, Jun 3, 2015 at 6:27 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 Hmmm a spark streaming app code doesn't execute in the linear fashion
 assumed in your previous code snippet - to achieve your objectives you
 should do something like the following

 in terms of your second objective - saving the initialization and
 serialization of the params you can:

 a) broadcast them
 b) have them as a Singleton (initialized from e.g. params in a file on
 HDFS)
 on each Executor

 messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {

 Param param = new Param();
 param.initialize();

   @Override
   public Void call(JavaRDD<String> rdd) throws Exception {
 ProcessPartitionFunction func = new
 ProcessPartitionFunction(param);
 rdd.foreachPartition(func);
 return null;
   }

 });

 //put this in e.g. the object destructor
 param.deinitialize();


 -Original Message-
 From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
 Sent: Wednesday, June 3, 2015 1:56 PM
 To: user@spark.apache.org
 Subject: Objects serialized before foreachRDD/foreachPartition ?

 I'm looking at https://spark.apache.org/docs/latest/tuning.html.
 Basically
 the takeaway is that all objects passed into the code processing RDD's must
 be serializable. So if I've got a few objects that I'd rather initialize
 once and deinitialize once outside of the logic processing the RDD's, I'd
 need to think twice about the costs of serializing such objects, it would
 seem.

 In the below, does the Spark serialization happen before calling foreachRDD
 or before calling foreachPartition?

 Param param = new Param();
 param.initialize();
 messageBodies.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> rdd) throws Exception {
 ProcessPartitionFunction func = new
 ProcessPartitionFunction(param);
 rdd.foreachPartition(func);
 return null;
   }
 });
 param.deinitialize();

 If param gets initialized to a significant memory footprint, are we better
 off creating/initializing it before calling new ProcessPartitionFunction()
 or perhaps in the 'call' method within that function?

 I'm trying to avoid calling expensive init()/deinit() methods while
 balancing against the serialization costs. Thanks.



 --
 View this message in context:

 http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-before-foreachRDD-foreachPartition-tp23134.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
 commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
If we have a hand-off between the older consumer and the newer consumer, I
wonder if we need to manually manage the offsets in Kafka so as not to miss
some messages as the hand-off is happening.

Or if we let the new consumer run for a bit then let the old consumer know
the 'new guy is in town' then the old consumer can be shut off.  Some
overlap is OK...

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 Makes sense especially if you have a cloud with “infinite” resources /
 nodes which allows you to double, triple etc in the background/parallel the
 resources of the currently running cluster



 I was thinking more about the scenario where you have e.g. 100 boxes and
 want to / can add e.g. 20 more



 *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
 *Sent:* Wednesday, June 3, 2015 4:46 PM
 *To:* Evo Eftimov
 *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
 *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
 sizes/rate of growth in Kafka or Spark's metrics?



 Evo,



 One of the ideas is to shadow the current cluster. This way there's no
 extra latency incurred due to shutting down of the consumers. If two sets
 of consumers are running, potentially processing the same data, that is OK.
 We phase out the older cluster and gradually flip over to the new one,
 insuring no downtime or extra latency.  Thoughts?



 On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 You should monitor vital performance / job clogging stats of the Spark
 Streaming Runtime not “kafka topics”



 You should be able to bring new worker nodes online and make them contact
 and register with the Master without bringing down the Master (or any of
 the currently running worker nodes)



 Then just shutdown your currently running spark streaming job/app and
 restart it with new params to take advantage of the larger cluster



 *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
 *Sent:* Wednesday, June 3, 2015 4:14 PM
 *To:* Cody Koeninger
 *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
 *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
 sizes/rate of growth in Kafka or Spark's metrics?



 Would it be possible to implement Spark autoscaling somewhat along these
 lines? --



 1. If we sense that a new machine is needed, by watching the data load in
 Kafka topic(s), then

 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
 and get a machine);

 3. Create a shadow/mirror Spark master running alongside the initial
 version which talks to N machines. The new mirror version is aware of N+1
 machines (or N+M if we had decided we needed M new boxes).

 4. The previous version of the Spark runtime is
 acquiesced/decommissioned.  We possibly get both clusters working on the
 same data which may actually be OK (at least for our specific use-cases).

 5. Now the new Spark cluster is running.



 Similarly, the decommissioning of M unused boxes would happen, via this
 notion of a mirror Spark runtime.  How feasible would it be for such a
 mirrorlike setup to be created, especially created programmatically?
 Especially point #3.



 The other idea we'd entertained was to bring in a new machine, acquiesce
 down all currently running workers by telling them to process their current
 batch then shut down, then restart the consumers now that Spark is aware of
 a modified cluster.  This has the drawback of a downtime that may not be
 tolerable in terms of latency, by the system's clients waiting for their
 responses in a synchronous fashion.



 Thanks.



 On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I'm not sure that points 1 and 2 really apply to the kafka direct stream.
 There are no receivers, and you know at the driver how big each of your
 batches is.



 On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote:

 Hi all,



 As the author of the dynamic allocation feature I can offer a few insights
 here.



 Gerard's explanation was both correct and concise: dynamic allocation is
 not intended to be used in Spark streaming at the moment (1.4 or before).
 This is because of two things:



 (1) Number of receivers is necessarily fixed, and these are started in
 executors. Since we need a receiver for each InputDStream, if we kill these
 receivers we essentially stop the stream, which is not what we want. It
 makes little sense to close and restart a stream the same way we kill and
 relaunch executors.



 (2) Records come in every batch, and when there is data to process your
 executors are not idle. If your idle timeout is less than the batch
 duration, then you'll end up having to constantly kill and restart
 executors. If your idle timeout is greater than the batch duration, then
 you'll never kill executors.



 Long answer short, with Spark streaming there is currently no
 straightforward way to scale the size of your cluster. I had a long
 discussion 

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
Would it be possible to implement Spark autoscaling somewhat along these
lines? --

1. If we sense that a new machine is needed, by watching the data load in
Kafka topic(s), then
2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
and get a machine);
3. Create a shadow/mirror Spark master running alongside the initial
version which talks to N machines. The new mirror version is aware of N+1
machines (or N+M if we had decided we needed M new boxes).
4. The previous version of the Spark runtime is acquiesced/decommissioned.
We possibly get both clusters working on the same data which may actually
be OK (at least for our specific use-cases).
5. Now the new Spark cluster is running.

Similarly, the decommissioning of M unused boxes would happen, via this
notion of a mirror Spark runtime.  How feasible would it be for such a
mirrorlike setup to be created, especially created programmatically?
Especially point #3.
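
For illustration, here is a minimal Scala sketch of what the Provisioner interface from
point 2 and the shadow-cluster hand-off in points 3-5 could look like. Every name
below is hypothetical; none of this is an existing Spark or AWS API.

// Hypothetical shapes only -- a sketch of the flow in points 1-5 above.
trait Provisioner {
  def provision(count: Int): Seq[String]        // host names of the newly added boxes
  def decommission(hosts: Seq[String]): Unit
}

object ShadowClusterFlow {
  def scaleOut(provisioner: Provisioner, m: Int): Unit = {
    val newHosts = provisioner.provision(m)                                    // step 2
    println(s"step 3: start a mirror master that is aware of N + $m boxes: $newHosts")
    println("step 3/4: point new consumers at the mirror; overlap with the old ones is OK")
    println("step 4: drain and decommission the old Spark runtime")
    println("step 5: the mirror cluster is now the primary")
  }
}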

The other idea we'd entertained was to bring in a new machine, acquiesce
down all currently running workers by telling them to process their current
batch then shut down, then restart the consumers now that Spark is aware of
a modified cluster.  This has the drawback of a downtime that may not be
tolerable in terms of latency, by the system's clients waiting for their
responses in a synchronous fashion.

Thanks.

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org wrote:

 I'm not sure that points 1 and 2 really apply to the kafka direct stream.
 There are no receivers, and you know at the driver how big each of your
 batches is.

 On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote:

 Hi all,

 As the author of the dynamic allocation feature I can offer a few
 insights here.

 Gerard's explanation was both correct and concise: dynamic allocation is
 not intended to be used in Spark streaming at the moment (1.4 or before).
 This is because of two things:

 (1) Number of receivers is necessarily fixed, and these are started in
 executors. Since we need a receiver for each InputDStream, if we kill these
 receivers we essentially stop the stream, which is not what we want. It
 makes little sense to close and restart a stream the same way we kill and
 relaunch executors.

 (2) Records come in every batch, and when there is data to process your
 executors are not idle. If your idle timeout is less than the batch
 duration, then you'll end up having to constantly kill and restart
 executors. If your idle timeout is greater than the batch duration, then
 you'll never kill executors.

 Long answer short, with Spark streaming there is currently no
 straightforward way to scale the size of your cluster. I had a long
 discussion with TD (Spark streaming lead) about what needs to be done to
 provide some semblance of dynamic scaling to streaming applications, e.g.
 take into account the batch queue instead. We came up with a few ideas that
 I will not detail here, but we are looking into this and do intend to
 support it in the near future.

 -Andrew



 2015-05-28 8:02 GMT-07:00 Evo Eftimov evo.efti...@isecc.com:

 Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
 – it will be your insurance policy against sys crashes due to memory leaks.
 Until there is free RAM, spark streaming (spark) will NOT resort to disk –
 and of course resorting to disk from time to time (ie when there is no free
 RAM ) and taking a performance hit from that, BUT only until there is no
 free RAM
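
As a hedged illustration of the storage-policy suggestion above (the DStream passed
in stands for the application's input stream, which is an assumption; MEMORY_AND_DISK
is the standard Spark storage level being referred to):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream

// Keep the stream's RDDs in memory first, spilling to disk only when RAM runs out.
def withDiskSpillInsurance[T](stream: DStream[T]): DStream[T] =
  stream.persist(StorageLevel.MEMORY_AND_DISK)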



 *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
 *Sent:* Thursday, May 28, 2015 2:34 PM
 *To:* Evo Eftimov
 *Cc:* Gerard Maas; spark users
 *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
 sizes/rate of growth in Kafka or Spark's metrics?



 Evo, good points.



 On the dynamic resource allocation, I'm surmising this only works within
 a particular cluster setup.  So it improves the usage of current cluster
 resources but it doesn't make the cluster itself elastic. At least, that's
 my understanding.



 Memory + disk would be good and hopefully it'd take *huge* load on the
 system to start exhausting the disk space too.  I'd guess that falling onto
 disk will make things significantly slower due to the extra I/O.



 Perhaps we'll really want all of these elements eventually.  I think
 we'd want to start with memory only, keeping maxRate low enough not to
 overwhelm the consumers; implement the cluster autoscaling.  We might
 experiment with dynamic resource allocation before we get to implement the
 cluster autoscale.







 On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 You can also try Dynamic Resource Allocation




 https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation



 Also re the Feedback Loop for automatic message consumption rate
 adjustment – there is a “dumb” solution option – simply set the storage
 policy for the DStream RDDs to MEMORY AND DISK – when the 

RE: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Evo Eftimov
Makes sense especially if you have a cloud with “infinite” resources / nodes 
which allows you to double, triple etc in the background/parallel the resources 
of the currently running cluster 

 

I was thinking more about the scenario where you have e.g. 100 boxes and want 
to / can add e.g. 20 more 

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 4:46 PM
To: Evo Eftimov
Cc: Cody Koeninger; Andrew Or; Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Evo,

 

One of the ideas is to shadow the current cluster. This way there's no extra 
latency incurred due to shutting down of the consumers. If two sets of 
consumers are running, potentially processing the same data, that is OK. We 
phase out the older cluster and gradually flip over to the new one, ensuring no 
downtime or extra latency.  Thoughts?

 

On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov evo.efti...@isecc.com wrote:

You should monitor vital performance / job clogging stats of the Spark 
Streaming Runtime not “kafka topics”

 

You should be able to bring new worker nodes online and make them contact and 
register with the Master without bringing down the Master (or any of the 
currently running worker nodes) 

 

Then just shutdown your currently running spark streaming job/app and restart 
it with new params to take advantage of the larger cluster 

 

From: Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com] 
Sent: Wednesday, June 3, 2015 4:14 PM
To: Cody Koeninger
Cc: Andrew Or; Evo Eftimov; Gerard Maas; spark users
Subject: Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of 
growth in Kafka or Spark's metrics?

 

Would it be possible to implement Spark autoscaling somewhat along these lines? 
--

 

1. If we sense that a new machine is needed, by watching the data load in Kafka 
topic(s), then

2. Provision a new machine via a Provisioner interface (e.g. talk to AWS and 
get a machine);

3. Create a shadow/mirror Spark master running alongside the initial version 
which talks to N machines. The new mirror version is aware of N+1 machines (or 
N+M if we had decided we needed M new boxes).

4. The previous version of the Spark runtime is acquiesced/decommissioned.  We 
possibly get both clusters working on the same data which may actually be OK 
(at least for our specific use-cases).

5. Now the new Spark cluster is running.

 

Similarly, the decommissioning of M unused boxes would happen, via this notion 
of a mirror Spark runtime.  How feasible would it be for such a mirrorlike 
setup to be created, especially created programmatically?  Especially point #3.

 

The other idea we'd entertained was to bring in a new machine, acquiesce down 
all currently running workers by telling them to process their current batch 
then shut down, then restart the consumers now that Spark is aware of a 
modified cluster.  This has the drawback of a downtime that may not be 
tolerable in terms of latency, by the system's clients waiting for their 
responses in a synchronous fashion.

 

Thanks.

 

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org wrote:

I'm not sure that points 1 and 2 really apply to the kafka direct stream.  
There are no receivers, and you know at the driver how big each of your batches 
is.

 

On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote:

Hi all,

 

As the author of the dynamic allocation feature I can offer a few insights here.

 

Gerard's explanation was both correct and concise: dynamic allocation is not 
intended to be used in Spark streaming at the moment (1.4 or before). This is 
because of two things:

 

(1) Number of receivers is necessarily fixed, and these are started in 
executors. Since we need a receiver for each InputDStream, if we kill these 
receivers we essentially stop the stream, which is not what we want. It makes 
little sense to close and restart a stream the same way we kill and relaunch 
executors.

 

(2) Records come in every batch, and when there is data to process your 
executors are not idle. If your idle timeout is less than the batch duration, 
then you'll end up having to constantly kill and restart executors. If your 
idle timeout is greater than the batch duration, then you'll never kill 
executors.

 

Long answer short, with Spark streaming there is currently no straightforward 
way to scale the size of your cluster. I had a long discussion with TD (Spark 
streaming lead) about what needs to be done to provide some semblance of 
dynamic scaling to streaming applications, e.g. take into account the batch 
queue instead. We came up with a few ideas that I will not detail here, but we 
are looking into this and do intend to support it in the near future.

 

-Andrew

 

 

 

2015-05-28 8:02 GMT-07:00 Evo Eftimov evo.efti...@isecc.com:

 

Probably you should ALWAYS keep the RDD storage policy to MEMORY 

Re: Spark 1.4.0 build Error on Windows

2015-06-03 Thread Ted Yu
I used the same command on Linux but didn't reproduce the error.

Can you include the -X switch on your command line?

Also consider upgrading Maven to 3.3.x.

Cheers

On Wed, Jun 3, 2015 at 2:36 AM, Daniel Emaasit daniel.emaa...@gmail.com
wrote:

 I run into errors while trying to build Spark from the 1.4 release
 branch: https://github.com/apache/spark/tree/branch-1.4. Any help will be
 much appreciated. Here is the log file from my windows 8.1 PC. (F.Y.I, I
 installed all the dependencies like Java 7, Maven 3.2.5 and set
 the environment variables)


 C:\Program Files\Apache Software Foundation\spark-branch-1.4>mvn -Psparkr
 -Pyarn
  -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
 [INFO] Scanning for projects...
 [INFO]
 
 [INFO] Reactor Build Order:
 [INFO]
 [INFO] Spark Project Parent POM
 [INFO] Spark Launcher Project
 [INFO] Spark Project Networking
 [INFO] Spark Project Shuffle Streaming Service
 [INFO] Spark Project Unsafe
 [INFO] Spark Project Core
 [INFO] Spark Project Bagel
 [INFO] Spark Project GraphX
 [INFO] Spark Project Streaming
 [INFO] Spark Project Catalyst
 [INFO] Spark Project SQL
 [INFO] Spark Project ML Library
 [INFO] Spark Project Tools
 [INFO] Spark Project Hive
 [INFO] Spark Project REPL
 [INFO] Spark Project YARN
 [INFO] Spark Project Assembly
 [INFO] Spark Project External Twitter
 [INFO] Spark Project External Flume Sink
 [INFO] Spark Project External Flume
 [INFO] Spark Project External MQTT
 [INFO] Spark Project External ZeroMQ
 [INFO] Spark Project External Kafka
 [INFO] Spark Project Examples
 [INFO] Spark Project External Kafka Assembly
 [INFO] Spark Project YARN Shuffle Service
 [INFO]
 [INFO]
 
 [INFO] Building Spark Project Parent POM 1.4.0-SNAPSHOT
 [INFO]
 
 [INFO]
 [INFO] --- maven-clean-plugin:2.6.1:clean (default-clean) @ spark-parent_2.10
 --
 -
 [INFO]
 [INFO] --- maven-enforcer-plugin:1.4:enforce (enforce-versions) @ spark
 -parent_2
 .10 ---
 [INFO]
 [INFO] --- scala-maven-plugin:3.2.0:add-source (eclipse-add-source) @
 spark-pare
 nt_2.10 ---
 [INFO] Add Source directory: C:\Program Files\Apache Software Foundation\
 spark-b
 ranch-1.4\src\main\scala
 [INFO] Add Test Source directory: C:\Program Files\Apache Software
 Foundation\sp
 ark-branch-1.4\src\test\scala
 [INFO]
 [INFO] --- build-helper-maven-plugin:1.9.1:add-source (add-scala-sources)
 @ spar
 k-parent_2.10 ---
 [INFO] Source directory: C:\Program Files\Apache Software Foundation\spark
 -branc
 h-1.4\src\main\scala added.
 [INFO]
 [INFO] --- maven-remote-resources-plugin:1.5:process (default) @ spark
 -parent_2.
 10 ---
 [INFO]
 
 [INFO] Reactor Summary:
 [INFO]
 [INFO] Spark Project Parent POM ... FAILURE [
 23.528 s]
 [INFO] Spark Launcher Project . SKIPPED
 [INFO] Spark Project Networking ... SKIPPED
 [INFO] Spark Project Shuffle Streaming Service  SKIPPED
 [INFO] Spark Project Unsafe ... SKIPPED
 [INFO] Spark Project Core . SKIPPED
 [INFO] Spark Project Bagel  SKIPPED
 [INFO] Spark Project GraphX ... SKIPPED
 [INFO] Spark Project Streaming  SKIPPED
 [INFO] Spark Project Catalyst . SKIPPED
 [INFO] Spark Project SQL .. SKIPPED
 [INFO] Spark Project ML Library ... SKIPPED
 [INFO] Spark Project Tools  SKIPPED
 [INFO] Spark Project Hive . SKIPPED
 [INFO] Spark Project REPL . SKIPPED
 [INFO] Spark Project YARN . SKIPPED
 [INFO] Spark Project Assembly . SKIPPED
 [INFO] Spark Project External Twitter . SKIPPED
 [INFO] Spark Project External Flume Sink .. SKIPPED
 [INFO] Spark Project External Flume ... SKIPPED
 [INFO] Spark Project External MQTT  SKIPPED
 [INFO] Spark Project External ZeroMQ .. SKIPPED
 [INFO] Spark Project External Kafka ... SKIPPED
 [INFO] Spark Project Examples . SKIPPED
 [INFO] Spark Project External Kafka Assembly .. SKIPPED
 [INFO] Spark Project YARN Shuffle Service . SKIPPED
 [INFO]
 
 [INFO] BUILD FAILURE
 [INFO]
 
 [INFO] Total time: 24.680 s
 [INFO] Finished at: 2015-06-03T02:11:35-07:00
 [INFO] Final 

Re: Example Page Java Function2

2015-06-03 Thread Sean Owen
Yes, I think you're right. Since this is a change to the ASF hosted
site, I can make this change to the .md / .html directly rather than
go through the usual PR.

On Wed, Jun 3, 2015 at 6:23 PM, linkstar350 . tweicomepan...@gmail.com wrote:
 Hi, I'm Taira.

 I noticed that this example page may contain a mistake.

 https://spark.apache.org/examples.html

 
 Word Count (Java)

 JavaRDD<String> textFile = spark.textFile("hdfs://...");
 JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
   public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
 });
 JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
   public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
 });
 JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer>() {
   public Integer call(Integer a, Integer b) { return a + b; }
 });
 counts.saveAsTextFile("hdfs://...");
 

 Function2 should have three generic type arguments, but there are only two.

 I hope for your consideration.

 Taira

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Example Page Java Function2

2015-06-03 Thread linkstar350 .
Hi, I'm Taira.

I noticed that this example page may contain a mistake.

https://spark.apache.org/examples.html


Word Count (Java)

JavaRDD<String> textFile = spark.textFile("hdfs://...");
JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
counts.saveAsTextFile("hdfs://...");


Function2 should have three generic type arguments, but there are only two.

I hope for your consideration.

Taira

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



StreamingListener, anyone?

2015-06-03 Thread dgoldenberg
Hi,

I've got a Spark Streaming driver job implemented and in it, I register a
streaming listener, like so:

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
   Durations.milliseconds(params.getBatchDurationMillis()));
jssc.addStreamingListener(new JobListener(jssc));

where JobListener is defined like so
private static class JobListener implements StreamingListener {

private JavaStreamingContext jssc;

JobListener(JavaStreamingContext jssc) {
this.jssc = jssc;
}

@Override
public void onBatchCompleted(StreamingListenerBatchCompleted
batchCompleted) {
System.out.println("Batch completed.");
jssc.stop(true);
System.out.println("The job has been stopped.");
}
}


I do not seem to be seeing onBatchCompleted being triggered.  Am I doing
something wrong?

In this particular case, I was trying to implement a bulk ingest type of
logic where the first batch is all we're interested in (reading out of a
Kafka topic with offset reset set to smallest).




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Broadcast variables can be rebroadcast?

2015-06-03 Thread NB
I am pasting some of the exchanges I had on this topic via the mailing list
directly so it may help someone else too. (Don't know why those responses
don't show up here).

---

Thanks Imran. It does help clarify. I believe I had it right all along then
but was confused by documentation talking about never changing the
broadcasted variables.

I've tried it on a local mode process till now and does seem to work as
intended. When (and if !) we start running on a real cluster, I hope this
holds up.

Thanks
NB


On Tue, May 19, 2015 at 6:25 AM, Imran Rashid iras...@cloudera.com wrote:

 hmm, I guess it depends on the way you look at it.  In a way, I'm saying
 that spark does *not* have any built in auto-re-broadcast if you try to
 mutate a broadcast variable.  Instead, you should create something new,
 and
 just broadcast it separately.  Then just have all the code you have
 operating on your RDDs look at the new broadcast variable.

 But I guess there is another way to look at it -- you are creating new
 broadcast variables each time, but they all point to the same underlying
 mutable data structure.  So in a way, you are rebroadcasting the same
 underlying data structure.

 Let me expand my example from earlier a little bit more:


 def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit =
 {
  ...
 }

 // this is a val, because the data structure itself is mutable
 val myMutableDataStructure = ...
 // this is a var, because you will create new broadcasts
 var myBroadcast = sc.broadcast(myMutableDataStructure)
 (0 to 20).foreach { iteration =>
   oneIteration(myRDD, myBroadcast)
   // update your mutable data structure in place
   myMutableDataStructure.update(...)
   // ... but that doesn't affect the broadcast variables living out on the
 cluster, so we need to
   // create a new one

   // this line is not required -- the broadcast var will automatically get
 unpersisted when a gc
   // cleans up the old broadcast on the driver, but I'm including this
 here for completeness,
   // in case you want to more proactively clean up old blocks if you are
 low on space
   myBroadcast.unpersist()

   // now we create a new broadcast which has the updated data in our
 mutable data structure
   myBroadcast = sc.broadcast(myMutableDataStructure)
 }


 hope this clarifies things!

 Imran

 On Tue, May 19, 2015 at 3:06 AM, N B nb.nos...@gmail.com wrote:

 Hi Imran,

 If I understood you correctly, you are suggesting to simply call
 broadcast again from the driver program. This is exactly what I am hoping
 will work as I have the Broadcast data wrapped up and I am indeed
 (re)broadcasting the wrapper over again when the underlying data changes.
 However, documentation seems to suggest that one cannot re-broadcast. Is
 my
 understanding accurate?

 Thanks
 NB


 On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com
 wrote:

 Rather than updating the broadcast variable, can't you simply create a
 new one?  When the old one can be gc'ed in your program, it will also
 get
 gc'ed from spark's cache (and all executors).

 I think this will make your code *slightly* more complicated, as you
 need to add in another layer of indirection for which broadcast variable
 to
 use, but not too bad.  Eg., from

 var myBroadcast = sc.broadcast( ...)
 (0 to 20).foreach{ iteration =>
   //  ... some rdd operations that involve myBroadcast ...
   myBroadcast.update(...) // wrong! don't update a broadcast variable
 }

 instead do something like:

 def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit
 = {
  ...
 }

 var myBroadcast = sc.broadcast(...)
 (0 to 20).foreach { iteration =>
   oneIteration(myRDD, myBroadcast)
   myBroadcast = sc.broadcast(...) // create a NEW broadcast here,
 with whatever you need to update it
 }

 On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ayan. Can we rebroadcast after updating in the driver?

 Thanks
 NB.


 On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 Broadcast variables are shipped the first time they are accessed in a
 transformation to the executors used by that transformation. They will NOT be
 updated subsequently, even if the value has changed. However, the new value
 will be shipped to any new executor that comes into play after the value has
 changed. This way, changing the value of a broadcast variable is not a good
 idea, as it can create inconsistency within the cluster. From the documentation:

  In addition, the object v should not be modified after it is
 broadcast in order to ensure that all nodes get the same value of the
 broadcast variable


 On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ilya. Does one have to call broadcast again once the
 underlying data is updated in order to get the changes visible on all
 nodes?

 Thanks
 NB


 On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com
 wrote:

 The broadcast variable is like a pointer. 

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
I think what we'd want to do is track the ingestion rate in the consumer(s)
via Spark's aggregation functions and such. If we're at a critical level
(load too high / load too low) then we issue a request into our
Provisioning Component to add/remove machines. Once it comes back with an
OK, each consumer can finish its current batch, then terminate itself,
and restart with a new context.  The new context would be aware of the
updated cluster - correct?  Therefore the refreshed consumer would restart
on the updated cluster.

Could we even terminate the consumer immediately upon sensing a critical
event?  When it would restart, could it resume right where it left off?
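
A rough sketch of that flow, under stated assumptions: ProvisioningClient and the
delay threshold are hypothetical, while StreamingListener, BatchInfo and
StreamingContext.stop are the actual Spark Streaming APIs.

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

trait ProvisioningClient { def requestMoreNodes(): Unit }   // hypothetical component

class ScalingListener(ssc: StreamingContext,
                      provisioner: ProvisioningClient,
                      maxDelayMs: Long) extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val delay = batch.batchInfo.schedulingDelay.getOrElse(0L)
    if (delay > maxDelayMs) {
      provisioner.requestMoreNodes()                         // ask for more boxes
      // finish the in-flight batch, then exit; the driver would be restarted
      // against the enlarged cluster by an external supervisor
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}

The listener would be registered with streamingContext.addStreamingListener(...) before
starting the context.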

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov evo.efti...@isecc.com wrote:

 Makes sense especially if you have a cloud with “infinite” resources /
 nodes which allows you to double, triple etc in the background/parallel the
 resources of the currently running cluster



 I was thinking more about the scenario where you have e.g. 100 boxes and
 want to / can add e.g. 20 more



 *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
 *Sent:* Wednesday, June 3, 2015 4:46 PM
 *To:* Evo Eftimov
 *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
 *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
 sizes/rate of growth in Kafka or Spark's metrics?



 Evo,



 One of the ideas is to shadow the current cluster. This way there's no
 extra latency incurred due to shutting down of the consumers. If two sets
 of consumers are running, potentially processing the same data, that is OK.
 We phase out the older cluster and gradually flip over to the new one,
 ensuring no downtime or extra latency.  Thoughts?



 On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov evo.efti...@isecc.com
 wrote:

 You should monitor vital performance / job clogging stats of the Spark
 Streaming Runtime not “kafka topics”



 You should be able to bring new worker nodes online and make them contact
 and register with the Master without bringing down the Master (or any of
 the currently running worker nodes)



 Then just shutdown your currently running spark streaming job/app and
 restart it with new params to take advantage of the larger cluster



 *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
 *Sent:* Wednesday, June 3, 2015 4:14 PM
 *To:* Cody Koeninger
 *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
 *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
 sizes/rate of growth in Kafka or Spark's metrics?



 Would it be possible to implement Spark autoscaling somewhat along these
 lines? --



 1. If we sense that a new machine is needed, by watching the data load in
 Kafka topic(s), then

 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
 and get a machine);

 3. Create a shadow/mirror Spark master running alongside the initial
 version which talks to N machines. The new mirror version is aware of N+1
 machines (or N+M if we had decided we needed M new boxes).

 4. The previous version of the Spark runtime is
 acquiesced/decommissioned.  We possibly get both clusters working on the
 same data which may actually be OK (at least for our specific use-cases).

 5. Now the new Spark cluster is running.



 Similarly, the decommissioning of M unused boxes would happen, via this
 notion of a mirror Spark runtime.  How feasible would it be for such a
 mirrorlike setup to be created, especially created programmatically?
 Especially point #3.



 The other idea we'd entertained was to bring in a new machine, acquiesce
 down all currently running workers by telling them to process their current
 batch then shut down, then restart the consumers now that Spark is aware of
 a modified cluster.  This has the drawback of a downtime that may not be
 tolerable in terms of latency, by the system's clients waiting for their
 responses in a synchronous fashion.



 Thanks.



 On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger c...@koeninger.org
 wrote:

 I'm not sure that points 1 and 2 really apply to the kafka direct stream.
 There are no receivers, and you know at the driver how big each of your
 batches is.



 On Thu, May 28, 2015 at 2:21 PM, Andrew Or and...@databricks.com wrote:

 Hi all,



 As the author of the dynamic allocation feature I can offer a few insights
 here.



 Gerard's explanation was both correct and concise: dynamic allocation is
 not intended to be used in Spark streaming at the moment (1.4 or before).
 This is because of two things:



 (1) Number of receivers is necessarily fixed, and these are started in
 executors. Since we need a receiver for each InputDStream, if we kill these
 receivers we essentially stop the stream, which is not what we want. It
 makes little sense to close and restart a stream the same way we kill and
 relaunch executors.



 (2) Records come in every batch, and when there is data to process your
 executors are not idle. If your idle timeout is less than the batch
 

Re: Behavior of the spark.streaming.kafka.maxRatePerPartition config param?

2015-06-03 Thread Cody Koeninger
The default of 0 means no limit.  Each batch will grab as much as is
available, ie a range of offsets spanning from the end of the previous
batch to the highest available offsets on the leader.

If you set spark.streaming.kafka.maxRatePerPartition > 0, the number you
set is the maximum number of messages per partition per second.

If you have a reproducible case that behaves differently, please share it.
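
A worked example of that clamp, with all numbers being assumptions:

val maxRatePerPartition = 1000L     // messages per partition per second
val batchDurationSec    = 2.0
val numPartitions       = 5
val maxPerPartitionPerBatch = (batchDurationSec * maxRatePerPartition).toLong   // 2000
val maxPerBatch             = maxPerPartitionPerBatch * numPartitions           // 10000 messages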





On Tue, Jun 2, 2015 at 5:28 PM, dgoldenberg dgoldenberg...@gmail.com
wrote:

 Hi,

 Could someone explain the behavior of the
 spark.streaming.kafka.maxRatePerPartition parameter? The doc says An
 important (configuration) is spark.streaming.kafka.maxRatePerPartition
 which
 is the maximum rate at which each Kafka partition will be read by (the)
 direct API.

 What is the default behavior for this parameter? From some testing it
 appears that with it not being set, the RDD size tends to be quite low.
 With
 it set, we're seeing the consumer picking up items off the topic quite more
 actively, e.g. -Dspark.streaming.kafka.maxRatePerPartition=1000 in
 --driver-java-options.

 Does this parameter set the RDD size to a very low value?

 seems to be defaulting to 0... but what's the effect of that?
   protected val maxMessagesPerPartition: Option[Long] = {
     val ratePerSec = context.sparkContext.getConf.getInt(
       "spark.streaming.kafka.maxRatePerPartition", 0)
     if (ratePerSec > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
       Some((secsPerBatch * ratePerSec).toLong)
     } else {
       None
     }
   }
   // limits the maximum number of messages per partition
   protected def clamp(
       leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
     maxMessagesPerPartition.map { mmp =>
       leaderOffsets.map { case (tp, lo) =>
         tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
       }
     }.getOrElse(leaderOffsets)
   }

 what would we limit by default?  And once Spark Streaming does pick up
 messages, would it be at the maximum value? does it ever fall below max
 even
 if there are max or more than max in the topic? Thanks.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Behavior-of-the-spark-streaming-kafka-maxRatePerPartition-config-param-tp23117.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Adding new Spark workers on AWS EC2 - access error

2015-06-03 Thread barmaley
I have an existing, operating Spark cluster that was launched with the spark-ec2
script. I'm trying to add a new slave by following the instructions:

Stop the cluster
On AWS console launch more like this on one of the slaves
Start the cluster
Although the new instance is added to the same security group and I can
successfully SSH to it with the same private key, spark-ec2 ... start call
can't access this machine for some reason:

Running setup-slave on all cluster nodes to mount filesystems, etc...
[1] 00:59:59 [FAILURE] ec2-52-25-53-64.us-west-2.compute.amazonaws.com
Exited with error code 255 Stderr: Permission denied (publickey).

, obviously, followed by tons of other errors while trying to deploy Spark
stuff on this instance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Adding-new-Spark-workers-on-AWS-EC2-access-error-tp23143.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Make HTTP requests from within Spark

2015-06-03 Thread Pat McDonough
Try something like the following.

Create a function to make the HTTP call, e.g.
using org.apache.commons.httpclient.HttpClient as in below.

def getUrlAsString(url: String): String = {
  val client = new org.apache.http.impl.client.DefaultHttpClient()
  val request = new org.apache.http.client.methods.HttpGet(url)
  val response = client.execute(request)
  val handler = new org.apache.http.impl.client.BasicResponseHandler()
  handler.handleResponse(response).trim
}

Then build up your set of urls, and pass the, as a parameter to your HTTP
function. This is not the most basic example, but it includes some logic to
handle paging for a REST API and also control the number of concurrent
threads if there are fewer than number of CPUs.


val (max, batchSize, threads) = (1500, 200, 20)

val calls = sc.parallelize(
  (0 to max by batchSize).map(
    page => s"https://some.url/jsonapi?_start=${page}&_limit=${batchSize}"),
  threads)

if (debug) {
  def partMapper(index: Int, iter: Iterator[String]): Iterator[Map[String, Any]] = {
    iter.toList.map(callString => Map("thread" -> index, "call" -> callString)).iterator
  }
  calls.mapPartitionsWithIndex(partMapper).collect.foreach(println)
}

val callRDD = calls.map(getUrlAsString(_))

val yourDataFrame = jsonRDD(callRDD)




On Wed, Jun 3, 2015 at 7:25 PM, William Briggs wrbri...@gmail.com wrote:

 Hi Kaspar,

 This is definitely doable, but in my opinion, it's important to remember
 that, at its core, Spark is based around a functional programming paradigm
 - you're taking input sets of data and, by applying various
 transformations, you end up with a dataset that represents your answer.
 Without knowing more about your use case, and keeping in mind that I'm very
 new to Spark, here are a few things I would want to think about if I were
 writing this as a non-Streaming Spark application:

1. What is your starting dataset? Do you have an initial set of
parameters or a data source that is used to define each of the millions of
requests? If so, then that should comprise your first RDD and you can
perform subsequent transformations to prepare your HTTP requests (e.g.,
start with the information that drives the generation of the requests, and
use map/flatMap to create an RDD that has the full list of requests you
want to run).
2. Are the HTTP requests read-only, and/or idempotent (are you only
looking up data, or are you performing requests that cause some sort of
side effect)? Spark operations against RDDs work by defining a lineage
graph, and transformations will be re-run if a partition in the lineage
needs to be recalculated for any reason. If your HTTP requests are causing
side-effects that should not be repeated, then Spark may not be the best
fit for that portion of the job, and you might want to use something else,
pipe the results into HDFS, and then analyze those using Spark..
3. If your web service requests are lookups or are idempotent, then
we're on the right track. Keep in mind that your web service probably will
not scale as well as the Spark job - a naive first-pass implementation
could easily overwhelm many services, particularly if/when partitions need
to be recalculated. There are a few mechanisms you can use to mitigate this
- one is to use mapPartitions rather than map when transforming the set of
requests to the set of results, initialize an HTTP connection for each
partition, and transform the data that defines the request into your
desired dataset by invoking the web service. Using mapPartitions allows you
to limit the number of concurrent HTTP connections to one per partition
(although this may be too slow if your service is slow... there is
obviously a bit of analysis, testing and profiling that would need to be
done on the entire job). Another consideration would be to look at
persisting or caching the intermediate results after you've successfully
retrieved your results from the service, to reduce the likelihood of
hitting the web service more than necessary.
4. Just realized you might be looking for help invoking an HTTP
service programmatically from Scala / Spark - if so, you might want to look
at the spray-client http://spray.io/documentation/1.2.3/spray-client/
library.
5. With millions of web service requests, it's highly likely some will
fail, for a variety of reasons. Look into using Scala's Try
http://www.scala-lang.org/api/2.11.5/index.html#scala.util.Try or
Either
http://www.scala-lang.org/api/2.11.5/index.html#scala.util.Either monads
to encode success / failure, and treat failed requests as first-class
citizens in your RDD of results (by retrying them, filtering them, logging
them, etc., based on your specific needs and use case). Make sure you are
    setting reasonable timeouts on your service calls to prevent the Spark job
from getting stuck if the service 

Spark 1.4 HiveContext fails to initialise with native libs

2015-06-03 Thread Night Wolf
Hi all,

Trying out Spark 1.4 RC4 on MapR4/Hadoop 2.5.1 running in yarn-client mode with
Hive support.

*Build command;*
./make-distribution.sh --name mapr4.0.2_yarn_j6_2.10 --tgz -Pyarn -Pmapr4
-Phadoop-2.4 -Pmapr4 -Phive -Phadoop-provided
-Dhadoop.version=2.5.1-mapr-1501 -Dyarn.version=2.5.1-mapr-1501 -DskipTests
-e -X


When trying to run a Hive query in the spark shell, *sqlContext.sql("show
tables")*, I get the following exception:

scala> sqlContext.sql("show tables")
15/06/04 04:33:16 INFO hive.HiveContext: Initializing
HiveMetastoreConnection version 0.13.1 using Spark classes.
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.mapr.fs.ShimLoader.loadNativeLibrary(ShimLoader.java:323)
at com.mapr.fs.ShimLoader.load(ShimLoader.java:198)
at
org.apache.hadoop.conf.CoreDefaultProperties.clinit(CoreDefaultProperties.java:59)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1857)
at
org.apache.hadoop.conf.Configuration.getProperties(Configuration.java:2072)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2282)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2234)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2151)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:1002)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:974)
at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:518)
at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:536)
at org.apache.hadoop.mapred.JobConf.init(JobConf.java:430)
at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:1366)
at org.apache.hadoop.hive.conf.HiveConf.init(HiveConf.java:1332)
at
org.apache.spark.sql.hive.client.ClientWrapper.init(ClientWrapper.scala:99)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:170)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.init(IsolatedClientLoader.scala:166)
at
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:212)
at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:175)
at
org.apache.spark.sql.hive.HiveContext$$anon$2.init(HiveContext.scala:370)
at
org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:370)
at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:369)
at
org.apache.spark.sql.hive.HiveContext$$anon$1.init(HiveContext.scala:382)
at
org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:382)
at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:381)
at
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:901)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725)
at
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:21)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:26)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:28)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:30)
at $line37.$read$$iwC$$iwC$$iwC$$iwC.init(console:32)
at $line37.$read$$iwC$$iwC$$iwC.init(console:34)
at $line37.$read$$iwC$$iwC.init(console:36)
at $line37.$read$$iwC.init(console:38)
at $line37.$read.init(console:40)
at $line37.$read$.init(console:44)
at $line37.$read$.clinit(console)
at $line37.$eval$.init(console:7)
at $line37.$eval$.clinit(console)
at $line37.$eval.$print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at

Re: ALS Rating Object

2015-06-03 Thread Yasemin Kaya
Hi Joseph,

I thought about converting the IDs, but then there will be a birthday problem. The
probability of a hash collision
http://preshing.com/20110504/hash-collision-probabilities/ matters
to me because of the number of users. I don't know how I can modify ALS to use
Integer.

yasemin
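
A hedged sketch of the pre-processing approach: build a collision-free Int index
with zipWithIndex instead of hashing, so the birthday problem does not apply. The
RawRating case class and its field names are assumptions for illustration.

import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd.RDD

case class RawRating(userId: String, productId: Int, score: Double)

def toAlsRatings(raw: RDD[RawRating]): RDD[Rating] = {
  // zipWithIndex assigns each distinct user a unique Long; narrowing to Int is
  // safe as long as there are fewer than ~2 billion distinct users.
  val userIdToInt = raw.map(_.userId).distinct().zipWithIndex().mapValues(_.toInt)
  raw.map(r => (r.userId, r))
     .join(userIdToInt)
     .map { case (_, (r, intId)) => Rating(intId, r.productId, r.score) }
}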


2015-06-04 2:28 GMT+03:00 Joseph Bradley jos...@databricks.com:

 Hi Yasemin,

 If you can convert your user IDs to Integers in pre-processing (if you
 have < a couple billion users), that would work.  Otherwise...
 In Spark 1.3: You may need to modify ALS to use Long instead of Int.
 In Spark 1.4: spark.ml.recommendation.ALS (in the Pipeline API) exposes
 ALS.train as a DeveloperApi to allow users to use Long instead of Int.
 We're also thinking about better ways to permit Long IDs.

 Joseph

 On Wed, Jun 3, 2015 at 5:04 AM, Yasemin Kaya godo...@gmail.com wrote:

 Hi,

 I want to use Spark's ALS in my project. I have user IDs
 like 30011397223227125563254, and the Rating object that ALS
 uses wants an Integer user ID, so the id field does not fit into a 32-bit
 Integer. How can I solve that? Thanks.

 Best,
 yasemin
 --
 hiç ender hiç





-- 
hiç ender hiç


Re: Example Page Java Function2

2015-06-03 Thread linkstar350 .
Thank you. I confirmed the page.

2015-06-04 1:35 GMT+09:00 Sean Owen so...@cloudera.com:
 Yes, I think you're right. Since this is a change to the ASF hosted
 site, I can make this change to the .md / .html directly rather than
 go through the usual PR.

 On Wed, Jun 3, 2015 at 6:23 PM, linkstar350 . tweicomepan...@gmail.com 
 wrote:
 Hi, I'm Taira.

 I noticed that this example page may contain a mistake.

 https://spark.apache.org/examples.html

 
 Word Count (Java)

 JavaRDD<String> textFile = spark.textFile("hdfs://...");
 JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
   public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
 });
 JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
   public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
 });
 JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer>() {
   public Integer call(Integer a, Integer b) { return a + b; }
 });
 counts.saveAsTextFile("hdfs://...");
 

 Function2 should have three generic type arguments, but there are only two.

 I hope for your consideration.

 Taira

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Standard Scaler taking 1.5hrs

2015-06-03 Thread Piero Cinquegrana
The fit part is very slow, transform not at all.

The number of partitions was 210 vs number of executors 80.

Spark 1.4 sounds great, but as my company is using Qubole we are dependent upon
them to upgrade from version 1.3.1. Until that happens, can you think of any
other reason why it could be slow? Sparse vectors? An excessive number of
columns?

Sent from my mobile device. Please excuse any typos.

On Jun 3, 2015, at 9:53 PM, DB Tsai 
dbt...@dbtsai.commailto:dbt...@dbtsai.com wrote:

Which part of StandardScaler is slow? Fit or transform? Fit has shuffle but 
very small, and transform doesn't do shuffle. I guess you don't have enough 
partition, so please repartition your input dataset to a number at least larger 
than the # of executors you have.

In Spark 1.4's new ML pipeline api, we have Linear Regression with elastic net, 
and in that version, we use quasi newton for optimization, so it will be a way 
faster than SGD implementation. Also, in that implementation, StandardScaler is 
not required since in computing the loss function, we implicitly do this for 
you.

https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef

Please try this out and give us feedback. Thanks.

On Wednesday, June 3, 2015, Piero Cinquegrana 
pcinquegr...@marketshare.commailto:pcinquegr...@marketshare.com wrote:
Hello User group,

I have a RDD of LabeledPoint composed of sparse vectors like showing below. In 
the next step, I am standardizing the columns with the Standard Scaler. The 
data has 2450 columns and ~110M rows. It took 1.5hrs to complete the 
standardization with 10 nodes and 80 executors. The spark.executor.memory was 
set to 2g and the driver memory to 5g.

scala val parsedData = stack_sorted.mapPartitions( partition =>
  partition.map{ row =>
    LabeledPoint(row._2._1.getDouble(4), sparseVectorCat(row._2, CategoriesIdx,
      InteractionIds, tupleMap, vecLength))
  },
  preservesPartitioning = true).cache()

CategoriesIdx: Array[Int] = Array(3, 8, 12)
InteractionIds: Array[(Int, Int)] = Array((13,12))
vecLength: Int = 2450
parsedData: 
org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = 
MapPartitionsRDD[93] at mapPartitions at console:111
(1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))
(0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))
(2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))
(1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))
(0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))


My suspicion is that because the data is partitioned using a custom
partitioner, the Standard Scaler is causing a major shuffle operation. Any
suggestion on how to improve the performance of this step and of the
LinearRegressionWithSGD(), which is also taking a very long time?

scala parsedData.partitioner
res72: Option[org.apache.spark.Partitioner] = 
Some(org.apache.spark.HashPartitioner@d2)

scala val scaler = new StandardScaler(withMean = false, withStd =
true).fit(parsedData.map( row => row.features))
scala val scaledData = parsedData.mapPartitions(partition => partition.map{ row
=> LabeledPoint(row.label, scaler.transform(row.features)) }).cache()

scala val numIterations = 100
scala val stepSize = 0.1
scala val miniBatchFraction = 0.1
scala val algorithm = new LinearRegressionWithSGD()

scala algorithm.setIntercept(false)
scala algorithm.optimizer.setNumIterations(numIterations)
scala algorithm.optimizer.setStepSize(stepSize)
scala algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)

scala val model = algorithm.run(scaledData)

Best,

Piero Cinquegrana
Marketing Scientist | MarketShare
11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
P: 310.914.5677 x242tel:310.914.5677%20x242 M: 323.377.9197tel:323.377.9197
www.marketshare.comhttp://www.marketsharepartners.com/
twitter.com/marketsharephttp://twitter.com/marketsharep



TreeReduce Functionality in Spark

2015-06-03 Thread raggy
I am trying to understand what the treeReduce function for an RDD does, and
how it is different from the normal reduce function. My current
understanding is that treeReduce tries to split up the reduce into multiple
steps. We do a partial reduce on different nodes, and then a final reduce is
done to get the final result. Is this correct? If so, I guess what I am
curious about is, how does spark decide how many nodes will be on each
level, and how many partitions will be sent to a given node? 

The bulk of the implementation is within this function:

partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
  .getOrElse(throw new UnsupportedOperationException("empty collection"))

The above function is expanded to

val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition =
  (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce
// the wall-clock time, we stop tree aggregation.
while (numPartitions > scale + numPartitions / scale) {
  numPartitions /= scale
  val curNumPartitions = numPartitions
  partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
    (i, iter) => iter.map((i % curNumPartitions, _))
  }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}
partiallyAggregated.reduce(cleanCombOp)

I am completely lost about what is happening in this function. I would
greatly appreciate some sort of explanation. 
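
A small worked example of the sizing logic above (the numbers are assumptions):

val depth = 2
val numPartitions = 100
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)   // = 10
// First pass: 100 > 10 + 100/10 = 20, so the 100 partitions are keyed by
// (i % 10) and combined with reduceByKey into 10 partitions.
// Next check: 10 is not > 10 + 10/10 = 11, so the loop stops and the final
// reduce(cleanCombOp) combines the 10 partially aggregated values on the driver.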




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Standard Scaler taking 1.5hrs

2015-06-03 Thread DB Tsai
Which part of StandardScaler is slow? Fit or transform? Fit has shuffle but
very small, and transform doesn't do shuffle. I guess you don't have enough
partition, so please repartition your input dataset to a number at least
larger than the # of executors you have.
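
A minimal sketch of that suggestion, reusing the parsedData RDD from the quoted
message below; 320 partitions is only an illustrative choice for the ~80 executors
mentioned there, not a recommendation.

import org.apache.spark.mllib.feature.StandardScaler

val repartitioned = parsedData.repartition(320).cache()
val scaler = new StandardScaler(withMean = false, withStd = true)
  .fit(repartitioned.map(_.features))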

In Spark 1.4's new ML pipeline api, we have Linear Regression with elastic
net, and in that version, we use quasi newton for optimization, so it will
be a way faster than SGD implementation. Also, in that
implementation, StandardScaler is not required since in computing the loss
function, we implicitly do this for you.

https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef

Please try this out and give us feedback. Thanks.

On Wednesday, June 3, 2015, Piero Cinquegrana pcinquegr...@marketshare.com
wrote:

  Hello User group,



 I have a RDD of LabeledPoint composed of sparse vectors like showing
 below. In the next step, I am standardizing the columns with the Standard
 Scaler. The data has 2450 columns and ~110M rows. It took 1.5hrs to
 complete the standardization with 10 nodes and 80 executors. The
 spark.executor.memory was set to 2g and the driver memory to 5g.



 scala val parsedData = stack_sorted.mapPartitions( partition =>
   partition.map{ row =>
     LabeledPoint(row._2._1.getDouble(4), sparseVectorCat(row._2,
       CategoriesIdx, InteractionIds, tupleMap, vecLength))
   },
   preservesPartitioning = true).cache()


 CategoriesIdx: Array[Int] = Array(3, 8, 12)

 InteractionIds: Array[(Int, Int)] = Array((13,12))

 vecLength: Int = 2450

 parsedData:
 org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] =
 MapPartitionsRDD[93] at mapPartitions at console:111

 (1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))

 (0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))

 (2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))

 (1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))

 (0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))





 My suspicion is that because the data is partitioned using a custom
 partitioner, the Standard Scaler is causing a major shuffle operation. Any
 suggestion on how to improve the performance of this step and of the
 LinearRegressionWithSGD(), which is also taking a very long time?



 scala parsedData.partitioner

 res72: Option[org.apache.spark.Partitioner] = Some(
 org.apache.spark.HashPartitioner@d2)



 scala val scaler = new StandardScaler(withMean = false, withStd =
 true).fit(parsedData.map( row => row.features))

 scala val scaledData = parsedData.mapPartitions(partition =>
 partition.map{ row => LabeledPoint(row.label,
 scaler.transform(row.features))}).cache()



 scala val numIterations = 100

 scala val stepSize = 0.1

 scala val miniBatchFraction = 0.1

 scala val algorithm = new LinearRegressionWithSGD()



 scala algorithm.setIntercept(false)

 scala algorithm.optimizer.setNumIterations(numIterations)

 scala algorithm.optimizer.setStepSize(stepSize)

 scala algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)



 scala val model = algorithm.run(scaledData)



 Best,



 Piero Cinquegrana

 Marketing Scientist | MarketShare
 11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
 P: 310.914.5677 x242 M: 323.377.9197
 www.marketshare.com http://www.marketsharepartners.com/
 twitter.com/marketsharep





Re: Make HTTP requests from within Spark

2015-06-03 Thread William Briggs
Hi Kaspar,

This is definitely doable, but in my opinion, it's important to remember
that, at its core, Spark is based around a functional programming paradigm
- you're taking input sets of data and, by applying various
transformations, you end up with a dataset that represents your answer.
Without knowing more about your use case, and keeping in mind that I'm very
new to Spark, here are a few things I would want to think about if I were
writing this as a non-Streaming Spark application:

   1. What is your starting dataset? Do you have an initial set of
   parameters or a data source that is used to define each of the millions of
   requests? If so, then that should comprise your first RDD and you can
   perform subsequent transformations to prepare your HTTP requests (e.g.,
   start with the information that drives the generation of the requests, and
   use map/flatMap to create an RDD that has the full list of requests you
   want to run).
   2. Are the HTTP requests read-only, and/or idempotent (are you only
   looking up data, or are you performing requests that cause some sort of
   side effect)? Spark operations against RDDs work by defining a lineage
   graph, and transformations will be re-run if a partition in the lineage
   needs to be recalculated for any reason. If your HTTP requests are causing
   side-effects that should not be repeated, then Spark may not be the best
   fit for that portion of the job, and you might want to use something else,
   pipe the results into HDFS, and then analyze those using Spark.
   3. If your web service requests are lookups or are idempotent, then
   we're on the right track. Keep in mind that your web service probably will
   not scale as well as the Spark job - a naive first-pass implementation
   could easily overwhelm many services, particularly if/when partitions need
   to be recalculated. There are a few mechanisms you can use to mitigate this
   - one is to use mapPartitions rather than map when transforming the set of
   requests to the set of results: initialize an HTTP connection per
   partition, and transform the data that defines each request into your
   desired dataset by invoking the web service (see the sketch after this
   list). Using mapPartitions allows you
   to limit the number of concurrent HTTP connections to one per partition
   (although this may be too slow if your service is slow... there is
   obviously a bit of analysis, testing and profiling that would need to be
   done on the entire job). Another consideration would be to look at
   persisting or caching the intermediate results after you've successfully
   retrieved your results from the service, to reduce the likelihood of
   hitting the web service more than necessary.
   4. Just realized you might be looking for help invoking an HTTP service
   programmatically from Scala / Spark - if so, you might want to look at the
   spray-client http://spray.io/documentation/1.2.3/spray-client/ library.
   5. With millions of web service requests, it's highly likely some will
   fail, for a variety of reasons. Look into using Scala's Try
   http://www.scala-lang.org/api/2.11.5/index.html#scala.util.Try or
   Either
   http://www.scala-lang.org/api/2.11.5/index.html#scala.util.Either monads
   to encode success / failure, and treat failed requests as first-class
   citizens in your RDD of results (by retrying them, filtering them, logging
   them, etc., based on your specific needs and use case). Make sure you are
   setting reasonable timeouts on your service calls to prevent the Spark job
   from getting stuck if the service turns into a black hole.
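
To make points 3 and 5 concrete, here is a minimal, untested sketch. The
Request/Result case classes and the requestsRDD input are hypothetical, and it
uses plain java.net instead of spray-client just to keep it self-contained:

    import java.net.{HttpURLConnection, URL}
    import scala.io.Source
    import scala.util.Try

    case class Request(url: String)                    // whatever defines one call
    case class Result(url: String, body: Try[String])  // keep success/failure as data

    val results = requestsRDD.mapPartitions { requests =>
      // One partition is processed as one sequential stream of calls, so the
      // number of concurrent connections is bounded by the number of partitions.
      requests.map { req =>
        val body = Try {
          val conn = new URL(req.url).openConnection().asInstanceOf[HttpURLConnection]
          conn.setConnectTimeout(5000)   // always set timeouts (point 5)
          conn.setReadTimeout(10000)
          try Source.fromInputStream(conn.getInputStream).mkString
          finally conn.disconnect()
        }
        Result(req.url, body)
      }
    }.cache()  // reduce the chance of re-hitting the service on recomputation (point 3)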

As I said above, I'm pretty new to Spark, so others may have some better
advice, or even tell you to ignore mine completely (no hard feelings, I
promise - this is all very new to me).

Good luck!

Regards,
Will

On Wed, Jun 3, 2015 at 3:49 AM, kasparfischer kaspar.fisc...@dreizak.com
wrote:

 Hi everybody,

 I'm new to Spark, apologies if my question is very basic.

 I have a need to send millions of requests to a web service and analyse and
 store the responses in an RDD. I can easy express the analysing part using
 Spark's filter/map/etc. primitives but I don't know how to make the
 requests. Is that something I can do from within Spark? Or Spark Streaming?
 Or does it conflict with the way Spark works?

 I've found a similar question but am not sure whether the answer applies
 here:



 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Spark-Streaming-from-an-HTTP-api-tp12330.html

 Any clarifications or pointers would be super helpful!

 Thanks,
 Kaspar



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Make-HTTP-requests-from-within-Spark-tp23129.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: 

Re: Spark Cluster Benchmarking Frameworks

2015-06-03 Thread Zhen Jia
Hi Jonathan,
Maybe you can try BigDataBench: http://prof.ict.ac.cn/BigDataBench/ . It
provides lots of workloads, including both Hadoop and Spark based workloads.

Zhen Jia

hodgesz wrote
 Hi Spark Experts,
 
 I am curious what people are using to benchmark their Spark clusters.  We
 are about to start a build (bare metal) vs buy (AWS/Google Cloud/Qubole)
 project to determine our Hadoop and Spark deployment selection.  On the
 Hadoop side we will test live workloads as well as simulated ones with
 frameworks like TestDFSIO, TeraSort, MRBench, GridMix, etc.
 
 Do any equivalent benchmarking frameworks exist for Spark?  A quick Google
 search yielded https://github.com/databricks/spark-perf which looks pretty
 interesting.  It would be great to hear what others are doing here.
 
 Thanks for the help!
 
 Jonathan





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Cluster-Benchmarking-Frameworks-tp12699p23146.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Standard Scaler taking 1.5hrs

2015-06-03 Thread DB Tsai
Can you do count() before fit to force materialization of the RDD? I think
something before the fit is what's slow.
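
For example (just a timing check, not a fix; it reuses the parsedData/scaler
names from your snippet below):

    parsedData.count()   // forces the upstream lineage to run and populates the cache
    val scaler = new StandardScaler(withMean = false, withStd = true)
      .fit(parsedData.map(_.features))   // now this times only the scaler fit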

On Wednesday, June 3, 2015, Piero Cinquegrana pcinquegr...@marketshare.com
wrote:

  The fit part is very slow, transform not at all.

  The number of partitions was 210 vs number of executors 80.

  Spark 1.4 sounds great, but as my company is using Qubole we are
 dependent upon them to upgrade from version 1.3.1. Until that happens, can
 you think of any other reasons why it could be slow? Sparse vectors?
 Excessive number of columns?

 Sent from my mobile device. Please excuse any typos.

 On Jun 3, 2015, at 9:53 PM, DB Tsai dbt...@dbtsai.com wrote:

   Which part of StandardScaler is slow? Fit or transform? Fit has a shuffle,
 but a very small one, and transform doesn't shuffle at all. I guess you don't
 have enough partitions, so please repartition your input dataset to a number
 at least larger than the # of executors you have.
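
For example, a minimal sketch (320 is just an illustrative multiple of your 80
executors; note that repartition() itself shuffles and drops the custom
partitioner):

    val repartitioned = parsedData.repartition(320).cache()
    repartitioned.count()   // materialize once up front
    val scaler = new StandardScaler(withMean = false, withStd = true)
      .fit(repartitioned.map(_.features))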

  In Spark 1.4's new ML pipeline API, we have Linear Regression with
 elastic net, and in that version we use quasi-Newton methods for optimization,
 so it will be way faster than the SGD implementation. Also, in that
 implementation, StandardScaler is not required, since we implicitly do this
 for you when computing the loss function.


 https://github.com/apache/spark/commit/6a827d5d1ec520f129e42c3818fe7d0d870dcbef
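
A rough sketch of what that 1.4 API looks like (the parameter values are
illustrative, and the DataFrame conversion assumes sqlContext.implicits._ is
imported):

    import org.apache.spark.ml.regression.LinearRegression

    // spark.ml estimators work on DataFrames; an RDD[LabeledPoint] converts to
    // a DataFrame with "label" and "features" columns.
    val df = parsedData.toDF()

    val lr = new LinearRegression()
      .setMaxIter(100)
      .setRegParam(0.1)
      .setElasticNetParam(0.5)   // 0.0 = pure L2 (ridge), 1.0 = pure L1 (lasso)

    val lrModel = lr.fit(df)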

  Please try this out and give us feedback. Thanks.

 On Wednesday, June 3, 2015, Piero Cinquegrana pcinquegr...@marketshare.com wrote:

  Hello User group,



 I have a RDD of LabeledPoint composed of sparse vectors like showing
 below. In the next step, I am standardizing the columns with the Standard
 Scaler. The data has 2450 columns and ~110M rows. It took 1.5hrs to
 complete the standardization with 10 nodes and 80 executors. The
 spark.executor.memory was set to 2g and the driver memory to 5g.



 scala> val parsedData = stack_sorted.mapPartitions( partition =>

 partition.map{row
 => LabeledPoint(row._2._1.getDouble(4), sparseVectorCat(row._2,
 CategoriesIdx, InteractionIds, tupleMap, vecLength))

  },
 preservesPartitioning=true).cache()


 CategoriesIdx: Array[Int] = Array(3, 8, 12)

 InteractionIds: Array[(Int, Int)] = Array((13,12))

 vecLength: Int = 2450

 parsedData:
 org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] =
 MapPartitionsRDD[93] at mapPartitions at console:111

 (1.0,(2450,[1322,1361,2430],[1.0,1.0,1.0]))

 (0.0,(2450,[1322,1781,2430],[1.0,1.0,1.0]))

 (2.0,(2450,[1322,2193,2430],[1.0,1.0,1.0]))

 (1.0,(2450,[297,1322,2430],[1.0,1.0,1.0]))

 (0.0,(2450,[898,1322,2430],[1.0,1.0,1.0]))





 My suspicion is that, because the data is partitioned using a custom
 partitioner, the Standard Scaler is causing a major shuffle operation. Any
 suggestions on how to improve the performance of this step, and of
 LinearRegressionWithSGD(), which is also taking a very long time?



 scala> parsedData.partitioner

 res72: Option[org.apache.spark.Partitioner] = Some(
 org.apache.spark.HashPartitioner@d2)



 scala> val scaler = new StandardScaler(withMean = false, withStd =
 true).fit(parsedData.map( row => row.features))

 scala> val scaledData = parsedData.mapPartitions(partition =>
 partition.map{row => LabeledPoint(row.label,
 scaler.transform(row.features))}).cache()



 scala> val numIterations = 100

 scala> val stepSize = 0.1

 scala> val miniBatchFraction = 0.1

 scala> val algorithm = new LinearRegressionWithSGD()



 scala> algorithm.setIntercept(false)

 scala> algorithm.optimizer.setNumIterations(numIterations)

 scala> algorithm.optimizer.setStepSize(stepSize)

 scala> algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)



 scala> val model = algorithm.run(scaledData)



 Best,



 Piero Cinquegrana

 Marketing Scientist | MarketShare
 11150 Santa Monica Blvd, 5th Floor, Los Angeles, CA 90025
 P: 310.914.5677 x242 M: 323.377.9197
 www.marketshare.com http://www.marketsharepartners.com/
 twitter.com/marketsharep





-- 
- DB

Sent from my iPhone


Re: Spark Client

2015-06-03 Thread pavan kumar Kolamuri
Thanks Akhil, Richard, and Oleg for your quick responses.

@Oleg we have actually tried the same thing, but unfortunately when we throw
the exception the Akka framework catches it, assumes the job failed, and
reruns the Spark jobs infinitely. Since the max number of retries in Akka's
OneForOneStrategy is set to infinite, is there any way to configure this
value? Or is there any other way to solve this problem?


If we don't throw an exception in checkExit(), the JVM will exit, right? Is
there a way to stop the JVM from exiting?

On Wed, Jun 3, 2015 at 9:01 PM, Oleg Zhurakousky oleg.zhurakou...@gmail.com
 wrote:

 I am not sure why Spark is relying on System.exit, hopefully someone will
 be able to provide a technical justification for it (very curious to hear
 it), but for your use case you can easily trap the System.exit call before the
 JVM exits with a simple implementation of SecurityManager and try/catch.
 Here are more details (extracted from some of the code I am using to deal
 with the same problem in Hadoop processes, so it's Java but you'll get the
 point):

 1. Create a simple implementation of Security Manager:

 import java.security.Permission;

 public class SystemExitDisallowingSecurityManager extends SecurityManager {
   @Override
   public void checkPermission(Permission perm) {
     // allow everything
   }

   @Override
   public void checkPermission(Permission perm, Object context) {
     // allow everything
   }

   @Override
   public void checkExit(int status) {
     throw new SystemExitException();
   }
 }

 2. Create SystemExitException

 public class SystemExitException extends RuntimeException {
   public SystemExitException() { }
 }

 3. When instantiating your workflow engine register the aforementioned
 SecurityManager (e.g., in the constructor)

 System.setSecurityManager(new SystemExitDisallowingSecurityManager());

 3.1. In your workflow engine invoke the process in try/catch block

 try {
  // invoke the spark-submit process
 }
 catch (SystemExitException e) {
  // log some message but allow to continue
 }

 When Spark triggers System.exit, the SecurityManager will trigger
 SystemExitException, which you simply ignore. Or you can even avoid
 triggering SystemExitException altogether, essentially ignoring all the
 calls to System.exit.

 Cheers
 Oleg


 On Wed, Jun 3, 2015 at 10:55 AM, Richard Marscher 
 rmarsc...@localytics.com wrote:

 I think the short answer to the question is, no, there is no alternate
 API that will not use the System.exit calls. You can craft a workaround
 like is being suggested in this thread. For comparison, we are doing
 programmatic submission of applications in a long-running client
 application. To get around these issues we make a shadowed version of some
 of the Spark code in our application to remove the System.exit calls so
 instead exceptions bubble up to our application.

 On Wed, Jun 3, 2015 at 7:19 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Did you try this?

 Create an sbt project like:

  import org.apache.spark.{SparkConf, SparkContext}

  // Create your context
  val sconf = new SparkConf().setAppName("Sigmoid").setMaster("spark://sigmoid:7077")
  val sc = new SparkContext(sconf)

  // Do some computations
  sc.parallelize(1 to 1).take(10).foreach(println)

  // Now return the exit status
  System.exit(0)   // or some non-zero number on failure

  Now, make your workflow manager trigger *sbt run* on the project
 instead of using spark-submit.



 Thanks
 Best Regards

 On Wed, Jun 3, 2015 at 2:18 PM, pavan kumar Kolamuri 
 pavan.kolam...@gmail.com wrote:

 Hi Akhil, sorry, I may not be conveying the question properly. Actually
 we are looking to launch a Spark job from a long-running workflow manager,
 which invokes the Spark client via SparkSubmit. Unfortunately the client, upon
 successful completion of the application, exits with a System.exit(0), or
 System.exit(NON_ZERO) when there is a failure. The question is: is there an
 alternate API through which a Spark application can be launched that can
 return an exit status back to the caller, as opposed to initiating a JVM halt?

 On Wed, Jun 3, 2015 at 12:58 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Run it as a standalone application. Create an sbt project and do sbt
 run?

 Thanks
 Best Regards

 On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri 
 pavan.kolam...@gmail.com wrote:

 Hi guys, I am new to Spark. I am using SparkSubmit to submit Spark
 jobs. But for my use case I don't want it to exit with System.exit. Is
 there any other Spark client, more API-friendly than SparkSubmit, which
 doesn't exit with System.exit? Please correct me if I am missing
 something.

 Thanks in advance




 --
 Regards
 Pavan Kumar Kolamuri





 --
 Regards
 Pavan Kumar Kolamuri







-- 
Regards
Pavan Kumar Kolamuri


Spark 1.4.0-rc4 HiveContext.table(db.tbl) NoSuchTableException

2015-06-03 Thread Doug Balog
Hi, 
 
sqlContext.table(“db.tbl”) isn’t working for me, I get a NoSuchTableException.

But I can access the table via 

sqlContext.sql(“select * from db.tbl”)

So I know it has the table info from the metastore. 

Anyone else see this?

I’ll keep digging. 
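
One narrowing step I'd try (untested on rc4, just a guess at where the
regression might be):

    sqlContext.sql("use db")   // switch the current database first
    sqlContext.table("tbl")    // then look the table up unqualified

If that works, it would suggest only the db-qualified lookup path in table()
changed.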
I compiled via make-distribution -Pyarn -Phadoop-2.4 -Phive -Phive-thriftserver
It worked for me in 1.3.1

Cheers,

Doug


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Equivalent to Storm's 'field grouping' in Spark.

2015-06-03 Thread Matei Zaharia
This happens automatically when you use the byKey operations, e.g. reduceByKey, 
updateStateByKey, etc. Spark Streaming keeps the state for a given set of keys 
on a specific node and sends new tuples with that key to that node.
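
A minimal sketch of the core-Spark analogue (pairsRDD is a hypothetical
RDD[(String, Int)] of (key, value) records):

    import org.apache.spark.HashPartitioner

    // All records with the same key land in the same partition, and hence on
    // the same worker, much like Storm's field grouping.
    val grouped = pairsRDD.partitionBy(new HashPartitioner(16))

    // The byKey operations (reduceByKey here, updateStateByKey in streaming)
    // do the equivalent shuffle implicitly, so each key is processed on one node.
    val counts = grouped.reduceByKey(_ + _)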

Matei

 On Jun 3, 2015, at 6:31 AM, allonsy luke1...@gmail.com wrote:
 
 Hi everybody,
 is there in Spark anything sharing the philosophy of Storm's field grouping?
 
 I'd like to manage data partitioning across the workers by sending tuples
 sharing the same key to the very same worker in the cluster, but I did not
 find any method to do that.
 
 Suggestions?
 
 :)
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Equivalent-to-Storm-s-field-grouping-in-Spark-tp23135.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org