Re: MLLib SVMWithSGD is failing for large dataset

2015-05-17 Thread Xiangrui Meng
Reducing the number of instances won't help in this case. We use the
driver to collect partial gradients. Even with tree aggregation, it
still puts heavy workload on the driver with 20M features. Please try
to reduce the number of partitions before training. We are working on
a more scalable implementation of logistic regression now, which
should be able to solve this problem efficiently. -Xiangrui
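For example, a minimal sketch in Scala (the input path and partition count below are made up):

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "hdfs:///data/training.libsvm") // hypothetical path

// Fewer partitions means fewer partial gradients aggregated on the driver
// per iteration.
val coalesced = data.coalesce(32).cache()

val model = SVMWithSGD.train(coalesced, 100) // 100 iterations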

On Tue, Apr 28, 2015 at 3:43 PM, sarathkrishn...@gmail.com
 wrote:
> Hi,
>
> I'm just calling the standard SVMWithSGD implementation of Spark's MLLib.
> I'm not using any method like "collect".
>
> Thanks,
> Sarath
>
> On Tue, Apr 28, 2015 at 4:35 PM, ai he  wrote:
>>
>> Hi Sarath,
>>
>> It might be questionable to set num-executors to 64 if you only have 8
>> nodes. Do you use any action like "collect" which would overwhelm the
>> driver, since you have a large dataset?
>>
>> Thanks
>>
>> On Tue, Apr 28, 2015 at 10:50 AM, sarath 
>> wrote:
>> >
>> > I am trying to train a large dataset consisting of 8 million data points
>> > and 20 million features using SVMWithSGD, but it is failing after running
>> > for some time. I tried increasing num-partitions, driver-memory,
>> > executor-memory, and driver-max-resultSize. I also tried reducing the
>> > size of the dataset from 8 million to 25K (keeping the number of features
>> > at 20 million), but after using the entire 64 GB of driver memory for 20
>> > to 30 minutes it failed.
>> >
>> > I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM).
>> > executor-memory - 60G
>> > driver-memory - 60G
>> > num-executors - 64
>> > And other default settings
>> >
>> > This is the error log :
>> >
>> > 15/04/20 11:51:09 WARN NativeCodeLoader: Unable to load native-hadoop
>> > library for your platform... using builtin-java classes where applicable
>> > 15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
>> > com.github.fommil.netlib.NativeSystemBLAS
>> > 15/04/20 11:51:29 WARN BLAS: Failed to load implementation from:
>> > com.github.fommil.netlib.NativeRefBLAS
>> > 15/04/20 11:56:11 WARN TransportChannelHandler: Exception in connection
>> > from
>> > xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
>> > java.io.IOException: Connection reset by peer
>> > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>> > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>> > ...
>> > 15/04/20 11:56:11 ERROR TransportResponseHandler: Still have 7 requests
>> > outstanding when connection from xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
>> > is
>> > closed
>> > 15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting
>> > block
>> > fetches
>> > java.io.IOException: Connection reset by peer
>> > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>> > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>> > ...
>> > 15/04/20 11:56:11 ERROR OneForOneBlockFetcher: Failed while starting
>> > block
>> > fetches
>> > java.io.IOException: Connection reset by peer
>> > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>> > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>> > ...
>> > 15/04/20 11:56:12 ERROR RetryingBlockFetcher: Exception while beginning
>> > fetch of 1 outstanding blocks
>> > java.io.IOException: Failed to connect to
>> > xxx.xxx.xxx.net/xxx.xxx.xxx.xxx:41029
>> > at
>> >
>> > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>> > at
>> >
>> > org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>> > at
>> >
>> > org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>> > at
>> >
>> > org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>> > at
>> >
>> > org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>> > at
>> >
>> > org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
>> > at
>> >
>> > org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
>> > at
>> >
>> > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:290)
>> > at
>> >
>> > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
>> > at
>> > scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> > at
>> >
>> > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>> > at
>> >
>> > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>> > at
>> > org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:91)
>> > at
>> >
>> > org.apache.spark.shuffle.hash.HashShuffleReader.rea

Re: bug: numClasses is not a valid argument of LogisticRegressionWithSGD

2015-05-17 Thread Xiangrui Meng
LogisticRegressionWithSGD doesn't support multi-class. Please use
LogisticRegressionWithLBFGS instead. -Xiangrui
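For reference, a minimal sketch of the multi-class path in Scala (the data path and class count are made up); the Python wrapper exposes the same algorithm, though whether its train() accepts numClasses depends on the Spark version:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.util.MLUtils

val training = MLUtils.loadLibSVMFile(sc, "hdfs:///data/multiclass.libsvm") // hypothetical path

// The LBFGS-based implementation supports more than two classes.
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10)
  .run(training)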

On Mon, Apr 27, 2015 at 12:37 PM, Pagliari, Roberto
 wrote:
> With the Python API, the available arguments I got (using the inspect
> module) are the following:
>
> ['cls', 'data', 'iterations', 'step', 'miniBatchFraction', 'initialWeights',
> 'regParam', 'regType', 'intercept']
>
> numClasses is not available. Can someone comment on this?
>
> Thanks,
>
>
>
>
>




Re: StandardScaler failing with OOM errors in PySpark

2015-05-17 Thread Xiangrui Meng
AFAIK, there are two places where you can specify the driver memory.
One is via spark-submit --driver-memory and the other is via
spark.driver.memory in spark-defaults.conf. Please try these
approaches and see whether they work or not. You can find detailed
instructions at http://spark.apache.org/docs/latest/configuration.html
and http://spark.apache.org/docs/latest/submitting-applications.html.
-Xiangrui
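For reference, the two approaches look roughly like this (the class name, jar, and memory value are placeholders):

spark-submit --driver-memory 8g --class com.example.MyApp my-app.jar

# or, in conf/spark-defaults.conf:
spark.driver.memory    8g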

On Tue, Apr 28, 2015 at 4:06 AM, Rok Roskar  wrote:
> That's exactly what I'm saying -- I specify the memory options using spark
> options, but this is not reflected in how the JVM is created. No matter
> which memory settings I specify, the JVM for the driver is always made with
> 512Mb of memory. So I'm not sure if this is a feature or a bug?
>
> rok
>
> On Mon, Apr 27, 2015 at 6:54 PM, Xiangrui Meng  wrote:
>>
>> You might need to specify driver memory in spark-submit instead of
>> passing JVM options. spark-submit is designed to handle different
>> deployments correctly. -Xiangrui
>>
>> On Thu, Apr 23, 2015 at 4:58 AM, Rok Roskar  wrote:
>> > ok yes, I think I have narrowed it down to being a problem with driver
>> > memory settings. It looks like the application master/driver is not
>> > being
>> > launched with the settings specified:
>> >
>> > For the driver process on the main node I see "-XX:MaxPermSize=128m
>> > -Xms512m
>> > -Xmx512m" as options used to start the JVM, even though I specified
>> >
>> > 'spark.yarn.am.memory', '5g'
>> > 'spark.yarn.am.memoryOverhead', '2000'
>> >
>> > The info shows that these options were read:
>> >
>> > 15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with
>> > 7120 MB
>> > memory including 2000 MB overhead
>> >
>> > Is there some reason why these options are being ignored and instead
>> > starting the driver with just 512Mb of heap?
>> >
>> > On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar  wrote:
>> >>
>> >> the feature dimension is 800k.
>> >>
>> >> yes, I believe the driver memory is likely the problem since it doesn't
>> >> crash until the very last part of the tree aggregation.
>> >>
>> >> I'm running it via pyspark through YARN -- I have to run in client mode
>> >> so
>> >> I can't set spark.driver.memory -- I've tried setting the
>> >> spark.yarn.am.memory and overhead parameters but it doesn't seem to
>> >> have an
>> >> effect.
>> >>
>> >> Thanks,
>> >>
>> >> Rok
>> >>
>> >> On Apr 23, 2015, at 7:47 AM, Xiangrui Meng  wrote:
>> >>
>> >> > What is the feature dimension? Did you set the driver memory?
>> >> > -Xiangrui
>> >> >
>> >> > On Tue, Apr 21, 2015 at 6:59 AM, rok  wrote:
>> >> >> I'm trying to use the StandardScaler in pyspark on a relatively
>> >> >> small
>> >> >> (a few
>> >> >> hundred Mb) dataset of sparse vectors with 800k features. The fit
>> >> >> method of
>> >> >> StandardScaler crashes with Java heap space or Direct buffer memory
>> >> >> errors.
>> >> >> There should be plenty of memory around -- 10 executors with 2 cores
>> >> >> each
>> >> >> and 8 Gb per core. I'm giving the executors 9g of memory and have
>> >> >> also
>> >> >> tried
>> >> >> lots of overhead (3g), thinking it might be the array creation in
>> >> >> the
>> >> >> aggregators that's causing issues.
>> >> >>
>> >> >> The bizarre thing is that this isn't always reproducible --
>> >> >> sometimes
>> >> >> it
>> >> >> actually works without problems. Should I be setting up executors
>> >> >> differently?
>> >> >>
>> >> >> Thanks,
>> >> >>
>> >> >> Rok
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >> View this message in context:
>> >> >>
>> >> >> http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html
>> >> >> Sent from the Apache Spark User List mailing list archive at
>> >> >> Nabble.com.
>> >> >>
>> >> >>
>> >> >>
>> >>
>> >
>
>




Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?

2015-05-17 Thread MEETHU MATHEW
Hi Akhil,

The Python wrapper for Spark Job Server did not help me. I actually need a
pyspark code sample which shows how I can call a function from 2 threads and
execute it simultaneously.

Thanks & Regards,
Meethu M


On Thursday, 14 May 2015 12:38 PM, Akhil Das wrote:

Did you happen to have a look at the Spark Job Server? Someone wrote a
Python wrapper around it; give it a try.

Thanks
Best Regards
On Thu, May 14, 2015 at 11:10 AM, MEETHU MATHEW  wrote:

Hi all,

Quote: "Inside a given Spark application (SparkContext instance), multiple
parallel jobs can run simultaneously if they were submitted from separate
threads."

How do I run multiple jobs in one SparkContext using separate threads in
pyspark? I found some examples in Scala and Java, but couldn't find Python
code. Can anyone help me with a pyspark example?
Thanks & Regards,
Meethu M



  

Re: InferredSchema Example in Spark-SQL

2015-05-17 Thread Simon Elliston Ball
You mean toDF(), not toRD(). It stands for "data frame", if that makes it easier to
remember.

Simon

> On 18 May 2015, at 01:07, Rajdeep Dua  wrote:
> 
> Hi All,
> Was trying the Inferred Schema Spark example
> http://spark.apache.org/docs/latest/sql-programming-guide.html#overview
> 
> I am getting the following compilation error on the function toRD()
> 
> value toRD is not a member of org.apache.spark.rdd.RDD[Person]
> [error] val people = 
> sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
>  => Person(p(0), p(1).trim.toInt)).toRD()
> [error]  
> 
> Thanks
> Rajdeep   
> 
> 
> 


Implementing custom metrics under MLPipeline's BinaryClassificationEvaluator

2015-05-17 Thread Justin Yip
Hello,

I would like to use other metrics in BinaryClassificationEvaluator; I am
thinking about simple ones (e.g. PrecisionByThreshold). From the API site,
I can't tell much about how to implement it.

From the code, it seems like I will have to override this function, reusing
most of the existing code for checking the column schema, then replace the
line which computes the actual score.

Is my understanding correct? Or is there a more convenient way of
implementing a metric so it can be used by an ML pipeline?
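For what it's worth, a minimal sketch of the metric computation itself, done with mllib's BinaryClassificationMetrics outside the Evaluator plumbing (how you extract the (score, label) pairs from the prediction DataFrame depends on your schema):

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.rdd.RDD

// scoreAndLabels: (score, label) pairs extracted from the pipeline's output.
def precisionByThreshold(scoreAndLabels: RDD[(Double, Double)]): Array[(Double, Double)] = {
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  metrics.precisionByThreshold().collect() // (threshold, precision) pairs
}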

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Implementing-custom-metrics-under-MLPipeline-s-BinaryClassificationEvaluator-tp22930.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Getting the best parameter set back from CrossValidatorModel

2015-05-17 Thread Justin Yip
Thanks Ram.

Your sample looks very helpful. (There is a minor bug in that
PipelineModel.stages is hidden under private[ml]; you just need a wrapper
around it. :)

Justin

On Sat, May 16, 2015 at 10:44 AM, Ram Sriharsha 
wrote:

> Hi Justin
>
> The CrossValidatorExample here
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
> is a good example of how to set up an ML Pipeline for extracting a model
> with the best parameter set.
>
> You set up the pipeline as in here:
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala#L73
>
> This pipeline is treated as an estimator and wrapped into a CrossValidator
> to do a grid search and return the model with the best parameters.
> Once you have trained the best model as in here
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala#L93
>
> The result is a CrossValidatorModel which contains the best estimator
> (i.e. the best pipeline above) and you can extract the best pipeline and
> inquire its parameters as follows:
>
> // what are the best parameters?
> val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
> val stages = bestPipelineModel.stages
>
> val hashingStage = stages(1).asInstanceOf[HashingTF]
> println(hashingStage.getNumFeatures)
> val lrStage = stages(2).asInstanceOf[LogisticRegressionModel]
> println(lrStage.getRegParam)
>
>
>
> Ram
>
> On Sat, May 16, 2015 at 3:17 AM, Justin Yip 
> wrote:
>
>> Hello,
>>
>> I am using MLPipeline. I would like to extract the best parameter found
>> by CrossValidator. But I cannot find much document about how to do it. Can
>> anyone give me some pointers?
>>
>> Thanks.
>>
>> Justin
>>
>> --
>> View this message in context: Getting the best parameter set back from
>> CrossValidatorModel
>> 
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: --jars works in "yarn-client" but not "yarn-cluster" mode, why?

2015-05-17 Thread Fengyun RAO
Thanks, Wilfred.

The problem is, the jar
"/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar"
is on every node in the cluster, since we installed CDH 5.4.

Thus, no matter whether we run in client or cluster mode, the driver has
access to the jar.

What's more, the driver does not depend on the jar; it is the executor that
throws the "ClassNotFoundException".


2015-05-18 6:53 GMT+08:00 Wilfred Spiegelenburg :

> When you run the driver in the cluster, the application really runs from
> the cluster and the client goes away. If the driver does not have access to
> the jars, i.e. if they are not available somewhere on the cluster, this
> will happen.
> If you run the driver on the client, the driver has access to the jars
> there. Unless you have copied the jars onto the cluster, it will not work.
> That is what SPARK-5377 is all about.
>
> Wilfred
>
> On 15/05/2015 00:37, Fengyun RAO wrote:
>
>> thanks, Wilfred.
>>
>> In our program, the "htrace-core-3.1.0-incubating.jar" dependency is
>> only required in the executor, not in the driver,
>> while in both "yarn-client" and "yarn-cluster" the executor runs in
>> the cluster.
>>
>> And clearly, in "yarn-cluster" mode the jar IS in
>> "spark.yarn.secondary.jars", but it still throws ClassNotFoundException.
>>
>> 2015-05-14 18:52 GMT+08:00 Wilfred Spiegelenburg
>> mailto:wspiegelenb...@cloudera.com>>:
>>
>> In the cluster the driver runs in the cluster and not locally in the
>> spark-submit JVM. This changes what is available on your classpath.
>> It looks like you are running into a similar situation as described
>> in SPARK-5377.
>>
>> Wilfred
>>
>> On 14/05/2015 13:47, Fengyun RAO wrote:
>>
>> I look into the "Environment" in both modes.
>>
>> yarn-client:
>> spark.jars
>>
>> local:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar,file:/home/xxx/my-app.jar
>>
>> yarn-cluster:
>> spark.yarn.secondary.jars
>>
>> local:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
>>
>> I wonder why htrace exists in "spark.yarn.secondary.jars" but is
>> still not found by the URLClassLoader.
>>
>> I tried both "local" and "file" mode for the jar, still the same
>> error.
>>
>>
>> 2015-05-14 11:37 GMT+08:00 Fengyun RAO > 
>> >>:
>>
>>
>>
>>  Hadoop version: CDH 5.4.
>>
>>  We need to connect to HBase, thus need extra
>>
>>
>> "/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar"
>>  dependency.
>>
>>  It works in yarn-client mode:
>>  "spark-submit --class xxx.xxx.MyApp --master yarn-client
>>  --num-executors 10 --executor-memory 10g --jars
>>
>>
>> /opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar
>>  my-app.jar /input /output"
>>
>>  However, if we change "yarn-client" to "yarn-cluster', it
>> throws an
>>  ClassNotFoundException (actually the class exists in
>>  htrace-core-3.1.0-incubating.jar):
>>
>>  Caused by: java.lang.NoClassDefFoundError:
>> org/apache/htrace/Trace
>>  at
>>
>> org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.exists(RecoverableZooKeeper.java:218)
>>  at
>>
>> org.apache.hadoop.hbase.zookeeper.ZKUtil.checkExists(ZKUtil.java:481)
>>  at
>>
>> org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:65)
>>  at
>>
>> org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:86)
>>  at
>>
>> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:850)
>>  at
>>
>> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.(ConnectionManager.java:635)
>>  ... 21 more
>>  Caused by: java.lang.ClassNotFoundException:
>> org.apache.htrace.Trace
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>  at java.security.AccessController.doPrivileged(Native
>> Method)
>>  at
>> java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>  at
>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>
>>
>>  Why --jars doesn't work in yarn-cluster mode? How to add
>> extra dependency in "yarn-cluster" mode?
>>
>>
>>
>> --
>>
>> ---
>> You received this message because you are subscr

Re: InferredSchema Example in Spark-SQL

2015-05-17 Thread Ram Sriharsha
You are missing import sqlContext.implicits._

On Sun, May 17, 2015 at 8:05 PM, Rajdeep Dua  wrote:

> Here are my imports
>
> *import* org.apache.spark.SparkContext
>
> *import* org.apache.spark.SparkContext._
>
> *import* org.apache.spark.SparkConf
>
> *import* org.apache.spark.sql.SQLContext
>
> *import* org.apache.spark.sql.SchemaRDD
>
> On Sun, May 17, 2015 at 8:05 PM, Rajdeep Dua 
> wrote:
>
>> Sorry .. toDF() gives an error
>>
>> [error]
>> /home/ubuntu/work/spark/spark-samples/ml-samples/src/main/scala/sql/InferredSchema.scala:24:
>> value toDF is not a member of org.apache.spark.rdd.RDD[Person]
>> [error] val people =
>> sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
>> => Person(p(0), p(1).trim.toInt)).toDF()
>>
>>
>> On Sun, May 17, 2015 at 5:41 PM, Cheng, Hao  wrote:
>>
>>>  Typo? Should be .toDF(), not .toRD()
>>>
>>>
>>>
>>> *From:* Ram Sriharsha [mailto:sriharsha@gmail.com]
>>> *Sent:* Monday, May 18, 2015 8:31 AM
>>> *To:* Rajdeep Dua
>>> *Cc:* user
>>> *Subject:* Re: InferredSchema Example in Spark-SQL
>>>
>>>
>>>
>>> you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case
>>> inferring schema from the case class)
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua 
>>> wrote:
>>>
>>>  Hi All,
>>>
>>> Was trying the Inferred Schema Spark example
>>>
>>> http://spark.apache.org/docs/latest/sql-programming-guide.html#overview
>>>
>>>
>>>
>>> I am getting the following compilation error on the function toRD()
>>>
>>>
>>>
>>> value toRD is not a member of org.apache.spark.rdd.RDD[Person]
>>>
>>> [error] val people =
>>> sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
>>> => Person(p(0), p(1).trim.toInt)).toRD()
>>>
>>> [error]
>>>
>>>
>>>
>>> Thanks
>>>
>>> Rajdeep
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>


Re: InferredSchema Example in Spark-SQL

2015-05-17 Thread Rajdeep Dua
Here are my imports

*import* org.apache.spark.SparkContext

*import* org.apache.spark.SparkContext._

*import* org.apache.spark.SparkConf

*import* org.apache.spark.sql.SQLContext

*import* org.apache.spark.sql.SchemaRDD

On Sun, May 17, 2015 at 8:05 PM, Rajdeep Dua  wrote:

> Sorry .. toDF() gives an error
>
> [error]
> /home/ubuntu/work/spark/spark-samples/ml-samples/src/main/scala/sql/InferredSchema.scala:24:
> value toDF is not a member of org.apache.spark.rdd.RDD[Person]
> [error] val people =
> sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
> => Person(p(0), p(1).trim.toInt)).toDF()
>
>
> On Sun, May 17, 2015 at 5:41 PM, Cheng, Hao  wrote:
>
>>  Typo? Should be .toDF(), not .toRD()
>>
>>
>>
>> *From:* Ram Sriharsha [mailto:sriharsha@gmail.com]
>> *Sent:* Monday, May 18, 2015 8:31 AM
>> *To:* Rajdeep Dua
>> *Cc:* user
>> *Subject:* Re: InferredSchema Example in Spark-SQL
>>
>>
>>
>> you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case
>> inferring schema from the case class)
>>
>>
>>
>>
>>
>>
>>
>> On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua 
>> wrote:
>>
>>  Hi All,
>>
>> Was trying the Inferred Schema Spark example
>>
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#overview
>>
>>
>>
>> I am getting the following compilation error on the function toRD()
>>
>>
>>
>> value toRD is not a member of org.apache.spark.rdd.RDD[Person]
>>
>> [error] val people =
>> sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
>> => Person(p(0), p(1).trim.toInt)).toRD()
>>
>> [error]
>>
>>
>>
>> Thanks
>>
>> Rajdeep
>>
>>
>>
>>
>>
>>
>>
>>
>
>


Re: InferredSchema Example in Spark-SQL

2015-05-17 Thread Rajdeep Dua
Sorry .. toDF() gives an error

[error]
/home/ubuntu/work/spark/spark-samples/ml-samples/src/main/scala/sql/InferredSchema.scala:24:
value toDF is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people =
sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
=> Person(p(0), p(1).trim.toInt)).toDF()


On Sun, May 17, 2015 at 5:41 PM, Cheng, Hao  wrote:

>  Typo? Should be .toDF(), not .toRD()
>
>
>
> *From:* Ram Sriharsha [mailto:sriharsha@gmail.com]
> *Sent:* Monday, May 18, 2015 8:31 AM
> *To:* Rajdeep Dua
> *Cc:* user
> *Subject:* Re: InferredSchema Example in Spark-SQL
>
>
>
> you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case
> inferring schema from the case class)
>
>
>
>
>
>
>
> On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua 
> wrote:
>
>  Hi All,
>
> Was trying the Inferred Schema Spark example
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#overview
>
>
>
> I am getting the following compilation error on the function toRD()
>
>
>
> value toRD is not a member of org.apache.spark.rdd.RDD[Person]
>
> [error] val people =
> sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
> => Person(p(0), p(1).trim.toInt)).toRD()
>
> [error]
>
>
>
> Thanks
>
> Rajdeep
>
>
>
>
>
>
>
>


RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application?

2015-05-17 Thread Haopu Wang
I want to use a file stream as input. And I looked at the Spark Streaming
documentation again; it says a file stream doesn't need a receiver at all.

So I'm wondering if I can control a specific DStream instance.

 



From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, May 18, 2015 12:39 AM
To: 'Akhil Das'; Haopu Wang
Cc: 'user'
Subject: RE: [SparkStreaming] Is it possible to delay the start of some
DStream in the application?

 

You can make ANY standard receiver sleep by implementing a custom
Message Deserializer class with a sleep method inside it.

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Sunday, May 17, 2015 4:29 PM
To: Haopu Wang
Cc: user
Subject: Re: [SparkStreaming] Is it possible to delay the start of some
DStream in the application?

 

Why not just trigger your batch job with that event?

 

If you really need streaming, then you can create a custom receiver and
make the receiver sleep till the event has happened. That will obviously
run your streaming pipelines without having any data to process.




Thanks

Best Regards

 

On Fri, May 15, 2015 at 4:39 AM, Haopu Wang  wrote:

In my application, I want to start a DStream computation only after an
special event has happened (for example, I want to start the receiver
only after the reference data has been properly initialized).

My question is: it looks like the DStream will be started right after
the StreaminContext has been started. Is it possible to delay the start
of specific DStream?

Thank you very much!


 



RE: InferredSchema Example in Spark-SQL

2015-05-17 Thread Cheng, Hao
Typo? Should be .toDF(), not .toRD()

From: Ram Sriharsha [mailto:sriharsha@gmail.com]
Sent: Monday, May 18, 2015 8:31 AM
To: Rajdeep Dua
Cc: user
Subject: Re: InferredSchema Example in Spark-SQL

you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case inferring 
schema from the case class)



On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua 
mailto:rajdeep@gmail.com>> wrote:
Hi All,
Was trying the Inferred Schema Spark example
http://spark.apache.org/docs/latest/sql-programming-guide.html#overview

I am getting the following compilation error on the function toRD()

value toRD is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = 
sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toRD()
[error]

Thanks
Rajdeep





RE: InferredSchema Example in Spark-SQL

2015-05-17 Thread Cheng, Hao
Forgot to import the implicit functions/classes?

import sqlContext.implicits._
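For reference, a minimal end-to-end sketch with the import in place (the file path is the one from the guide; adjust it to yours):

import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // brings toDF() into scope for RDDs of case classes

val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()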

From: Rajdeep Dua [mailto:rajdeep@gmail.com]
Sent: Monday, May 18, 2015 8:08 AM
To: user@spark.apache.org
Subject: InferredSchema Example in Spark-SQL

Hi All,
Was trying the Inferred Schema Spark example
http://spark.apache.org/docs/latest/sql-programming-guide.html#overview

I am getting the following compilation error on the function toRD()

value toRD is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = 
sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
 => Person(p(0), p(1).trim.toInt)).toRD()
[error]

Thanks
Rajdeep




Re: InferredSchema Example in Spark-SQL

2015-05-17 Thread Ram Sriharsha
you mean toDF() ? (toDF converts the RDD to a DataFrame, in this case
inferring schema from the case class)



On Sun, May 17, 2015 at 5:07 PM, Rajdeep Dua  wrote:

> Hi All,
> Was trying the Inferred Schema Spark example
> http://spark.apache.org/docs/latest/sql-programming-guide.html#overview
>
> I am getting the following compilation error on the function toRD()
>
> value toRD is not a member of org.apache.spark.rdd.RDD[Person]
> [error] val people =
> sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
> => Person(p(0), p(1).trim.toInt)).toRD()
> [error]
>
> Thanks
> Rajdeep
>
>
>
>


InferredSchema Example in Spark-SQL

2015-05-17 Thread Rajdeep Dua
Hi All,
Was trying the Inferred Schema Spark example
http://spark.apache.org/docs/latest/sql-programming-guide.html#overview

I am getting the following compilation error on the function toRD()

value toRD is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people =
sc.textFile("/home/ubuntu/work/spark-src/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p
=> Person(p(0), p(1).trim.toInt)).toRD()
[error]

Thanks
Rajdeep


Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
Turns out the above thread is unrelated: it was caused by using s3:// instead
of s3n://, which I had already avoided in my checkpointDir configuration.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22928.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
Looks like this problem has been mentioned before:

http://qnalist.com/questions/5666463/downloads-from-s3-exceedingly-slow-when-running-on-spark-ec2

and a temporary solution is to deploy on a dedicated EMR/S3 configuration.
I'll give that one a shot.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
BTW: My thread dump of the driver's main thread looks like it is stuck
waiting for Amazon S3 bucket metadata for a long time (which may suggest
that I should move the checkpointing directory from S3 to HDFS):

Thread 1: main (RUNNABLE) 
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
sun.security.ssl.InputRecord.read(InputRecord.java:480)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:934)
sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:891)
sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273)
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260)
org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251)
org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:223)
org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271)
org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:685)
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:487)
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:172)
sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)
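A minimal sketch of the change suggested above, i.e. moving the checkpoint directory from S3 to HDFS (the namenode and path are made up):

// Point checkpoints at HDFS instead of S3.
sc.setCheckpointDir("hdfs://namenode:8020/user/xxx/checkpoints")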




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925p22926.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Union of checkpointed RDD in Apache Spark has long (> 10 hour) between-stage latency

2015-05-17 Thread Peng Cheng
I'm implementing one of my machine learning/graph analysis algorithms on
Apache Spark.

The algorithm is very iterative (like all other ML algorithms), but it has a
rather strange workflow: first a subset of all training data (the seeds RDD
{S_1}) is randomly selected and loaded; in each iteration, the seeds {S_n}
update themselves into {S_n+1} and yield a model RDD {M_n}. Once the seeds
have reached a stopping condition, the iteration stops and all model RDDs
are aggregated to yield the final result.

Like all iterative implementations in MLlib, both {S_} and {M_} have to be
checkpointed regularly (which seems to be more efficient than committing {M}
into a growing RDD and caching/checkpointing it: old data already on HDFS
doesn't have to be written to disk again or take memory space until the
final stage).

However, before the final step when all {M_*} are aggregated, Spark seems to
freeze: all stages/jobs are completed, no new stages/jobs are pending, and
the driver and cluster are running but doing nothing (the algorithm is still
far from completion).

I have to wait 10+ hours before it starts to proceed, so the latency between
stages on the UI looks really weird (note the sharp contrast between the 15s
task running time and the 10h+ between-stage latency):

[Spark UI screenshot showing the stage timings omitted]

I wonder if my implementation of the algorithm is not optimized for Spark, or
if I've simply run into a hidden issue? Thanks a lot for your opinion.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Union-of-checkpointed-RDD-in-Apache-Spark-has-long-10-hour-between-stage-latency-tp22925.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Best practice to avoid ambiguous columns in DataFrame.join

2015-05-17 Thread Jan-Paul Bultmann
It’s probably not advisable to use option 1, though, since it will break when
`df = df2`, which can easily happen when you’ve written a function that does
such a join internally.

This could be solved by an identity-like function that returns the dataframe
unchanged but with a different identity.
`.as` would be such a candidate, but that doesn’t work.

Thoughts?

> On 16 May 2015, at 00:55, Michael Armbrust  > wrote:
> 
> There are several ways to solve this ambiguity:
> 
> 1. use the DataFrames to get the attribute so it's already "resolved" and not 
> just a string we need to map to a DataFrame.
> 
> df.join(df2, df("_1") === df2("_1"))
> 
> 2. Use aliases
> 
> df.as('a).join(df2.as('b), $"a._1" === $"b._1")
> 
> 3. rename the columns as you suggested.
> 
> df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" === 
> $"right_key").printSchema
> 
> 4. (Spark 1.4 only) use def join(right: DataFrame, usingColumn: String): 
> DataFrame
> 
> df.join(df1, "_1")
> 
> This has the added benefit of only outputting a single _1 column.
> 
> On Fri, May 15, 2015 at 3:44 PM, Justin Yip  > wrote:
> Hello,
> 
> I would like ask know if there are recommended ways of preventing ambiguous 
> columns when joining dataframes. When we join dataframes, it usually happen 
> we join the column with identical name. I could have rename the columns on 
> the right data frame, as described in the following code. Is there a better 
> way to achieve this? 
> 
> scala> val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (3, "b"), 
> (4, "b")))
> df: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
> 
> scala> val df2 = sqlContext.createDataFrame(Seq((1, 10), (2, 20), (3, 30), 
> (4, 40)))
> df2: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
> 
> scala> df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" === 
> $"right_key").printSchema
> 
> Thanks.
> 
> Justin
> 
> View this message in context: Best practice to avoid ambiguous columns in 
> DataFrame.join 
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 



Re: Best practice to avoid ambiguous columns in DataFrame.join

2015-05-17 Thread Michael Armbrust
In Spark 1.4 there is an extra heuristic to detect self-joins, which means
that even option 1 will still work.

On Sun, May 17, 2015 at 9:31 AM, Jan-Paul Bultmann 
wrote:

> It’s probably not advisable to use 1 though since it will break when `df =
> df2`,
> which can easily happen when you’ve written a function that does such a
> join internally.
>
> This could be solved by an identity like function that returns the
> dataframe unchanged but with a different identity.
> `.as` would be such a candidate but that doesn’t work.
>
> Thoughts?
>
>
> On 16 May 2015, at 00:55, Michael Armbrust  wrote:
>
> There are several ways to solve this ambiguity:
>
> *1. use the DataFrames to get the attribute so it's already "resolved" and
> not just a string we need to map to a DataFrame.*
>
> df.join(df2, df("_1") === df2("_1"))
>
> *2. Use aliases*
>
> df.as('a).join(df2.as('b), $"a._1" === $"b._1")
>
> *3. rename the columns as you suggested.*
>
> df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" ===
> $"right_key").printSchema
>
> *4. (Spark 1.4 only) use def join(right: DataFrame, usingColumn: String):
> DataFrame*
>
> df.join(df1, "_1")
>
> This has the added benefit of only outputting a single _1 column.
>
> On Fri, May 15, 2015 at 3:44 PM, Justin Yip 
> wrote:
>
>> Hello,
>>
>> I would like ask know if there are recommended ways of preventing
>> ambiguous columns when joining dataframes. When we join dataframes, it
>> usually happen we join the column with identical name. I could have rename
>> the columns on the right data frame, as described in the following code. Is
>> there a better way to achieve this?
>>
>> scala> val df = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (3,
>> "b"), (4, "b")))
>> df: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
>>
>> scala> val df2 = sqlContext.createDataFrame(Seq((1, 10), (2, 20), (3,
>> 30), (4, 40)))
>> df2: org.apache.spark.sql.DataFrame = [_1: int, _2: int]
>>
>> scala> df.join(df2.withColumnRenamed("_1", "right_key"), $"_1" ===
>> $"right_key").printSchema
>>
>> Thanks.
>>
>> Justin
>>
>> --
>> View this message in context: Best practice to avoid ambiguous columns
>> in DataFrame.join
>> 
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>
>


Re: textFileStream Question

2015-05-17 Thread Vadim Bichutskiy
This is cool. Thanks Akhil.


On Sun, May 17, 2015 at 11:25 AM, Akhil Das 
wrote:

> It's based on the file timestamp; you can actually see the "finding new
> files" logic from here
> 
>
> Thanks
> Best Regards
>
> On Fri, May 15, 2015 at 2:25 AM, Vadim Bichutskiy <
> vadim.bichuts...@gmail.com> wrote:
>
>> How does textFileStream work behind the scenes? How does Spark Streaming
>> know what files are new and need to be processed? Is it based on time
>> stamp, file name?
>>
>> Thanks,
>> Vadim
>>
>
>


Re: Efficient way to fetch all records on a particular node/partition in GraphX

2015-05-17 Thread Ankur Dave
If you know the partition IDs, you can launch a job that runs tasks on only
those partitions by calling sc.runJob. For example, we do this in IndexedRDD
to get particular keys without launching a task on every partition.
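A minimal sketch of that pattern against the 1.x API of that era (the RDD contents and partition ID are made up):

// Collect the elements of just one partition (here, partition 5) on the driver.
val rdd = sc.parallelize(1 to 1000000, numSlices = 10)
val result: Array[Array[Int]] =
  sc.runJob(rdd, (iter: Iterator[Int]) => iter.toArray, Seq(5), allowLocal = false)
// result(0) holds the contents of partition 5.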

Ankur 

On Sun, May 17, 2015 at 8:32 AM, mas  wrote:

> I have distributed my RDD across, say, 10 nodes. I want to fetch the data
> that resides on a particular node, say "node 5". How can I achieve this?
> I have tried the mapPartitionsWithIndex function to filter the data of the
> corresponding node, however it is pretty expensive.
>


Re: Data partitioning and node tracking in Spark-GraphX

2015-05-17 Thread MUHAMMAD AAMIR
Can you please elaborate on the way to fetch the records from a particular
partition (a node, in our case)? For example, my RDD is distributed across 10
nodes and I want to fetch the data of one particular node/partition, i.e.
the partition/node with index "5".
How can I do this?
I have tried mapPartitionsWithIndex as well as partitions.foreach
functions. However, these are expensive. Does anybody know a more efficient
way?

Thanks in anticipation.


On Thu, Apr 16, 2015 at 5:49 PM, Evo Eftimov  wrote:

> Well you can have a two level index structure, still without any need for
> physical cluster node awareness
>
>
>
> Level 1 Index is the previously described partitioned [K,V] RDD – this
> gets you to the value (RDD element) you need on the respective cluster node
>
>
>
> Level 2 Index – it will be built and reside within the Value of each [K,V]
> RDD element – so after you retrieve the appropriate Element from the
> appropriate cluster node based on Level 1 Index, then you query the Value
> in the element based on Level 2 Index
>
>
>
> *From:* MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com]
> *Sent:* Thursday, April 16, 2015 4:32 PM
>
> *To:* Evo Eftimov
> *Cc:* user@spark.apache.org
> *Subject:* Re: Data partitioning and node tracking in Spark-GraphX
>
>
>
> Thanks a lot for the reply. Indeed it is useful, but to be more precise, I
> have 3D data and want to index it using an octree. Thus I aim to build a
> two-level indexing mechanism: first, at the global level, I want to
> partition and send the data to the nodes; then, at the node level, I again
> want to use an octree to index my data locally.
>
> Could you please elaborate the solution in this context ?
>
>
>
> On Thu, Apr 16, 2015 at 5:23 PM, Evo Eftimov 
> wrote:
>
> Well you can use a [Key, Value] RDD and partition it based on hash
> function on the Key and even a specific number of partitions (and hence
> cluster nodes). This will a) index the data, b) divide it and send it to
> multiple nodes. Re your last requirement - in a cluster programming
> environment/framework your app code should not be bothered on which
> physical node exactly, a partition resides
>
>
>
> Regards
>
> Evo Eftimov
>
>
>
> *From:* MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com]
> *Sent:* Thursday, April 16, 2015 4:20 PM
> *To:* Evo Eftimov
> *Cc:* user@spark.apache.org
> *Subject:* Re: Data partitioning and node tracking in Spark-GraphX
>
>
>
> I want to use Spark functions/APIs to do this task. My basic purpose is to
> index the data and divide and send it to multiple nodes. Then, at access
> time, I want to reach the right node and data partition. I don't
> have any clue how to do this.
>
> Thanks,
>
>
>
> On Thu, Apr 16, 2015 at 5:13 PM, Evo Eftimov 
> wrote:
>
> How do you intend to "fetch the required data" - from within Spark or using
> an app / code / module outside Spark
>
> -Original Message-
> From: mas [mailto:mas.ha...@gmail.com]
> Sent: Thursday, April 16, 2015 4:08 PM
> To: user@spark.apache.org
> Subject: Data partitioning and node tracking in Spark-GraphX
>
> I have a big data file, and I aim to create an index on the data. I want to
> partition the data based on a user-defined function in Spark-GraphX (Scala).
> Further, I want to keep track of the node on which a particular data
> partition is sent and being processed, so I can fetch the required data by
> accessing the right node and data partition.
> How can I achieve this?
> Any help in this regard will be highly appreciated.
>
>
>
> --
> View this message in context:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Data-partitioning-and-no
> de-tracking-in-Spark-GraphX-tp22527.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
>
>
>
> --
>
> Regards,
> Muhammad Aamir
>
>
> *CONFIDENTIALITY:This email is intended solely for the person(s) named and
> may be confidential and/or privileged.If you are not the intended
> recipient,please delete it,notify me and do not copy,use,or disclose its
> content.*
>
>
>
>
>
> --
>
> Regards,
> Muhammad Aamir
>
>
> *CONFIDENTIALITY:This email is intended solely for the person(s) named and
> may be confidential and/or privileged.If you are not the intended
> recipient,please delete it,notify me and do not copy,use,or disclose its
> content.*
>



-- 
Regards,
Muhammad Aamir


*CONFIDENTIALITY:This email is intended solely for the person(s) named and
may be confidential and/or privileged.If you are not the intended
recipient,please delete it,notify me and do not copy,use,or disclose its
content.*


Trying to understand sc.textFile better

2015-05-17 Thread Justin Pihony
All,
    I am trying to understand the textFile method deeply, but I think my
lack of deep Hadoop knowledge is holding me back here. Let me lay out my
understanding and maybe you can correct anything that is incorrect.

    When sc.textFile(path) is called, defaultMinPartitions is used,
which is really just math.min(taskScheduler.defaultParallelism, 2). Let's
assume we are using the SparkDeploySchedulerBackend, where this is
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(),
2)).
    So, now let's say the default is 2. Going back to textFile, this is
passed in to HadoopRDD. The true size is determined in getPartitions() using
inputFormat.getSplits(jobConf, minPartitions). But, from what I can find,
the partition count is merely a hint and is in fact mostly ignored, so you
will probably end up with one partition per block.
OK, this fits with expectations; however, what if the default is not used and
you provide a minPartitions that implies a partition size larger than the
block size? If my research is right and the getSplits call simply ignores
this parameter, then wouldn't the provided minimum end up being ignored and
you would still just get block-sized partitions?
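For what it's worth, one way to check this empirically (the path and numbers below are hypothetical) is to compare the partition count you request against what you actually get:

// Compare the partition count requested via minPartitions with the count
// that inputFormat.getSplits actually produces.
val rddDefault = sc.textFile("hdfs:///data/big.txt")
val rddHinted  = sc.textFile("hdfs:///data/big.txt", minPartitions = 64)
println(s"default: ${rddDefault.partitions.length}, hinted: ${rddHinted.partitions.length}")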

Thanks,
Justin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-understand-sc-textFile-better-tp22924.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Spark Streaming and reducing latency

2015-05-17 Thread Evo Eftimov
This is the nature of Spark Streaming as a System Architecture:

 

1.   It is a batch processing system architecture (Spark Batch) optimized 
for Streaming Data

2.   In terms of sources of Latency in such System Architecture, bear in 
mind that besides “batching”, there is also the Central “Driver” 
function/module, which is essentially a Central Job/Task Manager (ie running on 
a dedicated node, which doesn’t sit on the Path of the Messages), which even in 
a Streaming Data scenario, FOR EACH Streaming BATCH schedules tasks (as per the 
DAG for the streaming job), sends them to the workers, receives the results, 
then schedules and sends more tasks (as per the DAG for the job) and so on and 
so forth

 

In terms of Parallel Programming Patterns/Architecture, the above is known as 
Data Parallel Architecture with Central Job/Task Manager.

 

There are other alternatives for achieving lower latency; in terms of
Parallel Programming Patterns they are known as Pipelines or Task Parallel
Architecture – essentially every message streams individually through an
assembly line of Tasks. The tasks can be run on multiple cores of one box or
in a distributed environment. Storm, for example, implements this pattern, or
you can just put together your own solution.

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Sunday, May 17, 2015 4:04 PM
To: dgoldenberg
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and reducing latency

 

With receiver-based streaming, you can actually specify
spark.streaming.blockInterval, which is the interval at which the receiver
chunks the data it fetches from the source into blocks. The default value is
200ms, so if your batch duration is 1 second, it will produce 5 blocks of
data per batch. And yes, with Spark Streaming, when your processing time goes
beyond your batch duration and you have high data consumption, you will
overwhelm the receiver's memory and it will throw "block not found"
exceptions.
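A minimal sketch of setting those two knobs together (the values are illustrative only):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 1s batches split into ~5 blocks per receiver (200ms block interval).
// Newer releases accept "200ms"; older 1.x releases expect a plain
// millisecond value such as "200".
val conf = new SparkConf()
  .setAppName("LatencyTuning")
  .set("spark.streaming.blockInterval", "200ms")
val ssc = new StreamingContext(conf, Seconds(1))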




Thanks

Best Regards

 

On Sun, May 17, 2015 at 7:21 PM, dgoldenberg  wrote:

I keep hearing the argument that the way Discretized Streams work with Spark
Streaming is a lot more of a batch processing algorithm than true streaming.
For streaming, one would expect a new item, e.g. in a Kafka topic, to be
available to the streaming consumer immediately.

With the discretized streams, streaming is done with batch intervals i.e.
the consumer has to wait the interval to be able to get at the new items. If
one wants to reduce latency it seems the only way to do this would be by
reducing the batch interval window. However, that may lead to a great deal
of churn, with many requests going into Kafka out of the consumers,
potentially with no results whatsoever as there's nothing new in the topic
at the moment.

Is there a counter-argument to this reasoning? What are some of the general
approaches to reduce latency  folks might recommend? Or, perhaps there are
ways of dealing with this at the streaming API level?

If latency is of great concern, is it better to look into streaming from
something like Flume where data is pushed to consumers rather than pulled by
them? Are there techniques, in that case, to ensure the consumers don't get
overwhelmed with new data?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


 



RE: [SparkStreaming] Is it possible to delay the start of some DStream in the application?

2015-05-17 Thread Evo Eftimov
You can make ANY standard receiver sleep by implementing a custom Message
Deserializer class with a sleep method inside it.

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Sunday, May 17, 2015 4:29 PM
To: Haopu Wang
Cc: user
Subject: Re: [SparkStreaming] Is it possible to delay the start of some DStream 
in the application?

 

Why not just trigger your batch job with that event?

 

If you really need streaming, then you can create a custom receiver and make 
the receiver sleep till the event has happened. That will obviously run your 
streaming pipelines without having any data to process.




Thanks

Best Regards

 

On Fri, May 15, 2015 at 4:39 AM, Haopu Wang  wrote:

In my application, I want to start a DStream computation only after an
special event has happened (for example, I want to start the receiver
only after the reference data has been properly initialized).

My question is: it looks like the DStream will be started right after
the StreaminContext has been started. Is it possible to delay the start
of specific DStream?

Thank you very much!


 



Re: number of executors

2015-05-17 Thread xiaohe lan
Sorry, they are actually both assigned tasks.

Aggregated Metrics by Executor
Executor IDAddressTask TimeTotal TasksFailed TasksSucceeded TasksInput Size
/ RecordsShuffle Write Size / RecordsShuffle Spill (Memory)Shuffle Spill
(Disk)1host1:61841.7 min505640.0 MB / 12318400382.3 MB / 121007701630.4 MB295.4
MB2host2:620721.7 min505640.0 MB / 12014510386.0 MB / 109269121646.6 MB304.8
MB

On Sun, May 17, 2015 at 11:50 PM, xiaohe lan  wrote:

> bash-4.1$ ps aux | grep SparkSubmit
> xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
> /scratch/xilan/jdk1.8.0_45/bin/java -cp
> /scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
> -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
> target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
> --num-executors 5 --executor-cores 4
> xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
> --color SparkSubmit
>
>
> When I look at the Spark UI, I see the following:
> Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
> TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1 MB
> / 28089782host2:49970 ms00063.4 MB / 1810945
>
> So executor 2 is not even assigned a task? Maybe I have some problems in
> my settings, but I don't know which settings I might have set wrong or
> not set.
>
>
> Thanks,
> Xiaohe
>
> On Sun, May 17, 2015 at 11:16 PM, Akhil Das 
> wrote:
>
>> Did you try --executor-cores param? While you submit the job, do a ps aux
>> | grep spark-submit and see the exact command parameters.
>>
>> Thanks
>> Best Regards
>>
>> On Sat, May 16, 2015 at 12:31 PM, xiaohe lan 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a 5-node YARN cluster, and I used spark-submit to submit a simple
>>> app.
>>>
>>>  spark-submit --master yarn
>>> target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
>>> --num-executors 5
>>>
>>> I have set the number of executors to 5, but from the Spark UI I could see
>>> only two executors, and it ran very slowly. What did I miss?
>>>
>>> Thanks,
>>> Xiaohe
>>>
>>
>>
>


Re: number of executors

2015-05-17 Thread xiaohe lan
bash-4.1$ ps aux | grep SparkSubmit
xilan 1704 13.2  1.2 5275520 380244 pts/0  Sl+  08:39   0:13
/scratch/xilan/jdk1.8.0_45/bin/java -cp
/scratch/xilan/spark/conf:/scratch/xilan/spark/lib/spark-assembly-1.3.1-hadoop2.4.0.jar:/scratch/xilan/spark/lib/datanucleus-core-3.2.10.jar:/scratch/xilan/spark/lib/datanucleus-api-jdo-3.2.6.jar:/scratch/xilan/spark/lib/datanucleus-rdbms-3.2.9.jar:/scratch/xilan/hadoop/etc/hadoop
-Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn
target/scala-2.10/simple-project_2.10-1.0.jar --class scala.SimpleApp
--num-executors 5 --executor-cores 4
xilan 1949  0.0  0.0 103292   800 pts/1S+   08:40   0:00 grep
--color SparkSubmit


When I look at the Spark UI, I see the following:
Aggregated Metrics by ExecutorExecutor IDAddressTask TimeTotal TasksFailed
TasksSucceeded TasksShuffle Read Size / Records1host1:304836 s101127.1 MB /
28089782host2:49970 ms00063.4 MB / 1810945

So executor 2 is not even assigned a task? Maybe I have some problems in
my settings, but I don't know which settings I might have set wrong or
not set.


Thanks,
Xiaohe

On Sun, May 17, 2015 at 11:16 PM, Akhil Das 
wrote:

> Did you try --executor-cores param? While you submit the job, do a ps aux
> | grep spark-submit and see the exact command parameters.
>
> Thanks
> Best Regards
>
> On Sat, May 16, 2015 at 12:31 PM, xiaohe lan 
> wrote:
>
>> Hi,
>>
>> I have a 5-node YARN cluster, and I used spark-submit to submit a simple app.
>>
>>  spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar
>> --class scala.SimpleApp --num-executors 5
>>
>> I have set the number of executors to 5, but from the Spark UI I could see
>> only two executors and it ran very slowly. What did I miss?
>>
>> Thanks,
>> Xiaohe
>>
>
>


Efficient way to fetch all records on a particular node/partition in GraphX

2015-05-17 Thread mas
Hi All,

I have distributed my RDD across, say, 10 nodes. I want to fetch the data that
resides on a particular node, say "node 5". How can I achieve this?
I have tried the mapPartitionsWithIndex function to filter the data of the
corresponding node, but it is pretty expensive.
Is there any efficient way to do that?
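
This is roughly what I tried (a sketch; "rdd" and the partition index 5 are
just placeholders):

// Keep only the elements of one partition (index 5 here); all other
// partitions emit nothing. It still has to touch every partition, which is
// what makes it expensive.
val targetPartition = 5
val onePartition = rdd.mapPartitionsWithIndex(
  (idx, iter) => if (idx == targetPartition) iter else Iterator.empty,
  preservesPartitioning = true)
onePartition.collect()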



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Effecient-way-to-fetch-all-records-on-a-particular-node-partition-in-GraphX-tp22923.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application?

2015-05-17 Thread Akhil Das
Why not just trigger your batch job with that event?

If you really need streaming, then you can create a custom receiver and
make the receiver sleep until the event has happened. The streaming pipeline
will obviously keep running in the meantime, just without any data to process.
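
Something along these lines (an untested sketch; ready() and
fetchNextRecord() are placeholders for your event check and your actual
source):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch of a receiver that blocks until some external condition holds and
// only then starts handing records to Spark via store().
class DelayedReceiver(ready: () => Boolean)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("delayed-receiver") {
      override def run(): Unit = {
        while (!ready() && !isStopped()) Thread.sleep(1000) // sleep till the event
        while (!isStopped()) {
          store(fetchNextRecord()) // placeholder for reading from the real source
        }
      }
    }.start()
  }

  def onStop(): Unit = {}

  private def fetchNextRecord(): String = { Thread.sleep(100); "record" }
}

// val stream = ssc.receiverStream(new DelayedReceiver(() => referenceDataReady))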

Thanks
Best Regards

On Fri, May 15, 2015 at 4:39 AM, Haopu Wang  wrote:

> In my application, I want to start a DStream computation only after a
> special event has happened (for example, I want to start the receiver
> only after the reference data has been properly initialized).
>
> My question is: it looks like the DStream will be started right after
> the StreamingContext has been started. Is it possible to delay the start
> of a specific DStream?
>
> Thank you very much!
>
>
>


Re: textFileStream Question

2015-05-17 Thread Akhil Das
It's based on the file timestamp. You can actually see the finding-new-files
logic here
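
Basic usage looks like this (a sketch; the directory path is a placeholder,
and files should appear in it atomically, e.g. written elsewhere and then
renamed in):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// textFileStream monitors a directory and, on every batch, picks up files
// whose modification time is newer than those it has already processed.
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.textFileStream("hdfs:///data/incoming") // placeholder path
lines.count().print()
ssc.start()
ssc.awaitTermination()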


Thanks
Best Regards

On Fri, May 15, 2015 at 2:25 AM, Vadim Bichutskiy <
vadim.bichuts...@gmail.com> wrote:

> How does textFileStream work behind the scenes? How does Spark Streaming
> know what files are new and need to be processed? Is it based on timestamp,
> file name?
>
> Thanks,
> Vadim
>


Re: Forbidded : Error Code: 403

2015-05-17 Thread Akhil Das
I think you can also try this way:

DataFrame df = sqlContext.load(
    "s3n://ACCESS-KEY:SECRET-KEY@bucket-name/file.avro",
    "com.databricks.spark.avro");


Thanks
Best Regards

On Sat, May 16, 2015 at 2:02 AM, Mohammad Tariq  wrote:

> Thanks for the suggestion Steve. I'll try that out.
>
> Read the long story last night while struggling with this :). I made sure
> that I don't have any '/' in my key.
>
> On Saturday, May 16, 2015, Steve Loughran  wrote:
>
>>
>> > On 15 May 2015, at 21:20, Mohammad Tariq  wrote:
>> >
>> > Thank you Ayan and Ted for the prompt response. It isn't working with
>> s3n either.
>> >
>> > And I am able to download the file. In fact I am able to read the same
>> file using s3 API without any issue.
>> >
>>
>>
>> Sounds like an s3n config problem. Check your configuration - you can
>> test it locally via the hdfs dfs command without even starting Spark.
>>
>>  Oh, and if there is a "/" in your secret key, you're going to need to
>> generate a new one. Long story.
>>
>
>
> --
>
> Tariq, Mohammad
> about.me/mti
>
>
>


Re: number of executors

2015-05-17 Thread Akhil Das
Did you try --executor-cores param? While you submit the job, do a ps aux |
grep spark-submit and see the exact command parameters.
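
On YARN you can also pin the same resources down in code or in
spark-defaults.conf instead of on the command line (a sketch; the values are
only examples):

import org.apache.spark.{SparkConf, SparkContext}

// Conf equivalents of --num-executors / --executor-cores / --executor-memory.
val conf = new SparkConf()
  .setAppName("SimpleApp")
  .set("spark.executor.instances", "5")
  .set("spark.executor.cores", "4")
  .set("spark.executor.memory", "4g")
val sc = new SparkContext(conf)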

Thanks
Best Regards

On Sat, May 16, 2015 at 12:31 PM, xiaohe lan  wrote:

> Hi,
>
> I have a 5-node YARN cluster, and I used spark-submit to submit a simple app.
>
>  spark-submit --master yarn target/scala-2.10/simple-project_2.10-1.0.jar
> --class scala.SimpleApp --num-executors 5
>
> I have set the number of executors to 5, but from the Spark UI I could see
> only two executors and it ran very slowly. What did I miss?
>
> Thanks,
> Xiaohe
>


Re: Spark Streaming and reducing latency

2015-05-17 Thread Akhil Das
With receiver-based streaming, you can specify spark.streaming.blockInterval,
which is the interval at which the data a receiver gets from the source is
chunked into blocks. The default value is 200 ms, so with a batch duration of
1 second each receiver produces 5 blocks per batch. And yes, with Spark
Streaming, if your processing time keeps exceeding your batch duration while
data arrives at a high rate, you will eventually overwhelm the receiver's
memory and run into "block not found" exceptions.
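
For example (a sketch; the app name and values are only examples):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// With a 1 s batch and a 200 ms block interval, each receiver produces
// 1000 / 200 = 5 blocks (and hence up to 5 tasks) per batch. Shrinking the
// block interval gives more, smaller blocks; end-to-end latency is still
// bounded by the batch duration.
val conf = new SparkConf()
  .setAppName("streaming-app")
  .set("spark.streaming.blockInterval", "200") // milliseconds
val ssc = new StreamingContext(conf, Seconds(1))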

Thanks
Best Regards

On Sun, May 17, 2015 at 7:21 PM, dgoldenberg 
wrote:

> I keep hearing the argument that the way Discretized Streams work with
> Spark
> Streaming is a lot more of a batch processing algorithm than true
> streaming.
> For streaming, one would expect a new item, e.g. in a Kafka topic, to be
> available to the streaming consumer immediately.
>
> With the discretized streams, streaming is done with batch intervals i.e.
> the consumer has to wait the interval to be able to get at the new items.
> If
> one wants to reduce latency it seems the only way to do this would be by
> reducing the batch interval window. However, that may lead to a great deal
> of churn, with many requests going into Kafka out of the consumers,
> potentially with no results whatsoever as there's nothing new in the topic
> at the moment.
>
> Is there a counter-argument to this reasoning? What are some of the general
> approaches to reduce latency  folks might recommend? Or, perhaps there are
> ways of dealing with this at the streaming API level?
>
> If latency is of great concern, is it better to look into streaming from
> something like Flume where data is pushed to consumers rather than pulled
> by
> them? Are there techniques, in that case, to ensure the consumers don't get
> overwhelmed with new data?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark Streaming and reducing latency

2015-05-17 Thread dgoldenberg
I keep hearing the argument that the way Discretized Streams work with Spark
Streaming is a lot more of a batch processing algorithm than true streaming. 
For streaming, one would expect a new item, e.g. in a Kafka topic, to be
available to the streaming consumer immediately.

With the discretized streams, streaming is done with batch intervals i.e.
the consumer has to wait the interval to be able to get at the new items. If
one wants to reduce latency it seems the only way to do this would be by
reducing the batch interval window. However, that may lead to a great deal
of churn, with many requests going into Kafka out of the consumers,
potentially with no results whatsoever as there's nothing new in the topic
at the moment.

Is there a counter-argument to this reasoning? What are some of the general
approaches to reduce latency  folks might recommend? Or, perhaps there are
ways of dealing with this at the streaming API level? 

If latency is of great concern, is it better to look into streaming from
something like Flume where data is pushed to consumers rather than pulled by
them? Are there techniques, in that case, to ensure the consumers don't get
overwhelmed with new data?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-reducing-latency-tp22922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Big Data Day LA: FREE Big Data Conference in Los Angeles on June 27, 2015

2015-05-17 Thread Slim Baltagi
Please register for the 3rd annual full-day ‘Big Data Day LA’ here:
http://bigdatadayla.org
•   Location: Los Angeles
•   Date: June 27, 2015
•   Completely FREE: attendance, food (breakfast, lunch & coffee breaks) and
networking reception
•   Vendor neutral
•   Great lineup of presentations, workshops, panels, and keynotes
•   Over 45 talks organized in 5 tracks: Hadoop/Spark, Big Data, Business Use
Cases, NoSQL, Data Science
•   Led by 40+ volunteers
•   800+ expected attendees
Register before it sells out! http://bigdatadayla.org

Thanks

Slim Baltagi



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Big-Data-Day-LA-FREE-Big-Data-Conference-in-Los-Angeles-on-June-27-2015-tp22921.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: println in spark-shell

2015-05-17 Thread Sean Owen
println does not execute in the driver since you are executing it on
elements of the RDD. It executes in an executor, which can happen to
execute in-process in local mode. In general you should not expect
this to print results in the driver.
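
If you want the output on the driver, bring the data back to it first, for
example:

// Both run on the driver; use collect() only if the RDD is small enough to
// fit in driver memory.
lines.take(10).foreach(println)
lines.collect().foreach(println)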

On Sun, May 17, 2015 at 10:01 AM, xiaohe lan  wrote:
> Hi,
>
> When I start spark shell by passing yarn to master option, println does not
> print elements in RDD:
>
> bash-4.1$ spark-shell --master yarn
> 15/05/17 01:50:08 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
>       /_/
>
> Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_45)
> Type in expressions to have them evaluated.
> Type :help for more information.
> Spark context available as sc.
> SQL context available as sqlContext.
>
> scala> val lines = sc.parallelize(List("hello world", "hi"))
> lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
> parallelize at :21
>
> scala> lines.first()
> res1: String = hello world
>
> scala> lines.foreach(println)
>
> scala>
>
> If I start spark shell in local mode, the elements are printed. What's the
> difference here ?
>
> Thanks,
> Xiaohe




println in spark-shell

2015-05-17 Thread xiaohe lan
Hi,

When I start spark shell by passing yarn to master option, println does not
print elements in RDD:

bash-4.1$ spark-shell --master yarn
15/05/17 01:50:08 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.

scala> val lines = sc.parallelize(List("hello world", "hi"))
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
parallelize at :21

scala> lines.first()
res1: String = hello world

scala> lines.foreach(println)

scala>

If I start spark shell in local mode, the elements are printed. What's the
difference here ?

Thanks,
Xiaohe


Re: Multiple DataFrames per Parquet file?

2015-05-17 Thread Davies Liu
You can union all the df together, then call repartition().
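
Something like this (a Scala sketch; the PySpark DataFrame API has the same
unionAll and repartition methods, and "dfs" and the output path are
placeholders):

// Combine many small DataFrames that share one schema into a single
// Parquet output with one partition.
val combined = dfs.reduce((a, b) => a.unionAll(b)).repartition(1)
combined.saveAsParquetFile("hdfs:///out/combined.parquet")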

On Sun, May 10, 2015 at 8:34 AM, Peter Aberline
 wrote:
> Hi
>
> Thanks for the quick response.
>
> No I'm not using Streaming. Each DataFrame represents tabular data read from
> a CSV file. They have the same schema.
>
> There is also the option of appending each DF to the parquet file, but then
> I can't maintain them as separate DF when reading back in without filtering.
>
> I'll rethink maintaining each CSV file as a single DF.
>
> Thanks,
> Peter
>
>
> On 10 May 2015 at 15:51, ayan guha  wrote:
>>
>> How did you end up with thousands of df? Are you using streaming?  In that
>> case you can do foreachRDD and keep merging incoming rdds to single rdd and
>> then save it through your own checkpoint mechanism.
>>
>> If not, please share your use case.
>>
>> On 11 May 2015 00:38, "Peter Aberline"  wrote:
>>>
>>> Hi
>>>
>>> I have many thousands of small DataFrames that I would like to save to
>>> the one Parquet file to avoid the HDFS 'small files' problem. My
>>> understanding is that there is a 1:1 relationship between DataFrames and
>>> Parquet files if a single partition is used.
>>>
>>> Is it possible to have multiple DataFrames within the one Parquet File
>>> using PySpark?
>>> Or is the only way to achieve this to union the DataFrames into one?
>>>
>>> Thanks,
>>> Peter
>>>
>>>
>




Re: how to set random seed

2015-05-17 Thread Davies Liu
The Python workers used for each stage may be different, so this may not
work as expected.

You can create a Random object, set the seed, and use it to do the shuffle():

import random

my_seed = 42                # as in your example
r = random.Random()
r.seed(my_seed)             # seed once; r is shipped to the workers in the closure

def f(x):
    r.shuffle(x)            # shuffle each element (a list) in place
    return x

rdd.map(f)

On Thu, May 14, 2015 at 6:21 AM, Charles Hayden
 wrote:
> Thanks for the reply.
>
>
> I have not tried it out (I will today and report on my results) but I think
> what I need to do is to call mapPartitions and pass it a function that sets
> the seed.  I was planning to pass the seed value in the closure.
>
>
> Something like:
>
> my_seed = 42
> def f(iterator):
> random.seed(my_seed)
> yield my_seed
> rdd.mapPartitions(f)
>
>
> 
> From: ayan guha 
> Sent: Thursday, May 14, 2015 2:29 AM
>
> To: Charles Hayden
> Cc: user
> Subject: Re: how to set random seed
>
> Sorry for late reply.
>
> Here is what I was thinking
>
> import random as r
> def main():
> get SparkContext
> #Just for fun, lets assume seed is an id
> filename="bin.dat"
> seed = id(filename)
> #broadcast it
> br = sc.broadcast(seed)
>
> #set up dummy list
> lst = []
> for i in range(4):
> x=[]
> for j in range(4):
> x.append(j)
> lst.append(x)
> print lst
> base = sc.parallelize(lst)
> print base.map(randomize).collect()
>
> Randomize looks like
> def randomize(lst):
> local_seed = br.value
> r.seed(local_seed)
> r.shuffle(lst)
> return lst
>
>
> Let me know if this helps...
>
>
>
>
>
> On Wed, May 13, 2015 at 11:41 PM, Charles Hayden 
> wrote:
>>
>> Can you elaborate? Broadcast will distribute the seed, which is only one
>> number.  But what construct do I use to "plant" the seed (call
>> random.seed()) once on each worker?
>>
>> 
>> From: ayan guha 
>> Sent: Tuesday, May 12, 2015 11:17 PM
>> To: Charles Hayden
>> Cc: user
>> Subject: Re: how to set random seed
>>
>>
>> Easiest way is to broadcast it.
>>
>> On 13 May 2015 10:40, "Charles Hayden"  wrote:
>>>
>>> In pySpark, I am writing a map with a lambda that calls random.shuffle.
>>> For testing, I want to be able to give it a seed, so that successive runs
>>> will produce the same shuffle.
>>> I am looking for a way to set this same random seed once on each worker.
>>> Is there any simple way to do it?
>>>
>>>
>
>
>
> --
> Best Regards,
> Ayan Guha




Re: PySpark: slicing issue with dataframes

2015-05-17 Thread Davies Liu
Yes, it's a bug, please file a JIRA.

On Sun, May 3, 2015 at 10:36 AM, Ali Bajwa  wrote:
> Friendly reminder on this one. Just wanted to get a confirmation that this
> is not by design before I logged a JIRA
>
> Thanks!
> Ali
>
>
> On Tue, Apr 28, 2015 at 9:53 AM, Ali Bajwa  wrote:
>>
>> Hi experts,
>>
>> Trying to use the "slicing" functionality in strings as part of a Spark
>> program (PySpark) I get this error:
>>
>>  Code 
>>
>> import pandas as pd
>> from pyspark.sql import SQLContext
>> hc = SQLContext(sc)
>> A = pd.DataFrame({'Firstname': ['James', 'Ali', 'Daniel'], 'Lastname':
>> ['Jones', 'Bajwa', 'Day']})
>> a = hc.createDataFrame(A)
>> print A
>>
>> b = a.select(a.Firstname[:2])
>> print b.toPandas()
>> c = a.select(a.Lastname[2:])
>> print c.toPandas()
>>
>> Output:
>>
>>    Firstname Lastname
>> 0     James    Jones
>> 1       Ali    Bajwa
>> 2    Daniel      Day
>>   SUBSTR(Firstname, 0, 2)
>> 0  Ja
>> 1  Al
>> 2  Da
>>
>>
>> ---
>> Py4JError Traceback (most recent call
>> last)
>>  in ()
>>  10 b = a.select(a.Firstname[:2])
>>  11 print b.toPandas()
>> ---> 12 c = a.select(a.Lastname[2:])
>>  13 print c.toPandas()
>>
>> /home/jupyter/spark-1.3.1/python/pyspark/sql/dataframe.pyc in substr(self,
>> startPos, length)
>>1089 raise TypeError("Can not mix the type")
>>1090 if isinstance(startPos, (int, long)):
>> -> 1091 jc = self._jc.substr(startPos, length)
>>1092 elif isinstance(startPos, Column):
>>1093 jc = self._jc.substr(startPos._jc, length._jc)
>>
>>
>> /home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
>> in __call__(self, *args)
>> 536 answer = self.gateway_client.send_command(command)
>> 537 return_value = get_return_value(answer,
>> self.gateway_client,
>> --> 538 self.target_id, self.name)
>> 539
>> 540 for temp_arg in temp_args:
>>
>> /home/jupyter/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
>> in get_return_value(answer, gateway_client, target_id, name)
>> 302 raise Py4JError(
>> 303 'An error occurred while calling {0}{1}{2}.
>> Trace:\n{3}\n'.
>> --> 304 format(target_id, '.', name, value))
>> 305 else:
>> 306 raise Py4JError(
>>
>> Py4JError: An error occurred while calling o1887.substr. Trace:
>> py4j.Py4JException: Method substr([class java.lang.Integer, class
>> java.lang.Long]) does not exist
>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>> at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
>> at py4j.Gateway.invoke(Gateway.java:252)
>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Looks like X[:2] works but X[2:] fails with the error above.
>> Has anyone else hit this issue?
>>
>> Clearly I can use substr() to workaround this, but if this is a confirmed
>> bug we should open a JIRA.
>>
>> Thanks,
>> Ali
>
>
