Re: spark 1.5, ML Pipeline Decision Tree Dataframe Problem

2015-09-18 Thread Feynman Liang
What is the type of unlabeledTest?

SQL should be using the VectorUDT we've defined for Vectors, so you should
be able to just "import sqlContext.implicits._" and then call "rdd.toDF()"
on your RDD to convert it into a DataFrame.

On Fri, Sep 18, 2015 at 7:32 AM, Yasemin Kaya  wrote:

> Hi,
>
> I am using *spark 1.5, ML Pipeline Decision Tree* to get the tree's
> probability. But I have to convert my data to DataFrame type.
> While creating model there is no problem but when I am using model on my
> data there is a problem about converting to data frame type. My data type
> is *JavaPairRDD* , when I am creating dataframe
>
> DataFrame production = sqlContext.createDataFrame(
> unlabeledTest.values(), Vector.class);
>
> *Error says me: *
> Exception in thread "main" java.lang.ClassCastException:
> org.apache.spark.mllib.linalg.VectorUDT cannot be cast to
> org.apache.spark.sql.types.StructType
>
> I know if I give LabeledPoint type, there will be no problem. But the data
> have no label, I wanna predict the label because of this reason I use model
> on it.
>
> Is there way to handle my problem?
> Thanks.
>
>
> Best,
> yasemin
> --
> hiç ender hiç
>


Re: Caching intermediate results in Spark ML pipeline?

2015-09-15 Thread Feynman Liang
If you're doing hyperparameter grid search, consider using
ml.tuning.CrossValidator which does cache the dataset
<https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L85>
.

Otherwise, perhaps you can elaborate more on your particular use case for
caching intermediate results and if the current API doesn't support it we
can create a JIRA for it.
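
For reference, a rough Scala sketch of the CrossValidator approach
(assuming a DataFrame "training" with "text" and "label" columns; the
column names and grid values are placeholders):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression()
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(1000, 10000))
  .addGrid(lr.regParam, Array(0.01, 0.1))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

// the input dataset is cached internally while the folds are evaluated
val cvModel = cv.fit(training)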

On Tue, Sep 15, 2015 at 10:26 PM, Jingchu Liu <liujing...@gmail.com> wrote:

> Yeah I understand on the low-level we should do as you said.
>
> But since ML pipeline is a high-level API, it is pretty natural to expect
> the ability to recognize overlapping parameters between successive runs.
> (Actually, this happen A LOT when we have lots of hyper-params to search
> for)
>
> I can also imagine the implementation by appending parameter information
> to the cached results. Let's say if we implemented an "equal" method for
> param1. By comparing param1 with the previous run, the program will know
> data1 is reusable. And time used for generating data1 can be saved.
>
> Best,
> Lewis
>
> 2015-09-15 23:05 GMT+08:00 Feynman Liang <fli...@databricks.com>:
>
>> Nope, and that's intentional. There is no guarantee that rawData did not
>> change between intermediate calls to searchRun, so reusing a cached data1
>> would be incorrect.
>>
>> If you want data1 to be cached between multiple runs, you have a few
>> options:
>> * cache it first and pass it in as an argument to searchRun
>> * use a creational pattern like singleton to ensure only one instantiation
>>
>> On Tue, Sep 15, 2015 at 12:49 AM, Jingchu Liu <liujing...@gmail.com>
>> wrote:
>>
>>> Hey Feynman,
>>>
>>> I doubt DF persistence will work in my case. Let's use the following
>>> code:
>>> ==
>>> def searchRun( params = [param1, param2] )
>>>   data1 = hashing1.transform(rawData, param1)
>>>   data1.cache()
>>>   data2 = hashing2.transform(data1, param2)
>>>   data2.someAction()
>>> ==
>>> Say if we run "searchRun()" for 2 times with the same "param1" but
>>> different "param2". Will spark recognize that the two local variables
>>> "data1" in consecutive runs has the same content?
>>>
>>>
>>> Best,
>>> Lewis
>>>
>>> 2015-09-15 13:58 GMT+08:00 Feynman Liang <fli...@databricks.com>:
>>>
>>>> You can persist the transformed Dataframes, for example
>>>>
>>>> val data : DF = ...
>>>> val hashedData = hashingTF.transform(data)
>>>> hashedData.cache() // to cache DataFrame in memory
>>>>
>>>> Future usage of hashedData will now read from the in-memory cache.
>>>>
>>>> You can also persist to disk, eg:
>>>>
>>>> hashedData.write.parquet(FilePath) // to write DataFrame in Parquet
>>>> format to disk
>>>> ...
>>>> val savedHashedData = sqlContext.read.parquet(FilePath)
>>>>
>>>> Future uses of savedHashedData will read from the Parquet file on disk.
>>>>
>>>> Like my earlier response, this will still require you to call each
>>>> PipelineStage's `transform` method (i.e. to NOT use the overall
>>>> Pipeline.setStages API).
>>>>
>>>> On Mon, Sep 14, 2015 at 10:45 PM, Jingchu Liu <liujing...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey Feynman,
>>>>>
>>>>> Thanks for your response, but I'm afraid "model save/load" is not
>>>>> exactly the feature I'm looking for.
>>>>>
>>>>> What I need to cache and reuse are the intermediate outputs of
>>>>> transformations, not transformer themselves. Do you know any related dev.
>>>>> activities or plans?
>>>>>
>>>>> Best,
>>>>> Lewis
>>>>>
>>>>> 2015-09-15 13:03 GMT+08:00 Feynman Liang <fli...@databricks.com>:
>>>>>
>>>>>> Lewis,
>>>>>>
>>>>>> Many pipeline stages implement save/load methods, which can be used
>>>>>> if you instantiate and call the underlying pipeline stages `transform`
>>>>>> methods individually (instead of using the Pipeline.setStages API). See
>>>>>> associated JIRAs <https://issues.apache.org/jira/browse/SPARK-4587>.
>>>>>>
>>>>>> 

Re: Caching intermediate results in Spark ML pipeline?

2015-09-15 Thread Feynman Liang
Nope, and that's intentional. There is no guarantee that rawData did not
change between intermediate calls to searchRun, so reusing a cached data1
would be incorrect.

If you want data1 to be cached between multiple runs, you have a few
options:
* cache it first and pass it in as an argument to searchRun (see the sketch below)
* use a creational pattern like singleton to ensure only one instantiation
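
A minimal sketch of the first option (compute and cache the expensive
intermediate once, outside searchRun, and pass it in). Tokenizer/HashingTF
stand in for hashing1/hashing2, and rawData is assumed to be a DataFrame
with a "text" column:

import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.DataFrame

// stage 1 (expensive) runs once and its output is cached
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val data1 = tokenizer.transform(rawData).cache()

// stage 2 is re-run per hyper-parameter value, reusing the cached data1
def searchRun(data1: DataFrame, numFeatures: Int): Long = {
  val hashingTF = new HashingTF()
    .setInputCol("words").setOutputCol("features").setNumFeatures(numFeatures)
  hashingTF.transform(data1).count() // someAction
}

searchRun(data1, 1000)
searchRun(data1, 2000) // data1 is read from the cache, not recomputed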

On Tue, Sep 15, 2015 at 12:49 AM, Jingchu Liu <liujing...@gmail.com> wrote:

> Hey Feynman,
>
> I doubt DF persistence will work in my case. Let's use the following code:
> ==
> def searchRun( params = [param1, param2] )
>   data1 = hashing1.transform(rawData, param1)
>   data1.cache()
>   data2 = hashing2.transform(data1, param2)
>   data2.someAction()
> ==
> Say if we run "searchRun()" for 2 times with the same "param1" but
> different "param2". Will spark recognize that the two local variables
> "data1" in consecutive runs has the same content?
>
>
> Best,
> Lewis
>
> 2015-09-15 13:58 GMT+08:00 Feynman Liang <fli...@databricks.com>:
>
>> You can persist the transformed Dataframes, for example
>>
>> val data : DF = ...
>> val hashedData = hashingTF.transform(data)
>> hashedData.cache() // to cache DataFrame in memory
>>
>> Future usage of hashedData will now read from the in-memory cache.
>>
>> You can also persist to disk, eg:
>>
>> hashedData.write.parquet(FilePath) // to write DataFrame in Parquet
>> format to disk
>> ...
>> val savedHashedData = sqlContext.read.parquet(FilePath)
>>
>> Future uses of savedHashedData will read from the Parquet file on disk.
>>
>> Like my earlier response, this will still require you to call each
>> PipelineStage's `transform` method (i.e. to NOT use the overall
>> Pipeline.setStages API).
>>
>> On Mon, Sep 14, 2015 at 10:45 PM, Jingchu Liu <liujing...@gmail.com>
>> wrote:
>>
>>> Hey Feynman,
>>>
>>> Thanks for your response, but I'm afraid "model save/load" is not
>>> exactly the feature I'm looking for.
>>>
>>> What I need to cache and reuse are the intermediate outputs of
>>> transformations, not transformer themselves. Do you know any related dev.
>>> activities or plans?
>>>
>>> Best,
>>> Lewis
>>>
>>> 2015-09-15 13:03 GMT+08:00 Feynman Liang <fli...@databricks.com>:
>>>
>>>> Lewis,
>>>>
>>>> Many pipeline stages implement save/load methods, which can be used if
>>>> you instantiate and call the underlying pipeline stages `transform` methods
>>>> individually (instead of using the Pipeline.setStages API). See associated
>>>> JIRAs <https://issues.apache.org/jira/browse/SPARK-4587>.
>>>>
>>>> Pipeline persistence is on the 1.6 roadmap, JIRA here
>>>> <https://issues.apache.org/jira/browse/SPARK-6725>.
>>>>
>>>> Feynman
>>>>
>>>> On Mon, Sep 14, 2015 at 9:20 PM, Jingchu Liu <liujing...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have a question regarding the ability of ML pipeline to cache
>>>>> intermediate results. I've posted this question on stackoverflow
>>>>> <http://stackoverflow.com/questions/32561687/caching-intermediate-results-in-spark-ml-pipeline>
>>>>> but got no answer, hope someone here can help me out.
>>>>>
>>>>> ===
>>>>> Lately I'm planning to migrate my standalone python ML code to spark.
>>>>> The ML pipeline in spark.ml turns out quite handy, with streamlined
>>>>> API for chaining up algorithm stages and hyper-parameter grid search.
>>>>>
>>>>> Still, I found its support for one important feature obscure in
>>>>> existing documents: caching of intermediate results. The importance of 
>>>>> this
>>>>> feature arise when the pipeline involves computation intensive stages.
>>>>>
>>>>> For example, in my case I use a huge sparse matrix to perform multiple
>>>>> moving averages on time series data in order to form input features. The
>>>>> structure of the matrix is determined by some hyper-parameter. This step
>>>>> turns out to be a bottleneck for the entire pipeline because I have to
>>>>> construct the matrix in runtime.
>>>>>
>>>>> During parameter search, I usually have other parameters to examine in
>>>>> addition to this "structure parameter". So if I can reuse the huge matrix
>>>>> when the "structure parameter" is unchanged, I can save tons of time. For
>>>>> this reason, I intentionally formed my code to cache and reuse these
>>>>> intermediate results.
>>>>>
>>>>> So my question is: can Spark's ML pipeline handle intermediate caching
>>>>> automatically? Or do I have to manually form code to do so? If so, is 
>>>>> there
>>>>> any best practice to learn from?
>>>>>
>>>>> P.S. I have looked into the official document and some other material,
>>>>> but none of them seems to discuss this topic.
>>>>>
>>>>>
>>>>>
>>>>> Best,
>>>>> Lewis
>>>>>
>>>>
>>>>
>>>
>>
>


Re: How to speed up MLlib LDA?

2015-09-15 Thread Feynman Liang
Hi Marko,

I haven't looked into your case in much detail but one immediate thought
is: have you tried the OnlineLDAOptimizer? Its implementation and the
resulting LDA model (LocalLDAModel) are quite different (it doesn't depend
on GraphX and assumes the model fits on a single machine), so you may see
performance differences.
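
For reference, a rough sketch of switching to the online optimizer
(assuming "corpus" is an RDD[(Long, Vector)] of document id / term-count
pairs; k, the iteration count and mini-batch fraction are placeholders):

import org.apache.spark.mllib.clustering.{LDA, LocalLDAModel, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

def trainOnline(corpus: RDD[(Long, Vector)], k: Int): LocalLDAModel = {
  val lda = new LDA()
    .setK(k)
    .setMaxIterations(50)
    .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(0.05))
  // with OnlineLDAOptimizer, run() produces a LocalLDAModel rather than a
  // GraphX-backed DistributedLDAModel
  lda.run(corpus).asInstanceOf[LocalLDAModel]
}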

Feynman

On Tue, Sep 15, 2015 at 6:37 AM, Marko Asplund 
wrote:

>
> While doing some more testing I noticed that loading the persisted model
> from disk (~2 minutes) as well as querying LDA model topic distributions
> (~4 seconds for one document) are quite slow operations.
>
> Our application is querying LDA model topic distribution (for one doc at a
> time) as part of end-user operation execution flow, so a ~4 second
> execution time is very problematic. Am I using the MLlib LDA API correctly
> or is this just reflecting the current performance characteristics of the
> LDA implementation? My code can be found here:
>
>
> https://github.com/marko-asplund/tech-protos/blob/master/mllib-lda/src/main/scala/fi/markoa/proto/mllib/LDADemo.scala#L56-L57
>
> For what kinds of use cases are people currently using the LDA
> implementation?
>
>
> marko
>


Re: Caching intermediate results in Spark ML pipeline?

2015-09-14 Thread Feynman Liang
Lewis,

Many pipeline stages implement save/load methods, which can be used if you
instantiate and call the underlying pipeline stages' `transform` methods
individually (instead of using the Pipeline.setStages API). See associated
JIRAs .

Pipeline persistence is on the 1.6 roadmap, JIRA here
.

Feynman

On Mon, Sep 14, 2015 at 9:20 PM, Jingchu Liu  wrote:

> Hi all,
>
> I have a question regarding the ability of ML pipeline to cache
> intermediate results. I've posted this question on stackoverflow
> 
> but got no answer, hope someone here can help me out.
>
> ===
> Lately I'm planning to migrate my standalone python ML code to spark. The
> ML pipeline in spark.ml turns out quite handy, with streamlined API for
> chaining up algorithm stages and hyper-parameter grid search.
>
> Still, I found its support for one important feature obscure in existing
> documents: caching of intermediate results. The importance of this feature
> arise when the pipeline involves computation intensive stages.
>
> For example, in my case I use a huge sparse matrix to perform multiple
> moving averages on time series data in order to form input features. The
> structure of the matrix is determined by some hyper-parameter. This step
> turns out to be a bottleneck for the entire pipeline because I have to
> construct the matrix in runtime.
>
> During parameter search, I usually have other parameters to examine in
> addition to this "structure parameter". So if I can reuse the huge matrix
> when the "structure parameter" is unchanged, I can save tons of time. For
> this reason, I intentionally formed my code to cache and reuse these
> intermediate results.
>
> So my question is: can Spark's ML pipeline handle intermediate caching
> automatically? Or do I have to manually form code to do so? If so, is there
> any best practice to learn from?
>
> P.S. I have looked into the official document and some other material, but
> none of them seems to discuss this topic.
>
>
>
> Best,
> Lewis
>


Re: Caching intermediate results in Spark ML pipeline?

2015-09-14 Thread Feynman Liang
You can persist the transformed Dataframes, for example

val data : DF = ...
val hashedData = hashingTF.transform(data)
hashedData.cache() // to cache DataFrame in memory

Future usage of hashedData will now read from the in-memory cache.

You can also persist to disk, eg:

hashedData.write.parquet(FilePath) // to write DataFrame in Parquet format
to disk
...
val savedHashedData = sqlContext.read.parquet(FilePath)

Future uses of savedHashedData will read from the Parquet file on disk.

Like my earlier response, this will still require you to call each
PipelineStage's `transform` method (i.e. to NOT use the overall
Pipeline.setStages API).

On Mon, Sep 14, 2015 at 10:45 PM, Jingchu Liu <liujing...@gmail.com> wrote:

> Hey Feynman,
>
> Thanks for your response, but I'm afraid "model save/load" is not exactly
> the feature I'm looking for.
>
> What I need to cache and reuse are the intermediate outputs of
> transformations, not transformer themselves. Do you know any related dev.
> activities or plans?
>
> Best,
> Lewis
>
> 2015-09-15 13:03 GMT+08:00 Feynman Liang <fli...@databricks.com>:
>
>> Lewis,
>>
>> Many pipeline stages implement save/load methods, which can be used if
>> you instantiate and call the underlying pipeline stages `transform` methods
>> individually (instead of using the Pipeline.setStages API). See associated
>> JIRAs <https://issues.apache.org/jira/browse/SPARK-4587>.
>>
>> Pipeline persistence is on the 1.6 roadmap, JIRA here
>> <https://issues.apache.org/jira/browse/SPARK-6725>.
>>
>> Feynman
>>
>> On Mon, Sep 14, 2015 at 9:20 PM, Jingchu Liu <liujing...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a question regarding the ability of ML pipeline to cache
>>> intermediate results. I've posted this question on stackoverflow
>>> <http://stackoverflow.com/questions/32561687/caching-intermediate-results-in-spark-ml-pipeline>
>>> but got no answer, hope someone here can help me out.
>>>
>>> ===
>>> Lately I'm planning to migrate my standalone python ML code to spark.
>>> The ML pipeline in spark.ml turns out quite handy, with streamlined API
>>> for chaining up algorithm stages and hyper-parameter grid search.
>>>
>>> Still, I found its support for one important feature obscure in existing
>>> documents: caching of intermediate results. The importance of this feature
>>> arise when the pipeline involves computation intensive stages.
>>>
>>> For example, in my case I use a huge sparse matrix to perform multiple
>>> moving averages on time series data in order to form input features. The
>>> structure of the matrix is determined by some hyper-parameter. This step
>>> turns out to be a bottleneck for the entire pipeline because I have to
>>> construct the matrix in runtime.
>>>
>>> During parameter search, I usually have other parameters to examine in
>>> addition to this "structure parameter". So if I can reuse the huge matrix
>>> when the "structure parameter" is unchanged, I can save tons of time. For
>>> this reason, I intentionally formed my code to cache and reuse these
>>> intermediate results.
>>>
>>> So my question is: can Spark's ML pipeline handle intermediate caching
>>> automatically? Or do I have to manually form code to do so? If so, is there
>>> any best practice to learn from?
>>>
>>> P.S. I have looked into the official document and some other material,
>>> but none of them seems to discuss this topic.
>>>
>>>
>>>
>>> Best,
>>> Lewis
>>>
>>
>>
>


Re: Training the MultilayerPerceptronClassifier

2015-09-13 Thread Feynman Liang
AFAIK no, we have a TODO item
<https://github.com/apache/spark/blob/6add4eddb39e7748a87da3e921ea3c7881d30a82/mllib/src/test/scala/org/apache/spark/ml/ann/ANNSuite.scala#L28>
to implement more rigorous correctness tests (e.g. referenced against
Weka). If you're interested, go ahead and comment on the JIRA
<https://issues.apache.org/jira/browse/SPARK-10583> to let others know
you're working on it.

On Sat, Sep 12, 2015 at 4:58 AM, Rory Waite <rwa...@sdl.com> wrote:

> Thanks Feynman, that is useful.
>
> I am interested in comparing the Spark MLP with Caffe. If I understand it
> correctly the changes to the Spark MLP API now restricts the functionality
> such that
>
> -Spark restricts the top layer to be a softmax
> -Can only use LBFGS to train the network
>
> I think this benchmark originally used a sigmoid top layer and SGD to
> optimise the network for spark. So the Caffe config used in the benchmark
> and the Spark setup are now not equivalent.
>
> Also this benchmark is designed for speed testing. I just want to do a
> quick sanity test and make sure that Caffe and Spark yield similar
> accuracies for MNIST before I try to test Spark on our own task. I am
> possibly reproducing existing efforts. Is there an example of this kind of
> sanity test I could reproduce?
>
>
>   <http://www.sdl.com/>
> www.sdl.com
>
>
> SDL PLC confidential, all rights reserved. If you are not the intended
> recipient of this mail SDL requests and requires that you delete it without
> acting upon or copying any of its contents, and we further request that you
> advise us.
>
> SDL PLC is a public limited company registered in England and Wales.
> Registered number: 02675207.
> Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6
> 7DY, UK.
> --
> *From:* Feynman Liang [fli...@databricks.com]
> *Sent:* 11 September 2015 20:34
> *To:* Rory Waite
> *Cc:* user@spark.apache.org
> *Subject:* Re: Training the MultilayerPerceptronClassifier
>
> Rory,
>
> I just sent a PR (https://github.com/avulanov/ann-benchmark/pull/1) to
> bring that benchmark up to date. Hope it helps.
>
> On Fri, Sep 11, 2015 at 6:39 AM, Rory Waite <rwa...@sdl.com> wrote:
>
>> Hi,
>>
>> I’ve been trying to train the new MultilayerPerceptronClassifier in spark
>> 1.5 for the MNIST digit recognition task. I’m trying to reproduce the work
>> here:
>>
>> https://github.com/avulanov/ann-benchmark
>>
>> The API has changed since this work, so I’m not sure that I’m setting up
>> the task correctly.
>>
>> After I've trained the classifier, it classifies everything as a 1. It
>> even does this for the training set. I am doing something wrong with the
>> setup? I’m not looking for state of the art performance, just something
>> that looks reasonable. This experiment is meant to be a quick sanity test.
>>
>> Here is the job:
>>
>> import org.apache.log4j._
>> //Logger.getRootLogger.setLevel(Level.OFF)
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.regression.LabeledPoint
>> import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
>> import org.apache.spark.ml.Pipeline
>> import org.apache.spark.ml.PipelineStage
>> import org.apache.spark.mllib.util.MLUtils
>> import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkContext._
>> import org.apache.spark.SparkConf
>> import org.apache.spark.sql.SQLContext
>> import java.io.FileOutputStream
>> import java.io.ObjectOutputStream
>>
>> object MNIST {
>>  def main(args: Array[String]) {
>>val conf = new SparkConf().setAppName("MNIST")
>>conf.set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=512M")
>>val sc = new SparkContext(conf)
>>val batchSize = 100
>>val numIterations = 5
>>val mlp = new MultilayerPerceptronClassifier
>>mlp.setLayers(Array[Int](780, 2500, 2000, 1500, 1000, 500, 10))
>>mlp.setMaxIter(numIterations)
>>mlp.setBlockSize(batchSize)
>>val train = MLUtils.loadLibSVMFile(sc,
>> "file:///misc/home/rwaite/mt-work/ann-benchmark/mnist.scale")
>>train.repartition(200)
>>val sqlContext = new SQLContext(sc)
>>import sqlContext.implicits._
>>val df = train.toDF
>>val model = mlp.fit(df)
>>val trainPredictions = model.transform(df)
>>trainPredictions.show(100)
>>val test = MLUtils.loadLibSVMFile(sc,
>> "file:///misc/home

Re: Model summary for linear and logistic regression.

2015-09-11 Thread Feynman Liang
Sorry! The documentation is not the greatest thing in the world, but these
features are documented here
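
For example, a minimal sketch of the logistic regression training summary
(assuming a DataFrame "training" with "label" and "features" columns; the
regularization settings are placeholders):

import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression}

val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
val model = lr.fit(training)

// the training summary is attached to the fitted model
val summary = model.summary
println(summary.objectiveHistory.mkString(", ")) // loss at each iteration

// for binary labels, cast to get ROC / AUC metrics
val binarySummary = summary.asInstanceOf[BinaryLogisticRegressionSummary]
println(binarySummary.areaUnderROC)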


On Fri, Sep 11, 2015 at 6:25 AM, Sebastian Kuepers <
sebastian.kuep...@publicispixelpark.de> wrote:

> Hey,
>
>
> the 1.5.0 release note say, that there are now model summaries for
> logistic regressions available.
>
> But I can't find them in the current documentary.
>
> ​
>
> Any help very much appreciated!
>
> Thanks
>
>
> Sebastian
>
>
>
>
> 
> Disclaimer The information in this email and any attachments may contain
> proprietary and confidential information that is intended for the
> addressee(s) only. If you are not the intended recipient, you are hereby
> notified that any disclosure, copying, distribution, retention or use of
> the contents of this information is prohibited. When addressed to our
> clients or vendors, any information contained in this e-mail or any
> attachments is subject to the terms and conditions in any governing
> contract. If you have received this e-mail in error, please immediately
> contact the sender and delete the e-mail.
>


Re: Training the MultilayerPerceptronClassifier

2015-09-11 Thread Feynman Liang
Rory,

I just sent a PR (https://github.com/avulanov/ann-benchmark/pull/1) to
bring that benchmark up to date. Hope it helps.

On Fri, Sep 11, 2015 at 6:39 AM, Rory Waite  wrote:

> Hi,
>
> I’ve been trying to train the new MultilayerPerceptronClassifier in spark
> 1.5 for the MNIST digit recognition task. I’m trying to reproduce the work
> here:
>
> https://github.com/avulanov/ann-benchmark
>
> The API has changed since this work, so I’m not sure that I’m setting up
> the task correctly.
>
> After I've trained the classifier, it classifies everything as a 1. It
> even does this for the training set. I am doing something wrong with the
> setup? I’m not looking for state of the art performance, just something
> that looks reasonable. This experiment is meant to be a quick sanity test.
>
> Here is the job:
>
> import org.apache.log4j._
> //Logger.getRootLogger.setLevel(Level.OFF)
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.regression.LabeledPoint
> import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.PipelineStage
> import org.apache.spark.mllib.util.MLUtils
> import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SQLContext
> import java.io.FileOutputStream
> import java.io.ObjectOutputStream
>
> object MNIST {
>  def main(args: Array[String]) {
>val conf = new SparkConf().setAppName("MNIST")
>conf.set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=512M")
>val sc = new SparkContext(conf)
>val batchSize = 100
>val numIterations = 5
>val mlp = new MultilayerPerceptronClassifier
>mlp.setLayers(Array[Int](780, 2500, 2000, 1500, 1000, 500, 10))
>mlp.setMaxIter(numIterations)
>mlp.setBlockSize(batchSize)
>val train = MLUtils.loadLibSVMFile(sc,
> "file:///misc/home/rwaite/mt-work/ann-benchmark/mnist.scale")
>train.repartition(200)
>val sqlContext = new SQLContext(sc)
>import sqlContext.implicits._
>val df = train.toDF
>val model = mlp.fit(df)
>val trainPredictions = model.transform(df)
>trainPredictions.show(100)
>val test = MLUtils.loadLibSVMFile(sc,
> "file:///misc/home/rwaite/mt-work/ann-benchmark/mnist.scale.t", 780).toDF
>val result = model.transform(test)
>result.show(100)
>val predictionAndLabels = result.select("prediction", "label")
>val evaluator = new MulticlassClassificationEvaluator()
>  .setMetricName("precision")
>println("Precision:" + evaluator.evaluate(predictionAndLabels))
>val fos = new
> FileOutputStream("/home/rwaite/mt-work/ann-benchmark/spark_out/spark_model.obj");
>val oos = new ObjectOutputStream(fos);
>oos.writeObject(model);
>oos.close
>  }
> }
>
>
> And here is the output:
>
> +-++--+
> |label|features|prediction|
> +-++--+
> |  5.0|(780,[152,153,154...|   1.0|
> |  0.0|(780,[127,128,129...|   1.0|
> |  4.0|(780,[160,161,162...|   1.0|
> |  1.0|(780,[158,159,160...|   1.0|
> |  9.0|(780,[208,209,210...|   1.0|
> |  2.0|(780,[155,156,157...|   1.0|
> |  1.0|(780,[124,125,126...|   1.0|
> |  3.0|(780,[151,152,153...|   1.0|
> |  1.0|(780,[152,153,154...|   1.0|
> |  4.0|(780,[134,135,161...|   1.0|
> |  3.0|(780,[123,124,125...|   1.0|
> |  5.0|(780,[216,217,218...|   1.0|
> |  3.0|(780,[143,144,145...|   1.0|
> |  6.0|(780,[72,73,74,99...|   1.0|
> |  1.0|(780,[151,152,153...|   1.0|
> |  7.0|(780,[211,212,213...|   1.0|
> |  2.0|(780,[151,152,153...|   1.0|
> |  8.0|(780,[159,160,161...|   1.0|
> |  6.0|(780,[100,101,102...|   1.0|
> |  9.0|(780,[209,210,211...|   1.0|
> |  4.0|(780,[129,130,131...|   1.0|
> |  0.0|(780,[129,130,131...|   1.0|
> |  9.0|(780,[183,184,185...|   1.0|
> |  1.0|(780,[158,159,160...|   1.0|
> |  1.0|(780,[99,100,101,...|   1.0|
> |  2.0|(780,[124,125,126...|   1.0|
> |  4.0|(780,[185,186,187...|   1.0|
> |  3.0|(780,[150,151,152...|   1.0|
> |  2.0|(780,[145,146,147...|   1.0|
> |  7.0|(780,[240,241,242...|   1.0|
> |  3.0|(780,[201,202,203...|   1.0|
> |  8.0|(780,[153,154,155...|   1.0|
> |  6.0|(780,[71,72,73,74...|   1.0|
> |  9.0|(780,[210,211,212...|   1.0|
> |  0.0|(780,[154,155,156...|   1.0|
> |  5.0|(780,[188,189,190...|   1.0|
> |  6.0|(780,[98,99,100,1...|   1.0|
> |  0.0|(780,[127,128,129...|   1.0|
> |  7.0|(780,[201,202,203...|   1.0|
> |  6.0|(780,[125,126,127...|   1.0|
> |  1.0|(780,[154,155,156...|   1.0|
> |  8.0|(780,[131,132,133...|   1.0|
> |  7.0|(780,[209,210,211...|   1.0|
> |  9.0|(780,[181,182,183...|   1.0|
> |  3.0|(780,[174,175,176...|   1.0|
> |  

Re: Realtime Data Visualization Tool for Spark

2015-09-11 Thread Feynman Liang
Spark notebook does something similar; take a look at their line chart code


On Fri, Sep 11, 2015 at 8:56 AM, Shashi Vishwakarma <
shashi.vish...@gmail.com> wrote:

> Hi
>
> I have got streaming data which needs to be processed and send for
> visualization.  I am planning to use spark streaming for this but little
> bit confused in choosing visualization tool. I read somewhere that D3.js
> can be used but i wanted know which is best tool for visualization while
> dealing with streaming application.(something that can be easily integrated)
>
> If someone has any link which can tell about D3.js(or any other
> visualization tool) and Spark streaming application integration  then
> please share . That would be great help.
>
>
> Thanks and Regards
> Shashi
>
>


Re: Spark ANN

2015-09-09 Thread Feynman Liang
My 2 cents:

* There is frequency domain processing available already (e.g. the spark.ml
DCT transformer; see the sketch below) but no FFT transformer yet because
complex numbers are not currently a Spark SQL datatype
* We shouldn't assume signals are even, so we need complex numbers to
implement the FFT
* I have not closely studied the relative performance tradeoffs, so please
do let me know if there's a significant difference in practice
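
A minimal usage sketch of the spark.ml DCT transformer mentioned above
(assuming an existing sqlContext; the example vectors are arbitrary):

import org.apache.spark.ml.feature.DCT
import org.apache.spark.mllib.linalg.Vectors

val signals = sqlContext.createDataFrame(Seq(
  Tuple1(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
  Tuple1(Vectors.dense(-1.0, 2.0, 4.0, -7.0)))).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false) // forward DCT-II

dct.transform(signals).select("featuresDCT").show(false)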



On Tue, Sep 8, 2015 at 5:46 PM, Ulanov, Alexander <alexander.ula...@hpe.com>
wrote:

> That is an option too. Implementing convolutions with FFTs should be
> considered as well http://arxiv.org/pdf/1312.5851.pdf.
>
>
>
> *From:* Feynman Liang [mailto:fli...@databricks.com]
> *Sent:* Tuesday, September 08, 2015 12:07 PM
> *To:* Ulanov, Alexander
> *Cc:* Ruslan Dautkhanov; Nick Pentreath; user; na...@yandex.ru
> *Subject:* Re: Spark ANN
>
>
>
> Just wondering, why do we need tensors? Is the implementation of convnets
> using im2col (see here <http://cs231n.github.io/convolutional-networks/>)
> insufficient?
>
>
>
> On Tue, Sep 8, 2015 at 11:55 AM, Ulanov, Alexander <
> alexander.ula...@hpe.com> wrote:
>
> Ruslan, thanks for including me in the discussion!
>
>
>
> Dropout and other features such as Autoencoder were implemented, but not
> merged yet in order to have room for improving the internal Layer API. For
> example, there is an ongoing work with convolutional layer that
> consumes/outputs 2D arrays. We’ll probably need to change the Layer’s
> input/output type to tensors. This will influence dropout which will need
> some refactoring to handle tensors too. Also, all new components should
> have ML pipeline public interface. There is an umbrella issue for deep
> learning in Spark https://issues.apache.org/jira/browse/SPARK-5575 which
> includes various features of Autoencoder, in particular
> https://issues.apache.org/jira/browse/SPARK-10408. You are very welcome
> to join and contribute since there is a lot of work to be done.
>
>
>
> Best regards, Alexander
>
> *From:* Ruslan Dautkhanov [mailto:dautkha...@gmail.com]
> *Sent:* Monday, September 07, 2015 10:09 PM
> *To:* Feynman Liang
> *Cc:* Nick Pentreath; user; na...@yandex.ru
> *Subject:* Re: Spark ANN
>
>
>
> Found a dropout commit from avulanov:
>
>
> https://github.com/avulanov/spark/commit/3f25e26d10ef8617e46e35953fe0ad1a178be69d
>
>
>
> It probably hasn't made its way to MLLib (yet?).
>
>
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
> On Mon, Sep 7, 2015 at 8:34 PM, Feynman Liang <fli...@databricks.com>
> wrote:
>
> Unfortunately, not yet... Deep learning support (autoencoders, RBMs) is on
> the roadmap for 1.6 <https://issues.apache.org/jira/browse/SPARK-10324>
> though, and there is a spark package
> <http://spark-packages.org/package/rakeshchalasani/MLlib-dropout> for
> dropout regularized logistic regression.
>
>
>
>
>
> On Mon, Sep 7, 2015 at 3:15 PM, Ruslan Dautkhanov <dautkha...@gmail.com>
> wrote:
>
> Thanks!
>
>
>
> It does not look Spark ANN yet supports dropout/dropconnect or any other
> techniques that help avoiding overfitting?
>
> http://www.cs.toronto.edu/~rsalakhu/papers/srivastava14a.pdf
>
> https://cs.nyu.edu/~wanli/dropc/dropc.pdf
>
>
>
> ps. There is a small copy-paste typo in
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala#L43
>
> should read B :)
>
>
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
> On Mon, Sep 7, 2015 at 12:47 PM, Feynman Liang <fli...@databricks.com>
> wrote:
>
> Backprop is used to compute the gradient here
> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala#L579-L584>,
> which is then optimized by SGD or LBFGS here
> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala#L878>
>
>
>
> On Mon, Sep 7, 2015 at 11:24 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> Haven't checked the actual code but that doc says "MLPC employes
> backpropagation for learning the model. .."?
>
>
>
>
>
>
> —
> Sent from Mailbox <https://www.dropbox.com/mailbox>
>
>
>
> On Mon, Sep 7, 2015 at 8:18 PM, Ruslan Dautkhanov <dautkha...@gmail.com>
> wrote:
>
> http://people.apache.org/~pwendell/spark-releases/latest/ml-ann.html
>
>
>
> Implementation seems missing backpropagation?
>
> Was there is a good reason to omit BP?
>
> What are the drawbacks of a pure feedforward-only ANN?
>
>
>
> Thanks!
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: Spark ANN

2015-09-08 Thread Feynman Liang
Just wondering, why do we need tensors? Is the implementation of convnets
using im2col (see here <http://cs231n.github.io/convolutional-networks/>)
insufficient?

On Tue, Sep 8, 2015 at 11:55 AM, Ulanov, Alexander <alexander.ula...@hpe.com
> wrote:

> Ruslan, thanks for including me in the discussion!
>
>
>
> Dropout and other features such as Autoencoder were implemented, but not
> merged yet in order to have room for improving the internal Layer API. For
> example, there is an ongoing work with convolutional layer that
> consumes/outputs 2D arrays. We’ll probably need to change the Layer’s
> input/output type to tensors. This will influence dropout which will need
> some refactoring to handle tensors too. Also, all new components should
> have ML pipeline public interface. There is an umbrella issue for deep
> learning in Spark https://issues.apache.org/jira/browse/SPARK-5575 which
> includes various features of Autoencoder, in particular
> https://issues.apache.org/jira/browse/SPARK-10408. You are very welcome
> to join and contribute since there is a lot of work to be done.
>
>
>
> Best regards, Alexander
>
> *From:* Ruslan Dautkhanov [mailto:dautkha...@gmail.com]
> *Sent:* Monday, September 07, 2015 10:09 PM
> *To:* Feynman Liang
> *Cc:* Nick Pentreath; user; na...@yandex.ru
> *Subject:* Re: Spark ANN
>
>
>
> Found a dropout commit from avulanov:
>
>
> https://github.com/avulanov/spark/commit/3f25e26d10ef8617e46e35953fe0ad1a178be69d
>
>
>
> It probably hasn't made its way to MLLib (yet?).
>
>
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
> On Mon, Sep 7, 2015 at 8:34 PM, Feynman Liang <fli...@databricks.com>
> wrote:
>
> Unfortunately, not yet... Deep learning support (autoencoders, RBMs) is on
> the roadmap for 1.6 <https://issues.apache.org/jira/browse/SPARK-10324>
> though, and there is a spark package
> <http://spark-packages.org/package/rakeshchalasani/MLlib-dropout> for
> dropout regularized logistic regression.
>
>
>
>
>
> On Mon, Sep 7, 2015 at 3:15 PM, Ruslan Dautkhanov <dautkha...@gmail.com>
> wrote:
>
> Thanks!
>
>
>
> It does not look Spark ANN yet supports dropout/dropconnect or any other
> techniques that help avoiding overfitting?
>
> http://www.cs.toronto.edu/~rsalakhu/papers/srivastava14a.pdf
>
> https://cs.nyu.edu/~wanli/dropc/dropc.pdf
>
>
>
> ps. There is a small copy-paste typo in
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala#L43
>
> should read B :)
>
>
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
> On Mon, Sep 7, 2015 at 12:47 PM, Feynman Liang <fli...@databricks.com>
> wrote:
>
> Backprop is used to compute the gradient here
> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala#L579-L584>,
> which is then optimized by SGD or LBFGS here
> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala#L878>
>
>
>
> On Mon, Sep 7, 2015 at 11:24 AM, Nick Pentreath <nick.pentre...@gmail.com>
> wrote:
>
> Haven't checked the actual code but that doc says "MLPC employes
> backpropagation for learning the model. .."?
>
>
>
>
>
>
> —
> Sent from Mailbox <https://www.dropbox.com/mailbox>
>
>
>
> On Mon, Sep 7, 2015 at 8:18 PM, Ruslan Dautkhanov <dautkha...@gmail.com>
> wrote:
>
> http://people.apache.org/~pwendell/spark-releases/latest/ml-ann.html
>
>
>
> Implementation seems missing backpropagation?
>
> Was there is a good reason to omit BP?
>
> What are the drawbacks of a pure feedforward-only ANN?
>
>
>
> Thanks!
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
>
>
>
>
>
>
>
>


Re: Spark ANN

2015-09-07 Thread Feynman Liang
Unfortunately, not yet... Deep learning support (autoencoders, RBMs) is on
the roadmap for 1.6 <https://issues.apache.org/jira/browse/SPARK-10324>
though, and there is a spark package
<http://spark-packages.org/package/rakeshchalasani/MLlib-dropout> for
dropout regularized logistic regression.


On Mon, Sep 7, 2015 at 3:15 PM, Ruslan Dautkhanov <dautkha...@gmail.com>
wrote:

> Thanks!
>
> It does not look Spark ANN yet supports dropout/dropconnect or any other
> techniques that help avoiding overfitting?
> http://www.cs.toronto.edu/~rsalakhu/papers/srivastava14a.pdf
> https://cs.nyu.edu/~wanli/dropc/dropc.pdf
>
> ps. There is a small copy-paste typo in
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala#L43
> should read B :)
>
>
>
> --
> Ruslan Dautkhanov
>
> On Mon, Sep 7, 2015 at 12:47 PM, Feynman Liang <fli...@databricks.com>
> wrote:
>
>> Backprop is used to compute the gradient here
>> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala#L579-L584>,
>> which is then optimized by SGD or LBFGS here
>> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala#L878>
>>
>> On Mon, Sep 7, 2015 at 11:24 AM, Nick Pentreath <nick.pentre...@gmail.com
>> > wrote:
>>
>>> Haven't checked the actual code but that doc says "MLPC employes
>>> backpropagation for learning the model. .."?
>>>
>>>
>>>
>>> —
>>> Sent from Mailbox <https://www.dropbox.com/mailbox>
>>>
>>>
>>> On Mon, Sep 7, 2015 at 8:18 PM, Ruslan Dautkhanov <dautkha...@gmail.com>
>>> wrote:
>>>
>>>> http://people.apache.org/~pwendell/spark-releases/latest/ml-ann.html
>>>>
>>>> Implementation seems missing backpropagation?
>>>> Was there is a good reason to omit BP?
>>>> What are the drawbacks of a pure feedforward-only ANN?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> --
>>>> Ruslan Dautkhanov
>>>>
>>>
>>>
>>
>


Re: Spark ANN

2015-09-07 Thread Feynman Liang
BTW thanks for pointing out the typos, I've included them in my MLP cleanup
PR <https://github.com/apache/spark/pull/8648>

On Mon, Sep 7, 2015 at 7:34 PM, Feynman Liang <fli...@databricks.com> wrote:

> Unfortunately, not yet... Deep learning support (autoencoders, RBMs) is on
> the roadmap for 1.6 <https://issues.apache.org/jira/browse/SPARK-10324>
> though, and there is a spark package
> <http://spark-packages.org/package/rakeshchalasani/MLlib-dropout> for
> dropout regularized logistic regression.
>
>
> On Mon, Sep 7, 2015 at 3:15 PM, Ruslan Dautkhanov <dautkha...@gmail.com>
> wrote:
>
>> Thanks!
>>
>> It does not look Spark ANN yet supports dropout/dropconnect or any other
>> techniques that help avoiding overfitting?
>> http://www.cs.toronto.edu/~rsalakhu/papers/srivastava14a.pdf
>> https://cs.nyu.edu/~wanli/dropc/dropc.pdf
>>
>> ps. There is a small copy-paste typo in
>>
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala#L43
>> should read B :)
>>
>>
>>
>> --
>> Ruslan Dautkhanov
>>
>> On Mon, Sep 7, 2015 at 12:47 PM, Feynman Liang <fli...@databricks.com>
>> wrote:
>>
>>> Backprop is used to compute the gradient here
>>> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala#L579-L584>,
>>> which is then optimized by SGD or LBFGS here
>>> <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala#L878>
>>>
>>> On Mon, Sep 7, 2015 at 11:24 AM, Nick Pentreath <
>>> nick.pentre...@gmail.com> wrote:
>>>
>>>> Haven't checked the actual code but that doc says "MLPC employes
>>>> backpropagation for learning the model. .."?
>>>>
>>>>
>>>>
>>>> —
>>>> Sent from Mailbox <https://www.dropbox.com/mailbox>
>>>>
>>>>
>>>> On Mon, Sep 7, 2015 at 8:18 PM, Ruslan Dautkhanov <dautkha...@gmail.com
>>>> > wrote:
>>>>
>>>>> http://people.apache.org/~pwendell/spark-releases/latest/ml-ann.html
>>>>>
>>>>> Implementation seems missing backpropagation?
>>>>> Was there is a good reason to omit BP?
>>>>> What are the drawbacks of a pure feedforward-only ANN?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>> --
>>>>> Ruslan Dautkhanov
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Spark ANN

2015-09-07 Thread Feynman Liang
Backprop is used to compute the gradient here
,
which is then optimized by SGD or LBFGS here


On Mon, Sep 7, 2015 at 11:24 AM, Nick Pentreath 
wrote:

> Haven't checked the actual code but that doc says "MLPC employes
> backpropagation for learning the model. .."?
>
>
>
> —
> Sent from Mailbox 
>
>
> On Mon, Sep 7, 2015 at 8:18 PM, Ruslan Dautkhanov 
> wrote:
>
>> http://people.apache.org/~pwendell/spark-releases/latest/ml-ann.html
>>
>> Implementation seems missing backpropagation?
>> Was there is a good reason to omit BP?
>> What are the drawbacks of a pure feedforward-only ANN?
>>
>> Thanks!
>>
>>
>> --
>> Ruslan Dautkhanov
>>
>
>


Re: How to generate spark assembly (jar file) using Intellij

2015-08-29 Thread Feynman Liang
Have you tried `build/sbt assembly`?

On Sat, Aug 29, 2015 at 9:03 PM, Muler mulugeta.abe...@gmail.com wrote:

 Hi guys,

 I can successfully build Spark using Intellij, but I'm not able to
 locate/generate spark assembly (jar file) in the assembly/target directly)
 How do I generate one? I have attached the screenshot of my IDE.



 Thanks,


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark MLLIB multiclass classification

2015-08-29 Thread Feynman Liang
I would check out the Pipeline code example
https://spark.apache.org/docs/latest/ml-guide.html#example-pipeline

On Sat, Aug 29, 2015 at 9:23 PM, Zsombor Egyed egye...@starschema.net
wrote:

 Hi!

 I want to implement a multiclass classification for documents.
 So I have different kinds of text files, and I want to classificate them
 with spark mllib in java.

 Do you have any code examples?

 Thanks!

 --


 *Egyed Zsombor *
 Junior Big Data Engineer



 Mobile: +36 70 320 65 81 | Twitter:@starschemaltd

 Email: egye...@starschema.net bali...@starschema.net | Web:
 www.starschema.net




Re: Spark MLLIB multiclass classification

2015-08-29 Thread Feynman Liang
I think the spark.ml logistic regression currently only supports 0/1
labels. If you need multiclass, I would suggest looking at the spark.ml
decision trees. If you don't care too much about pipelines, then you could
use the spark.mllib logistic regression after featurizing.
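
For reference, a minimal sketch of the spark.mllib route (assuming
"training" is an RDD[LabeledPoint] whose labels are 0.0 through
numClasses - 1 after featurizing):

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def trainMulticlass(training: RDD[LabeledPoint], numClasses: Int) = {
  new LogisticRegressionWithLBFGS()
    .setNumClasses(numClasses) // multinomial logistic regression
    .run(training)
}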

On Sat, Aug 29, 2015 at 10:49 PM, Zsombor Egyed egye...@starschema.net
wrote:

 Thank you, I saw this before, but it is just a binary classification, so
 how can I extract this to multiple classification.

 Simply add different labels?
 e.g.:

   new LabeledDocument(0L, a b c d e spark, 1.0),
   new LabeledDocument(1L, b d, 0.0),
   new LabeledDocument(2L, hadoop f g h, 2.0),




 On Sun, Aug 30, 2015 at 7:32 AM, Feynman Liang fli...@databricks.com
 wrote:

 I would check out the Pipeline code example
 https://spark.apache.org/docs/latest/ml-guide.html#example-pipeline

 On Sat, Aug 29, 2015 at 9:23 PM, Zsombor Egyed egye...@starschema.net
 wrote:

 Hi!

 I want to implement a multiclass classification for documents.
 So I have different kinds of text files, and I want to classificate them
 with spark mllib in java.

 Do you have any code examples?

 Thanks!

 --


 *Egyed Zsombor *
 Junior Big Data Engineer



 Mobile: +36 70 320 65 81 | Twitter:@starschemaltd

 Email: egye...@starschema.net bali...@starschema.net | Web:
 www.starschema.net





 --


 *Egyed Zsombor *
 Junior Big Data Engineer



 Mobile: +36 70 320 65 81 | Twitter:@starschemaltd

 Email: egye...@starschema.net bali...@starschema.net | Web:
 www.starschema.net




Re: MLlib Prefixspan implementation

2015-08-26 Thread Feynman Liang
ReversedPrefix is used because Scala's List is a linked list, which has
constant-time prepend to the head but linear-time append to the tail.
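
To illustrate the point, a generic Scala sketch (not the actual PrefixSpan
code):

// build a sequence by prepending (O(1) per step) and reverse once at the
// end, instead of appending to a List (O(n) per step)
def buildSequence(items: Seq[Int]): List[Int] = {
  var reversedPrefix: List[Int] = Nil
  for (item <- items) {
    reversedPrefix = item :: reversedPrefix // constant-time prepend
  }
  reversedPrefix.reverse // single O(n) reversal when finished
}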

I'm aware that there are use cases for the gap constraints. My question was
more about whether any users of Spark/MLlib have an immediate application
for these features.

On Wed, Aug 26, 2015 at 12:10 AM, alexis GILLAIN ila...@hotmail.com wrote:

 A first use case of gap constraint is included in the article.
 Another application would be customer-shopping sequence analysis where you
 want to put a constraint on the duration between two purchases for them to
 be considered as a pertinent sequence.

 Additional question regarding the code : what's the point of using 
 ReversedPrefix
 in localprefispan ? The prefix is used neither in finding frequent items
 of a projected database or computing a new projected database so it looks
 like it's appended in inverse order just to be reversed when transformed to
 a sequence.

 2015-08-25 12:15 GMT+08:00 Feynman Liang fli...@databricks.com:

 CCing the mailing list again.

 It's currently not on the radar. Do you have a use case for it? I can
 bring it up during 1.6 roadmap planning tomorrow.

 On Mon, Aug 24, 2015 at 8:28 PM, alexis GILLAIN ila...@hotmail.com
 wrote:

 Hi,

 I just realized the article I mentioned is cited in the jira and not in
 the code so I guess you didn't use this result.

 Do you plan to implement sequence with timestamp and gap constraint as
 in :


 https://people.mpi-inf.mpg.de/~rgemulla/publications/miliaraki13mg-fsm.pdf

 2015-08-25 7:06 GMT+08:00 Feynman Liang fli...@databricks.com:

 Hi Alexis,

 Unfortunately, both of the papers you referenced appear to be
 translations and are quite difficult to understand. We followed
 http://doi.org/10.1109/ICDE.2001.914830 when implementing PrefixSpan.
 Perhaps you can find the relevant lines in there so I can elaborate 
 further?

 Feynman

 On Thu, Aug 20, 2015 at 9:07 AM, alexis GILLAIN ila...@hotmail.com
 wrote:

 I want to use prefixspan so I had a look at the code and the cited
 paper : Distributed PrefixSpan Algorithm Based on MapReduce.

 There is a result in the paper I didn't really undertstand and I
 could'nt find where it is used in the code.

 Suppose a sequence database S = {1, 2, ..., n}, a sequence a... is a
 length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a
 prefix of a length-(L-1) sequential pattern a...a, when the support 
 count
 of a is not less than min_support, it is equal to obtaining a length-L
 sequential pattern  a ... a  from projected databases that obtaining a
 length-L sequential pattern  a ... a  from a sequence database S.

 According to the paper It's supposed to add a pruning step in the
 reduce function but I couldn't find where.

 This result seems to come from a previous paper : Wang Linlin, Fan
 Jun. Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan
 [J]. Computer Engineering, 2009, 35(23): 56-61 but it didn't help me to
 understand it and how it can improve the algorithm.








Re: CHAID Decision Trees

2015-08-25 Thread Feynman Liang
For a single decision tree, the closest I can think of is toDebugString,
which gives you a text representation of the decision thresholds and paths
down the tree.
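
For example, a minimal sketch with the spark.mllib API (assuming "training"
is an RDD[LabeledPoint]; the tree parameters are placeholders):

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.rdd.RDD

def describeTree(training: RDD[LabeledPoint]): String = {
  val model = DecisionTree.trainClassifier(
    training, numClasses = 2, categoricalFeaturesInfo = Map[Int, Int](),
    impurity = "gini", maxDepth = 5, maxBins = 32)
  // text rendering of the split thresholds and leaf predictions
  model.toDebugString
}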

I don't think there's anything in MLlib for visualizing GBTs or random
forests

On Tue, Aug 25, 2015 at 9:20 PM, Jatinpreet Singh jatinpr...@gmail.com
wrote:

 Hi Feynman,

 Thanks for the information. Is there a way to depict decision tree as a
 visualization for large amounts of data using any other technique/library?

 Thanks,
 Jatin

 On Tue, Aug 25, 2015 at 11:42 PM, Feynman Liang fli...@databricks.com
 wrote:

 Nothing is in JIRA
 https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22CHAID%22
 so AFAIK no, only random forests and GBTs using entropy or GINI for
 information gain is supported.

 On Tue, Aug 25, 2015 at 9:39 AM, jatinpreet jatinpr...@gmail.com wrote:

 Hi,

 I wish to know if MLlib supports CHAID regression and classifcation
 trees.
 If yes, how can I  build them in spark?

 Thanks,
 Jatin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/CHAID-Decision-Trees-tp24449.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Regards,
 Jatinpreet Singh



Re: CHAID Decision Trees

2015-08-25 Thread Feynman Liang
Nothing is in JIRA
https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22CHAID%22
so AFAIK no, only random forests and GBTs using entropy or GINI for
information gain is supported.

On Tue, Aug 25, 2015 at 9:39 AM, jatinpreet jatinpr...@gmail.com wrote:

 Hi,

 I wish to know if MLlib supports CHAID regression and classifcation trees.
 If yes, how can I  build them in spark?

 Thanks,
 Jatin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/CHAID-Decision-Trees-tp24449.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?

2015-08-25 Thread Feynman Liang
Kristina,

Thanks for the discussion. I followed up on your problem and learned that Scala
doesn't support multiple implicit conversions in a single expression
http://stackoverflow.com/questions/8068346/can-scala-apply-multiple-implicit-conversions-in-one-expression
for complexity reasons. I'm afraid the solution for now is to do
(v1: BV[Double]) + (v1: BV[Double]).
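
Putting it together, a sketch of the workaround (same implicit as in your
snippet, with both operands explicitly ascribed so only one conversion is
needed per operand):

import breeze.linalg.{Vector => BV}
import org.apache.spark.mllib.linalg.{Vector => SparkVector, Vectors}

object MyUtils {
  implicit def toBreeze(v: SparkVector): BV[Double] = BV(v.toArray)
}
import MyUtils._

val v1 = Vectors.dense(1.0, 2.0, 3.0)
// each operand is converted once; the + is then Breeze's vector addition
val sum: BV[Double] = (v1: BV[Double]) + (v1: BV[Double])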

On Tue, Aug 25, 2015 at 11:06 AM, Kristina Rogale Plazonic kpl...@gmail.com
 wrote:

 YES PLEASE!

 :)))

 On Tue, Aug 25, 2015 at 1:57 PM, Burak Yavuz brk...@gmail.com wrote:

 Hmm. I have a lot of code on the local linear algebra operations using
 Spark's Matrix and Vector representations
 done for https://issues.apache.org/jira/browse/SPARK-6442.

 I can make a Spark package with that code if people are interested.

 Best,
 Burak

 On Tue, Aug 25, 2015 at 10:54 AM, Kristina Rogale Plazonic 
 kpl...@gmail.com wrote:

 However I do think it's easier than it seems to write the implicits;
 it doesn't involve new classes or anything. Yes it's pretty much just
 what you wrote. There is a class Vector in Spark. This declaration
 can be in an object; you don't implement your own class. (Also you can
 use toBreeze to get Breeze vectors.)


 The implicit conversion with the implicit def happens for the first
 vector in the sum, but not the second vector (see below).

 At this point I give up, because I spent way too much time.  I am so
 disappointed.  So many times I heard Spark makes simple things easy and
 complicated things possible. Well, here is the simplest thing you can
 imagine in linear algebra, but heck, it is not easy or intuitive.  It was
 easier to run a DeepLearning algo (from another library) than add two
 vectors.

 If anybody has a workaround other than implementing your own
 add/substract/scalarMultiply, PLEASE let me know.

 Here is the code and error from (freshly started) spark-shell:

 scala import breeze.linalg.{DenseVector = BDV, SparseVector = BSV,
 Vector = BV}
 import breeze.linalg.{DenseVector=BDV, SparseVector=BSV, Vector=BV}

 scala import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.linalg.Vectors

 scala val v1 = Vectors.dense(1.0, 2.0, 3.0)
 v1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]

 scala import org.apache.spark.mllib.linalg.{Vector =SparkVector}
 import org.apache.spark.mllib.linalg.{Vector=SparkVector}

 scala object MyUtils {
  |   implicit def toBreeze(v:SparkVector) = BV(v.toArray)
  | }
 warning: there were 1 feature warning(s); re-run with -feature for
 details
 defined module MyUtils

 scala import MyUtils._
 import MyUtils._

 scala v1:BV[Double]
 res2: breeze.linalg.Vector[Double] = DenseVector(1.0, 2.0, 3.0)

 scala v1 + v1
 console:30: error: could not find implicit value for parameter op:
 breeze.linalg.operators.OpAdd.Impl2[breeze.linalg.Vector[Double],org.apache.spark.mllib.linalg.Vector,That]
   v1 + v1
  ^








Re: MLlib Prefixspan implementation

2015-08-24 Thread Feynman Liang
CCing the mailing list again.

It's currently not on the radar. Do you have a use case for it? I can bring
it up during 1.6 roadmap planning tomorrow.

On Mon, Aug 24, 2015 at 8:28 PM, alexis GILLAIN ila...@hotmail.com wrote:

 Hi,

 I just realized the article I mentioned is cited in the jira and not in
 the code so I guess you didn't use this result.

 Do you plan to implement sequence with timestamp and gap constraint as in :

 https://people.mpi-inf.mpg.de/~rgemulla/publications/miliaraki13mg-fsm.pdf

 2015-08-25 7:06 GMT+08:00 Feynman Liang fli...@databricks.com:

 Hi Alexis,

 Unfortunately, both of the papers you referenced appear to be
 translations and are quite difficult to understand. We followed
 http://doi.org/10.1109/ICDE.2001.914830 when implementing PrefixSpan.
 Perhaps you can find the relevant lines in there so I can elaborate further?

 Feynman

 On Thu, Aug 20, 2015 at 9:07 AM, alexis GILLAIN ila...@hotmail.com
 wrote:

 I want to use prefixspan so I had a look at the code and the cited paper
 : Distributed PrefixSpan Algorithm Based on MapReduce.

 There is a result in the paper I didn't really undertstand and I
 could'nt find where it is used in the code.

 Suppose a sequence database S = {1, 2, ..., n}, a sequence a... is a
 length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a
 prefix of a length-(L-1) sequential pattern a...a, when the support count
 of a is not less than min_support, it is equal to obtaining a length-L
 sequential pattern  a ... a  from projected databases that obtaining a
 length-L sequential pattern  a ... a  from a sequence database S.

 According to the paper It's supposed to add a pruning step in the reduce
 function but I couldn't find where.

 This result seems to come from a previous paper : Wang Linlin, Fan Jun.
 Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan [J].
 Computer Engineering, 2009, 35(23): 56-61 but it didn't help me to
 understand it and how it can improve the algorithm.






Re: mllib on (key, Iterable[Vector])

2015-08-11 Thread Feynman Liang
You could try flatMapping, i.e. if you have data : RDD[(key,
Iterable[Vector])] then data.flatMap(_._2) : RDD[Vector], which can be
GMMed.

If you want to partition by url first, I would create multiple RDDs
using `filter`, then run GMM on each of the filtered RDDs.
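
A rough sketch of the filter-per-key approach (assuming the keys are urls
of type String; k is a placeholder):

import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

def fitPerUrl(data: RDD[(String, Iterable[Vector])],
              k: Int): Map[String, GaussianMixtureModel] = {
  val urls = data.keys.distinct().collect()
  urls.map { url =>
    // keep one url and flatten its Iterable[Vector] into an RDD[Vector]
    val vectors: RDD[Vector] = data.filter(_._1 == url).flatMap(_._2)
    url -> new GaussianMixture().setK(k).run(vectors)
  }.toMap
}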

On Tue, Aug 11, 2015 at 5:43 AM, Fabian Böhnlein fabian.boehnl...@gmail.com
 wrote:

 Hi everyone,

 I am trying to use mllib.clustering.GaussianMixture, but am blocked by the
 fact that the API only accepts RDD[Vector].

 Broadly speaking I need to run the clustering on an RDD[(key,
 Iterable[Vector]), e.g. (fabricated):

 val WebsiteUserAgeRDD : RDD[url, userAgeVector]

 val ageClusterByUrl =
 WebsiteUserAgeRDD.groupby(_.url).mapValues(GaussianMixture.setK(x).run)

 This obviously does not work, as the mapValues function is called on
 Iterable[Vector] but requires RDD[Vector]
 As I see it, parallelizing this Iterable is not possible, would result in
 an RDD of RDDs?

 Anyone has an idea how to cluster an RDD of (key, Iterable[Vector]) like
 in above groupBy result?

 Many thanks,
 Fabian



Re: miniBatchFraction for LinearRegressionWithSGD

2015-08-07 Thread Feynman Liang
Sounds reasonable to me, feel free to create a JIRA (and PR if you're up
for it) so we can see what others think!

On Fri, Aug 7, 2015 at 1:45 AM, Gerald Loeffler 
gerald.loeff...@googlemail.com wrote:

 hi,

 if new LinearRegressionWithSGD() uses a miniBatchFraction of 1.0,
 doesn’t that make it a deterministic/classical gradient descent rather
 than a SGD?

 Specifically, miniBatchFraction=1.0 means the entire data set, i.e.
 all rows. In the spirit of SGD, shouldn’t the default be the fraction
 that results in exactly one row of the data set?

 thank you
 gerald

 --
 Gerald Loeffler
 mailto:gerald.loeff...@googlemail.com
 http://www.gerald-loeffler.net

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: miniBatchFraction for LinearRegressionWithSGD

2015-08-07 Thread Feynman Liang
Yep, I think that's what Gerald is saying and they are proposing to default
miniBatchFraction = (1 / numInstances). Is that correct?
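
For context, a minimal sketch of where the parameter enters (assuming
"training" is an RDD[LabeledPoint]; the values are placeholders):

import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.rdd.RDD

def trainSGD(training: RDD[LabeledPoint]) = {
  val numIterations = 100
  val stepSize = 0.01
  // miniBatchFraction = 1.0 (the current default) samples the whole dataset
  // on every iteration; a value near 1.0 / training.count() would approximate
  // classic online SGD with a single example per iteration
  val miniBatchFraction = 0.1
  LinearRegressionWithSGD.train(training, numIterations, stepSize, miniBatchFraction)
}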

On Fri, Aug 7, 2015 at 11:16 AM, Meihua Wu rotationsymmetr...@gmail.com
wrote:

 I think in the SGD algorithm, the mini batch sample is done without
 replacement. So with fraction=1, then all the rows will be sampled
 exactly once to form the miniBatch, resulting to the
 deterministic/classical case.

 On Fri, Aug 7, 2015 at 9:05 AM, Feynman Liang fli...@databricks.com
 wrote:
  Sounds reasonable to me, feel free to create a JIRA (and PR if you're up
 for
  it) so we can see what others think!
 
  On Fri, Aug 7, 2015 at 1:45 AM, Gerald Loeffler
  gerald.loeff...@googlemail.com wrote:
 
  hi,
 
  if new LinearRegressionWithSGD() uses a miniBatchFraction of 1.0,
  doesn’t that make it a deterministic/classical gradient descent rather
  than a SGD?
 
  Specifically, miniBatchFraction=1.0 means the entire data set, i.e.
  all rows. In the spirit of SGD, shouldn’t the default be the fraction
  that results in exactly one row of the data set?
 
  thank you
  gerald
 
  --
  Gerald Loeffler
  mailto:gerald.loeff...@googlemail.com
  http://www.gerald-loeffler.net
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 



Re: Spark MLib v/s SparkR

2015-08-07 Thread Feynman Liang
SparkR and MLlib are becoming more integrated (we recently added R formula
support) but the integration is still quite small. If you learn R and
SparkR, you will not be able to leverage most of the distributed algorithms
in MLlib (e.g. all the algorithms you cited). However, you could use the
equivalent R implementations (e.g. glm for Logistic) but be aware that
these will not scale to the large scale datasets Spark is designed to
handle.

On Thu, Aug 6, 2015 at 8:06 PM, praveen S mylogi...@gmail.com wrote:

 I am starting off with classification models (logistic regression, random
 forests); basically I wanted to learn machine learning.
 Since I have a Java background I started off with MLlib, but later heard that
 R works as well (only with scaling issues).

 So I was wondering whether SparkR would resolve the scaling issue, hence my
 question: why not go with R and SparkR alone (keeping aside my inclination
 towards Java)?

 On Thu, Aug 6, 2015 at 12:28 AM, Charles Earl charles.ce...@gmail.com
 wrote:

 What machine learning algorithms are you interested in exploring or
 using? Start from there or better yet the problem you are trying to solve,
 and then the selection may be evident.


 On Wednesday, August 5, 2015, praveen S mylogi...@gmail.com wrote:

 I was wondering when one should go for MLib or SparkR. What is the
 criteria or what should be considered before choosing either of the
 solutions for data analysis?
 or What is the advantages of Spark MLib over Spark R or advantages of
 SparkR over MLib?



 --
 - Charles





Re: miniBatchFraction for LinearRegressionWithSGD

2015-08-07 Thread Feynman Liang
Good point; I agree that defaulting to online SGD (single example per
iteration) would be a poor UX due to performance.
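
For context, a sketch of how a caller can already set the fraction explicitly
instead of relying on the 1.0 default (the numeric values are illustrative, not
recommendations):

    import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD}
    import org.apache.spark.rdd.RDD

    def fitWithMiniBatch(training: RDD[LabeledPoint]): LinearRegressionModel = {
      val numIterations = 100
      val stepSize = 0.01
      // 1.0 = classical (full-batch) gradient descent; smaller values sample that
      // fraction of the rows, without replacement, on each iteration
      val miniBatchFraction = 0.1
      LinearRegressionWithSGD.train(training, numIterations, stepSize, miniBatchFraction)
    }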

On Fri, Aug 7, 2015 at 12:44 PM, Meihua Wu rotationsymmetr...@gmail.com
wrote:

 Feynman, thanks for clarifying.

 If we default miniBatchFraction = (1 / numInstances), then we will
 only hit one row for every iteration of SGD regardless the number of
 partitions and executors. In other words the parallelism provided by
 the RDD is lost in this approach. I think this is something we need to
 consider for the default value of miniBatchFraction.

 On Fri, Aug 7, 2015 at 11:24 AM, Feynman Liang fli...@databricks.com
 wrote:
  Yep, I think that's what Gerald is saying and they are proposing to
 default
  miniBatchFraction = (1 / numInstances). Is that correct?
 
  On Fri, Aug 7, 2015 at 11:16 AM, Meihua Wu rotationsymmetr...@gmail.com
 
  wrote:
 
  I think in the SGD algorithm, the mini batch sample is done without
  replacement. So with fraction=1, then all the rows will be sampled
  exactly once to form the miniBatch, resulting to the
  deterministic/classical case.
 
  On Fri, Aug 7, 2015 at 9:05 AM, Feynman Liang fli...@databricks.com
  wrote:
   Sounds reasonable to me, feel free to create a JIRA (and PR if you're
 up
   for
   it) so we can see what others think!
  
   On Fri, Aug 7, 2015 at 1:45 AM, Gerald Loeffler
   gerald.loeff...@googlemail.com wrote:
  
   hi,
  
   if new LinearRegressionWithSGD() uses a miniBatchFraction of 1.0,
   doesn’t that make it a deterministic/classical gradient descent
 rather
   than a SGD?
  
   Specifically, miniBatchFraction=1.0 means the entire data set, i.e.
   all rows. In the spirit of SGD, shouldn’t the default be the fraction
   that results in exactly one row of the data set?
  
   thank you
   gerald
  
   --
   Gerald Loeffler
   mailto:gerald.loeff...@googlemail.com
   http://www.gerald-loeffler.net
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
  
 
 



Re: Label based MLLib MulticlassMetrics is buggy

2015-08-05 Thread Feynman Liang
Hi Hayri,

Can you provide a sample of the expected and actual results?

Feynman

On Wed, Aug 5, 2015 at 6:19 AM, Hayri Volkan Agun volkana...@gmail.com
wrote:

 The results in MulticlassMetrics is totally wrong. They are improperly
 calculated.
 Confusion matrix may be true I don't know but for each label scores are
 wrong.

 --
 Hayri Volkan Agun
 PhD. Student - Anadolu University



Re: Label based MLLib MulticlassMetrics is buggy

2015-08-05 Thread Feynman Liang
Also, what version of Spark are you using?

On Wed, Aug 5, 2015 at 9:57 AM, Feynman Liang fli...@databricks.com wrote:

 Hi Hayri,

 Can you provide a sample of the expected and actual results?

 Feynman

 On Wed, Aug 5, 2015 at 6:19 AM, Hayri Volkan Agun volkana...@gmail.com
 wrote:

 The results in MulticlassMetrics is totally wrong. They are improperly
 calculated.
 Confusion matrix may be true I don't know but for each label scores are
 wrong.

 --
 Hayri Volkan Agun
 PhD. Student - Anadolu University





Re: Label based MLLib MulticlassMetrics is buggy

2015-08-05 Thread Feynman Liang
1.5 has not yet been released; what is the commit hash that you are
building?

On Wed, Aug 5, 2015 at 10:29 AM, Hayri Volkan Agun volkana...@gmail.com
wrote:

 Hi,

 In Spark 1.5 I saw a result for precision 1.0 and recall 0.01 for decision
 tree classification.
 While precision is 1.0, the recall shouldn't be so small... I checked the
 code and everything seems OK,
 but I can't explain why I got such a result. As far as I understand from the
 Scala code, the row sums are the actual
 class counts and the column sums are the prediction counts; am I right?
 I am doing additional tests for comparison with my own code...
 I attached a document for my reuters tests on page 3.


 On Wed, Aug 5, 2015 at 7:57 PM, Feynman Liang fli...@databricks.com
 wrote:

 Also, what version of Spark are you using?

 On Wed, Aug 5, 2015 at 9:57 AM, Feynman Liang fli...@databricks.com
 wrote:

 Hi Hayri,

 Can you provide a sample of the expected and actual results?

 Feynman

 On Wed, Aug 5, 2015 at 6:19 AM, Hayri Volkan Agun volkana...@gmail.com
 wrote:

 The results in MulticlassMetrics is totally wrong. They are improperly
 calculated.
 Confusion matrix may be true I don't know but for each label scores are
 wrong.

 --
 Hayri Volkan Agun
 PhD. Student - Anadolu University






 --
 Hayri Volkan Agun
 PhD. Student - Anadolu University



Re: [Spark ML] HasInputCol, etc.

2015-07-28 Thread Feynman Liang
Unfortunately, AFAIK custom transformers are not part of the public API so
you will have to continue with what you're doing.

On Tue, Jul 28, 2015 at 1:32 PM, Matt Narrell matt.narr...@gmail.com
wrote:

 Hey,

 Our ML ETL pipeline has several complex steps that I’d like to address
 with custom Transformers in an ML Pipeline.  Looking at the Tokenizer and
 HashingTF transformers I see these handy traits (HasInputCol, HasLabelCol,
 HasOutputCol, etc.) but they have strict access modifiers.  How can I use
 these with custom Transformer/Estimator implementations?

 I’m stuck depositing my implementations in org.apache.spark.ml, which is
 tolerable for now, but I’m wondering if I’m missing some pattern?

 Thanks,
 mn
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: LDA on a large dataset

2015-07-20 Thread Feynman Liang
LDAOptimizer.scala:421 collects to driver a numTopics by vocabSize matrix
of summary statistics. I suspect that this is what's causing the failure.

One thing you may try doing is decreasing the vocabulary size. One
possibility would be to use a HashingTF if you don't mind dimension
reduction via hashing collisions.
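
A minimal sketch of the hashing idea, assuming the corpus is already tokenized
as RDD[Seq[String]] (the 2^18 feature count is just an illustrative cap, not a
recommendation):

    import org.apache.spark.mllib.clustering.{LDA, LDAModel}
    import org.apache.spark.mllib.feature.HashingTF
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    def ldaWithHashedVocab(tokenized: RDD[Seq[String]], numTopics: Int): LDAModel = {
      val hashingTF = new HashingTF(1 << 18)        // cap the vocabulary at 2^18 hashed terms
      val corpus: RDD[(Long, Vector)] = tokenized
        .map(hashingTF.transform)                   // fixed-size term-frequency vectors
        .zipWithIndex()
        .map { case (vec, id) => (id, vec) }
        .cache()
      new LDA().setK(numTopics).setOptimizer("online").run(corpus)
    }

Shrinking numFeatures directly shrinks the numTopics x vocabSize summary matrix
that is collected to the driver.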

On Mon, Jul 20, 2015 at 3:21 AM, Peter Zvirinsky peter.zvirin...@gmail.com
wrote:

 Hello,

 I'm trying to run LDA on a relatively large dataset (size 100-200 G), but
 with no luck so far.

 At first I made sure that the executors have enough memory with respect to
 the vocabulary size and number of topics.

 After that I ran LDA with the default EMLDAOptimizer, but learning failed
 after a few iterations because the application master ran out of disk. The
 learning job used all space available in the usercache of the application
 master (ca. 100 G). I noticed that this implementation uses some sort of
 checkpointing, so I made sure it is not used, but it didn't help.

 Afterwards, I tried the OnlineLDAOptimizer, but it started failing at
 reduce at LDAOptimizer.scala:421 with error message: Total size of
 serialized results of X tasks (Y GB) is bigger than
 spark.driver.maxResultSize (Y GB). I kept increasing the
 spark.driver.maxResultSize to tens of GB but it didn't help, just delayed
 this error. I tried to adjust the batch size to very small values so that I
 was sure it must fit into memory, but this didn't help at all.

 Has anyone experience with learning LDA on such a dataset? Maybe some
 ideas what might be wrong?

 I'm using spark 1.4.0 in yarn-client mode. I managed to learn a word2vec
 model on the same dataset with no problems at all.

 Thanks,
 Peter



Re: Finding moving average using Spark and Scala

2015-07-14 Thread Feynman Liang
If your rows may have NAs in them, I would process each column individually
by first projecting the column ( map(x => x.nameOfColumn) ), filtering out
the NAs, then running a summarizer over each column.

Even if you have many rows, after summarizing you will only have a vector
of length #columns.
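
A minimal sketch of that per-column approach, assuming a hypothetical record
type with two Double fields where either may be NaN (the names are illustrative,
not from the original thread):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
    import org.apache.spark.rdd.RDD

    case class Record(bytes: Double, latency: Double)   // hypothetical columns

    def summarizeColumn(rows: RDD[Record], column: Record => Double): MultivariateOnlineSummarizer =
      rows.map(column)                                   // project a single column
        .filter(x => !x.isNaN)                           // drop the NAs
        .aggregate(new MultivariateOnlineSummarizer())(
          (summ, x) => summ.add(Vectors.dense(x)),       // fold each value into the summarizer
          (s1, s2) => s1.merge(s2))                      // combine per-partition summarizers

    // e.g. summarizeColumn(rows, _.bytes).mean(0) gives the mean of the bytes column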

On Mon, Jul 13, 2015 at 7:19 PM, Anupam Bagchi anupam_bag...@rocketmail.com
 wrote:

 Hello Feynman,

 Actually in my case, the vectors I am summarizing over will not have the
 same dimension since many devices will be inactive on some days. This is at
 best a sparse matrix where we take only the active days and attempt to fit
 a moving average over it.

 The reason I would like to save it to HDFS is that there are really
 several million (almost a billion) devices for which this data needs to be
 written. I am perhaps writing a very few columns, but the number of rows is
 pretty large.

 Given the above two cases, is using MultivariateOnlineSummarizer not a
 good idea then?

 Anupam Bagchi


 On Jul 13, 2015, at 7:06 PM, Feynman Liang fli...@databricks.com wrote:

 Dimensions mismatch when adding new sample. Expecting 8 but got 14.

 Make sure all the vectors you are summarizing over have the same dimension.

 Why would you want to write a MultivariateOnlineSummary object (which can
 be represented with a couple Double's) into a distributed filesystem like
 HDFS?

 On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi 
 anupam_bag...@rocketmail.com wrote:

 Thank you Feynman for the lead.

 I was able to modify the code using clues from the RegressionMetrics
 example. Here is what I got now.

 val deviceAggregateLogs = 
 sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

 // Calculate statistics based on bytes-transferred
 val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
 println(deviceIdsMap.collect().deep.mkString(\n))

 val summary: MultivariateStatisticalSummary = {
   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
 case (deviceId, allaggregates) = Vectors.dense({
   val sortedAggregates = allaggregates.toArray
   Sorting.quickSort(sortedAggregates)
   sortedAggregates.map(dda = dda.bytes.toDouble)
 })
   }.aggregate(new MultivariateOnlineSummarizer())(
   (summary, v) = summary.add(v),  // Not sure if this is really what I 
 want, it just came from the example
   (sum1, sum2) = sum1.merge(sum2) // Same doubt here as well
 )
   summary
 }

 It compiles fine. But I am now getting an exception as follows at Runtime.

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent
 failure: Lost task 1.0 in stage 3.0 (TID 5, localhost):
 java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch
 when adding new sample. Expecting 8 but got 14.
 at scala.Predef$.require(Predef.scala:233)
 at
 org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
 at
 com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
 at
 com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
 at
 scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
 at
 scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
 at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
 at
 scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
 at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
 at
 org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
 at
 org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:722)

 Can’t tell where exactly I went wrong. Also, how do I take the
 MultivariateOnlineSummary object and write it to HDFS? I have the
 MultivariateOnlineSummary object with me, but I really need an RDD to call
 saveAsTextFile

Re: Few basic spark questions

2015-07-14 Thread Feynman Liang
You could implement the receiver as a Spark Streaming Receiver
https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers;
the data received would be available for any streaming applications which
operate on DStreams (e.g. Streaming KMeans
https://spark.apache.org/docs/latest/mllib-clustering.html#streaming-k-means
).
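
A minimal sketch of wiring such a receiver-based stream into streaming k-means,
assuming trainingStream : DStream[Vector] is produced by the custom receiver
(parameter values are illustrative only):

    import org.apache.spark.mllib.clustering.StreamingKMeans
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.streaming.dstream.DStream

    def clusterStream(trainingStream: DStream[Vector]): Unit = {
      val model = new StreamingKMeans()
        .setK(3)                          // number of clusters
        .setDecayFactor(1.0)              // how quickly older batches are forgotten
        .setRandomCenters(2, 0.0)         // dimension of the vectors, initial weight
      model.trainOn(trainingStream)       // update the model as each batch arrives
      model.predictOn(trainingStream).print()
    }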

On Tue, Jul 14, 2015 at 8:31 AM, Oded Maimon o...@scene53.com wrote:

 Hi,
 Thanks for all the help.
 I'm still missing something very basic.

 If I wont use sparkR, which doesn't support streaming (will use mlib
 instead as Debasish suggested), and I have my scala receiver working, how
 the receiver should save the data in memory? I do see the store method, so
 if i use it, how can i read the data from a different spark scala/java
 application? how do i find/query this data?


 Regards,
 Oded Maimon
 Scene53.

 On Tue, Jul 14, 2015 at 12:35 AM, Feynman Liang fli...@databricks.com
 wrote:

 Sorry; I think I may have used poor wording. SparkR will let you use R to
 analyze the data, but it has to be loaded into memory using SparkR (see 
 SparkR
 DataSources
 http://people.apache.org/~pwendell/spark-releases/latest/sparkr.html).
 You will still have to write a Java receiver to store the data into some
 tabular datastore (e.g. Hive) before loading them as SparkR DataFrames and
 performing the analysis.

 R specific questions such as windowing in R should go to R-help@; you
 won't be able to use window since that is a Spark Streaming method.

 On Mon, Jul 13, 2015 at 2:23 PM, Oded Maimon o...@scene53.com wrote:

 You are helping me understanding stuff here a lot.

 I believe I have 3 last questions..

 If is use java receiver to get the data, how should I save it in memory?
 Using store command or other command?

 Once stored, how R can read that data?

 Can I use window command in R? I guess not because it is a streaming
 command, right? Any other way to window the data?

 Sent from IPhone




 On Mon, Jul 13, 2015 at 2:07 PM -0700, Feynman Liang 
 fli...@databricks.com wrote:

  If you use SparkR then you can analyze the data that's currently in
 memory with R; otherwise you will have to write to disk (eg HDFS).

 On Mon, Jul 13, 2015 at 1:45 PM, Oded Maimon o...@scene53.com wrote:

 Thanks again.
 What I'm missing is where can I store the data? Can I store it in
 spark memory and then use R to analyze it? Or should I use hdfs? Any other
 places that I can save the data?

 What would you suggest?

 Thanks...

 Sent from IPhone




 On Mon, Jul 13, 2015 at 1:41 PM -0700, Feynman Liang 
 fli...@databricks.com wrote:

  If you don't require true streaming processing and need to use R for
 analysis, SparkR on a custom data source seems to fit your use case.

 On Mon, Jul 13, 2015 at 1:06 PM, Oded Maimon o...@scene53.com
 wrote:

 Hi, thanks for replying!
 I want to do the entire process in stages. Get the data using Java
 or scala because they are the only Langs that supports custom receivers,
 keep the data somewhere, use R to analyze it, keep the results
 somewhere, output the data to different systems.

 I thought that somewhere can be spark memory using rdd or
 dstreams.. But could it be that I need to keep it in hdfs to make the
 entire process in stages?

 Sent from IPhone




 On Mon, Jul 13, 2015 at 12:07 PM -0700, Feynman Liang 
 fli...@databricks.com wrote:

  Hi Oded,

 I'm not sure I completely understand your question, but it sounds
 like you could have the READER receiver produce a DStream which is
 windowed/processed in Spark Streaming and forEachRDD to do the OUTPUT.
 However, streaming in SparkR is not currently supported (SPARK-6803
 https://issues.apache.org/jira/browse/SPARK-6803) so I'm not too
 sure how ANALYZER would fit in.

 Feynman

 On Sun, Jul 12, 2015 at 11:23 PM, Oded Maimon o...@scene53.com
 wrote:

 any help / idea will be appreciated :)
 thanks


 Regards,
 Oded Maimon
 Scene53.

 On Sun, Jul 12, 2015 at 4:49 PM, Oded Maimon o...@scene53.com
 wrote:

 Hi All,
 we are evaluating spark for real-time analytic. what we are
 trying to do is the following:

- READER APP- use custom receiver to get data from rabbitmq
(written in scala)
- ANALYZER APP - use spark R application to read the data
(windowed), analyze it every minute and save the results inside 
 spark
- OUTPUT APP - user spark application (scala/java/python) to
read the results from R every X minutes and send the data to few 
 external
systems

 basically at the end i would like to have the READER COMPONENT as
 an app that always consumes the data and keeps it in spark,
 have as many ANALYZER COMPONENTS as my data scientists wants, and
 have one OUTPUT APP that will read the ANALYZER results and send it 
 to any
 relevant system.

 what is the right way to do it?

 Thanks,
 Oded.





 addressed

Re: Finding moving average using Spark and Scala

2015-07-13 Thread Feynman Liang
Dimensions mismatch when adding new sample. Expecting 8 but got 14.

Make sure all the vectors you are summarizing over have the same dimension.

Why would you want to write a MultivariateOnlineSummary object (which can
be represented with a couple Double's) into a distributed filesystem like
HDFS?

On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi anupam_bag...@rocketmail.com
 wrote:

 Thank you Feynman for the lead.

 I was able to modify the code using clues from the RegressionMetrics
 example. Here is what I got now.

 val deviceAggregateLogs = 
 sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

 // Calculate statistics based on bytes-transferred
 val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
 println(deviceIdsMap.collect().deep.mkString(\n))

 val summary: MultivariateStatisticalSummary = {
   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
 case (deviceId, allaggregates) = Vectors.dense({
   val sortedAggregates = allaggregates.toArray
   Sorting.quickSort(sortedAggregates)
   sortedAggregates.map(dda = dda.bytes.toDouble)
 })
   }.aggregate(new MultivariateOnlineSummarizer())(
   (summary, v) = summary.add(v),  // Not sure if this is really what I 
 want, it just came from the example
   (sum1, sum2) = sum1.merge(sum2) // Same doubt here as well
 )
   summary
 }

 It compiles fine. But I am now getting an exception as follows at Runtime.

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent
 failure: Lost task 1.0 in stage 3.0 (TID 5, localhost):
 java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch
 when adding new sample. Expecting 8 but got 14.
 at scala.Predef$.require(Predef.scala:233)
 at
 org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
 at
 com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
 at
 com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
 at
 scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
 at
 scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
 at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
 at
 scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
 at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
 at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
 at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
 at
 org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
 at
 org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:722)

 Can’t tell where exactly I went wrong. Also, how do I take the
 MultivariateOnlineSummary object and write it to HDFS? I have the
 MultivariateOnlineSummary object with me, but I really need an RDD to call
 saveAsTextFile() on it.

 Anupam Bagchi
 (c) 408.431.0780 (h) 408-873-7909

 On Jul 13, 2015, at 4:52 PM, Feynman Liang fli...@databricks.com wrote:

 A good example is RegressionMetrics
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48's
 use of of OnlineMultivariateSummarizer to aggregate statistics across
 labels and residuals; take a look at how aggregateByKey is used there.

 On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi 
 anupam_bag...@rocketmail.com wrote:

 Thank you Feynman for your response. Since I am very new to Scala I may
 need a bit more hand-holding at this stage.

 I have been able to incorporate your suggestion about sorting - and it
 now works perfectly. Thanks again for that.

 I tried to use your suggestion of using MultiVariateOnlineSummarizer, but
 could not proceed further. For each deviceid (the key) my goal is to get a
 vector of doubles on which I can query the mean and standard deviation. Now
 because RDDs are immutable, I cannot use a foreach loop to interate through
 the groupby

Re: Few basic spark questions

2015-07-13 Thread Feynman Liang
Hi Oded,

I'm not sure I completely understand your question, but it sounds like you
could have the READER receiver produce a DStream which is
windowed/processed in Spark Streaming and forEachRDD to do the OUTPUT.
However, streaming in SparkR is not currently supported (SPARK-6803
https://issues.apache.org/jira/browse/SPARK-6803) so I'm not too sure how
ANALYZER would fit in.

Feynman

On Sun, Jul 12, 2015 at 11:23 PM, Oded Maimon o...@scene53.com wrote:

 any help / idea will be appreciated :)
 thanks


 Regards,
 Oded Maimon
 Scene53.

 On Sun, Jul 12, 2015 at 4:49 PM, Oded Maimon o...@scene53.com wrote:

 Hi All,
 we are evaluating spark for real-time analytic. what we are trying to do
 is the following:

- READER APP- use custom receiver to get data from rabbitmq (written
in scala)
- ANALYZER APP - use spark R application to read the data (windowed),
analyze it every minute and save the results inside spark
- OUTPUT APP - user spark application (scala/java/python) to read the
results from R every X minutes and send the data to few external systems

 basically at the end i would like to have the READER COMPONENT as an app
 that always consumes the data and keeps it in spark,
 have as many ANALYZER COMPONENTS as my data scientists wants, and have
 one OUTPUT APP that will read the ANALYZER results and send it to any
 relevant system.

 what is the right way to do it?

 Thanks,
 Oded.





 *This email and any files transmitted with it are confidential and
 intended solely for the use of the individual or entity to whom they are
 addressed. Please note that any disclosure, copying or distribution of the
 content of this information is strictly forbidden. If you have received
 this email message in error, please destroy it immediately and notify its
 sender.*



Re: Spark issue with running CrossValidator with RandomForestClassifier on dataset

2015-07-13 Thread Feynman Liang
Can you send the error messages again? I'm not seeing them.

On Mon, Jul 13, 2015 at 2:45 AM, shivamverma shivam13ve...@gmail.com
wrote:

 Hi

 I am running Spark 1.4 in Standalone mode on top of Hadoop 2.3 on a CentOS
 node. I am trying to run grid search on an RF classifier to classify a
 small
 dataset using the pyspark.ml.tuning module, specifically the
 ParamGridBuilder and CrossValidator classes. I get the following error when
 I try passing a DataFrame of Features-Labels to CrossValidator:



 I tried the following code, using the dataset given in Spark's CV
 documentation for  cross validator
 
 https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator
 
 . I also pass the DF through a StringIndexer transformation for the RF:



 Note that the above dataset works on logistic regression. I have also tried
 a larger dataset with sparse vectors as features (which I was originally
 trying to fit) but received the same error on RF.
 My guess is that there is an issue with how
 BinaryClassificationEvaluator(self, rawPredictionCol=rawPrediction,
 labelCol=label, metricName=areaUnderROC) interprets the 'rawPredict'
 column - with LR, the rawPredictionCol is a list/vector, whereas with RF,
 the prediction column is a double.
 Is it an issue with the evaluator, or is there something else that I'm
 missing?

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-issue-with-running-CrossValidator-with-RandomForestClassifier-on-dataset-tp23791.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Finding moving average using Spark and Scala

2015-07-13 Thread Feynman Liang

 The call to Sorting.quicksort is not working. Perhaps I am calling it the
 wrong way.

allaggregates.toArray allocates and creates a new array separate from
allaggregates which is sorted by Sorting.quickSort; allaggregates. Try:
val sortedAggregates = allaggregates.toArray
Sorting.quickSort(sortedAggregates)

 I would like to use the Spark mllib class MultivariateStatisticalSummary
 to calculate the statistical values.

MultivariateStatisticalSummary is a trait (similar to a Java interface);
you probably want to use MultivariateOnlineSummarizer.

 For that I would need to keep all my intermediate values as RDD so that I
 can directly use the RDD methods to do the job.

Correct; you would do an aggregate using the add and merge functions
provided by MultivariateOnlineSummarizer

 At the end I also need to write the results to HDFS for which there is a
 method provided on the RDD class to do so, which is another reason I would
 like to retain everything as RDD.

You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS,
or you could unpack the relevant statistics from
MultivariateOnlineSummarizer into an array/tuple using a mapValues first
and then write.
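
A minimal sketch of that per-key pattern, assuming byteAggregates : RDD[(Int,
Double)] of (device_id, bytes) pairs; the output path and the choice of
statistics are illustrative only:

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
    import org.apache.spark.rdd.RDD

    def writeDeviceStats(byteAggregates: RDD[(Int, Double)]): Unit = {
      val summaries = byteAggregates.aggregateByKey(new MultivariateOnlineSummarizer())(
        (summ, bytes) => summ.add(Vectors.dense(bytes)),  // fold each value into the per-device summarizer
        (s1, s2) => s1.merge(s2))                         // combine summarizers across partitions
      summaries
        .mapValues(s => (s.mean(0), math.sqrt(s.variance(0))))   // unpack mean and stddev
        .map { case (id, (mean, stddev)) => s"$id,$mean,$stddev" }
        .saveAsTextFile("hdfs:///tmp/device-stats")              // illustrative output path
    }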

On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi 
anupam_bag...@rocketmail.com wrote:

 I have to do the following tasks on a dataset using Apache Spark with
 Scala as the programming language:

 1. Read the dataset from HDFS. A few sample lines look like this:

    deviceid,bytes,eventdate
    15590657,246620,20150630
    14066921,1907,20150621
    14066921,1906,20150626
    6522013,2349,20150626
    6522013,2525,20150613

 2. Group the data by device id. Thus we now have a map of deviceid =>
    (bytes, eventdate).
 3. For each device, sort the set by eventdate. We now have an ordered
    set of bytes based on eventdate for each device.
 4. Pick the last 30 days of bytes from this ordered set.
 5. Find the moving average of bytes for the last date using a time
    period of 30.
 6. Find the standard deviation of the bytes for the final date using a
    time period of 30.
 7. Return two values in the result, (mean - k*stddev) and (mean + k*stddev)
    [assume k = 3].

 I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to
 run on a billion rows finally.
 Here is the data structure for the dataset.

 package com.testing

 case class DeviceAggregates (
     device_id: Integer,
     bytes: Long,
     eventdate: Integer
 ) extends Ordered[DailyDeviceAggregates] {
   def compare(that: DailyDeviceAggregates): Int = {
     eventdate - that.eventdate
   }
 }

 object DeviceAggregates {
   def parseLogLine(logline: String): DailyDeviceAggregates = {
     val c = logline.split(",")
     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
   }
 }

 The DeviceAnalyzer class looks like this:
 I have a very crude implementation that does the job, but it is not up to
 the mark. Sorry, I am very new to Scala/Spark, so my questions are quite
 basic. Here is what I have now:

 import com.testing.DailyDeviceAggregates
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import scala.util.Sorting
 object DeviceAnalyzer {
   def main(args: Array[String]) {
 val sparkConf = new SparkConf().setAppName(Device Analyzer)
 val sc = new SparkContext(sparkConf)

 val logFile = args(0)

 val deviceAggregateLogs = 
 sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

 // Calculate statistics based on bytes
 val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

 deviceIdsMap.foreach(a = {
   val device_id = a._1  // This is the device ID
   val allaggregates = a._2  // This is an array of all device-aggregates 
 for this device

   println(allaggregates)
   Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of 
 DailyDeviceAggregates based on eventdate
   println(allaggregates) // This does not work - results are not sorted !!

   val byteValues = allaggregates.map(dda = dda.bytes.toDouble).toArray
   val count = byteValues.count(A = true)
   val sum = byteValues.sum
   val xbar = sum / count
   val sum_x_minus_x_bar_square = byteValues.map(x = 
 (x-xbar)*(x-xbar)).sum
   val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

   val vector: Vector = Vectors.dense(byteValues)
   println(vector)
   println(device_id + , + xbar + , + stddev)

   //val vector: Vector = Vectors.dense(byteValues)
   //println(vector)
   //val summary: MultivariateStatisticalSummary = 
 Statistics.colStats(vector)
 })

 sc.stop()
   }}

 I would really appreciate if someone can suggests improvements for the
 following:

1. The call to Sorting.quicksort is not working. Perhaps I am 

Re: Few basic spark questions

2015-07-13 Thread Feynman Liang
Sorry; I think I may have used poor wording. SparkR will let you use R to
analyze the data, but it has to be loaded into memory using SparkR (see SparkR
DataSources
http://people.apache.org/~pwendell/spark-releases/latest/sparkr.html).
You will still have to write a Java receiver to store the data into some
tabular datastore (e.g. Hive) before loading them as SparkR DataFrames and
performing the analysis.

R specific questions such as windowing in R should go to R-help@; you won't
be able to use window since that is a Spark Streaming method.

On Mon, Jul 13, 2015 at 2:23 PM, Oded Maimon o...@scene53.com wrote:

 You are helping me understanding stuff here a lot.

 I believe I have 3 last questions..

 If is use java receiver to get the data, how should I save it in memory?
 Using store command or other command?

 Once stored, how R can read that data?

 Can I use window command in R? I guess not because it is a streaming
 command, right? Any other way to window the data?

 Sent from IPhone




 On Mon, Jul 13, 2015 at 2:07 PM -0700, Feynman Liang 
 fli...@databricks.com wrote:

  If you use SparkR then you can analyze the data that's currently in
 memory with R; otherwise you will have to write to disk (eg HDFS).

 On Mon, Jul 13, 2015 at 1:45 PM, Oded Maimon o...@scene53.com wrote:

 Thanks again.
 What I'm missing is where can I store the data? Can I store it in spark
 memory and then use R to analyze it? Or should I use hdfs? Any other places
 that I can save the data?

 What would you suggest?

 Thanks...

 Sent from IPhone




 On Mon, Jul 13, 2015 at 1:41 PM -0700, Feynman Liang 
 fli...@databricks.com wrote:

  If you don't require true streaming processing and need to use R for
 analysis, SparkR on a custom data source seems to fit your use case.

 On Mon, Jul 13, 2015 at 1:06 PM, Oded Maimon o...@scene53.com wrote:

 Hi, thanks for replying!
 I want to do the entire process in stages. Get the data using Java or
 scala because they are the only Langs that supports custom receivers, keep
 the data somewhere, use R to analyze it, keep the results somewhere,
 output the data to different systems.

 I thought that somewhere can be spark memory using rdd or dstreams..
 But could it be that I need to keep it in hdfs to make the entire process
 in stages?

 Sent from IPhone




 On Mon, Jul 13, 2015 at 12:07 PM -0700, Feynman Liang 
 fli...@databricks.com wrote:

  Hi Oded,

 I'm not sure I completely understand your question, but it sounds
 like you could have the READER receiver produce a DStream which is
 windowed/processed in Spark Streaming and forEachRDD to do the OUTPUT.
 However, streaming in SparkR is not currently supported (SPARK-6803
 https://issues.apache.org/jira/browse/SPARK-6803) so I'm not too
 sure how ANALYZER would fit in.

 Feynman

 On Sun, Jul 12, 2015 at 11:23 PM, Oded Maimon o...@scene53.com
 wrote:

 any help / idea will be appreciated :)
 thanks


 Regards,
 Oded Maimon
 Scene53.

 On Sun, Jul 12, 2015 at 4:49 PM, Oded Maimon o...@scene53.com
 wrote:

 Hi All,
 we are evaluating spark for real-time analytic. what we are trying
 to do is the following:

- READER APP- use custom receiver to get data from rabbitmq
(written in scala)
- ANALYZER APP - use spark R application to read the data
(windowed), analyze it every minute and save the results inside 
 spark
- OUTPUT APP - user spark application (scala/java/python) to
read the results from R every X minutes and send the data to few 
 external
systems

 basically at the end i would like to have the READER COMPONENT as
 an app that always consumes the data and keeps it in spark,
 have as many ANALYZER COMPONENTS as my data scientists wants, and
 have one OUTPUT APP that will read the ANALYZER results and send it to 
 any
 relevant system.

 what is the right way to do it?

 Thanks,
 Oded.





 *This email and any files transmitted with it are confidential and
 intended solely for the use of the individual or entity to whom they are
 addressed. Please note that any disclosure, copying or distribution of 
 the
 content of this information is strictly forbidden. If you have received
 this email message in error, please destroy it immediately and notify 
 its
 sender.*




Re: Finding moving average using Spark and Scala

2015-07-13 Thread Feynman Liang
A good example is RegressionMetrics
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48's
use of of OnlineMultivariateSummarizer to aggregate statistics across
labels and residuals; take a look at how aggregateByKey is used there.

On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi anupam_bag...@rocketmail.com
 wrote:

 Thank you Feynman for your response. Since I am very new to Scala I may
 need a bit more hand-holding at this stage.

 I have been able to incorporate your suggestion about sorting - and it now
 works perfectly. Thanks again for that.

 I tried to use your suggestion of using MultiVariateOnlineSummarizer, but
 could not proceed further. For each deviceid (the key) my goal is to get a
 vector of doubles on which I can query the mean and standard deviation. Now
 because RDDs are immutable, I cannot use a foreach loop to interate through
 the groupby results and individually add the values in an RDD - Spark does
 not allow that. I need to apply the RDD functions directly on the entire
 set to achieve the transformations I need. This is where I am faltering
 since I am not used to the lambda expressions that Scala uses.

 object DeviceAnalyzer {
   def main(args: Array[String]) {
 val sparkConf = new SparkConf().setAppName(Device Analyzer)
 val sc = new SparkContext(sparkConf)

 val logFile = args(0)

 val deviceAggregateLogs = 
 sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

 // Calculate statistics based on bytes
 val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

 // Question: Can we not write the line above as 
 deviceAggregateLogs.groupBy(_.device_id).sortBy(c = c_.2, true) // Anything 
 wrong?

 // All I need to do below is collect the vector of bytes for each device 
 and store it in the RDD

 // The problem with the ‘foreach' approach below, is that it generates 
 the vector values one at a time, which I cannot

 // add individually to an immutable RDD

 deviceIdsMap.foreach(a = {
   val device_id = a._1  // This is the device ID
   val allaggregates = a._2  // This is an array of all device-aggregates 
 for this device

   val sortedaggregates = allaggregates.toArray  
 Sorting.quickSort(sortedaggregates)

   val byteValues = sortedaggregates.map(dda = dda.bytes.toDouble).toArray
   val count = byteValues.count(A = true)
   val sum = byteValues.sum
   val xbar = sum / count
   val sum_x_minus_x_bar_square = byteValues.map(x = 
 (x-xbar)*(x-xbar)).sum
   val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

   val vector: Vector = Vectors.dense(byteValues)
   println(vector)
   println(device_id + , + xbar + , + stddev)
 })

   //val vector: Vector = Vectors.dense(byteValues)
   //println(vector)
   //val summary: MultivariateStatisticalSummary = 
 Statistics.colStats(vector)


 sc.stop() } }

 Can you show me how to write the ‘foreach’ loop in a Spark-friendly way?
 Thanks a lot for your help.

 Anupam Bagchi


 On Jul 13, 2015, at 12:21 PM, Feynman Liang fli...@databricks.com wrote:

 The call to Sorting.quicksort is not working. Perhaps I am calling it the
 wrong way.

 allaggregates.toArray allocates and creates a new array separate from
 allaggregates which is sorted by Sorting.quickSort; allaggregates. Try:
 val sortedAggregates = allaggregates.toArray
 Sorting.quickSort(sortedAggregates)

 I would like to use the Spark mllib class MultivariateStatisticalSummary
 to calculate the statistical values.

 MultivariateStatisticalSummary is a trait (similar to a Java interface);
 you probably want to use MultivariateOnlineSummarizer.

 For that I would need to keep all my intermediate values as RDD so that I
 can directly use the RDD methods to do the job.

 Correct; you would do an aggregate using the add and merge functions
 provided by MultivariateOnlineSummarizer

 At the end I also need to write the results to HDFS for which there is a
 method provided on the RDD class to do so, which is another reason I would
 like to retain everything as RDD.

 You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS,
 or you could unpack the relevant statistics from
 MultivariateOnlineSummarizer into an array/tuple using a mapValues first
 and then write.

 On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi 
 anupam_bag...@rocketmail.com wrote:

 I have to do the following tasks on a dataset using Apache Spark with
 Scala as the programming language:

 1. Read the dataset from HDFS. A few sample lines look like this:

    deviceid,bytes,eventdate
    15590657,246620,20150630
    14066921,1907,20150621
    14066921,1906,20150626
    6522013,2349,20150626
    6522013,2525,20150613

 2. Group the data by device id. Thus we now have a map of deviceid =>
    (bytes, eventdate).
 3. For each device, sort the set by eventdate. We now have an ordered
    set of bytes based on eventdate

Re: How can the RegressionMetrics produce negative R2 and explained variance?

2015-07-12 Thread Feynman Liang
This might be a bug... R^2 should always be in [0,1] and variance should
never be negative.

Can you give more details on which version of Spark you are running?

On Sun, Jul 12, 2015 at 8:37 AM, Sean Owen so...@cloudera.com wrote:

 In general, a negative R2 means the line that was fit is a very poor fit -- the
 mean would give a smaller squared error. But it can also mean you are
 applying R2 where it doesn't apply. Here, you're not performing a
 linear regression; why are you using R2?

 On Sun, Jul 12, 2015 at 4:22 PM, afarahat ayman.fara...@yahoo.com wrote:
  Hello;
  I am using the ALS recommendation MLLibb. To select the optimal rank, I
 have
  a number of users who used multiple items as my test. I then get the
  prediction on these users and compare it to the observed. I use
  the  RegressionMetrics to estimate the R^2.
  I keep getting a negative value.
  r2 =   -1.18966999676 explained var =  -1.18955347415 count =  11620309
  Here is my Pyspark code :
 
  train1.cache()
  test1.cache()
 
  numIterations =10
  for i in range(10) :
  rank = int(40+i*10)
  als = ALS(rank=rank, maxIter=numIterations,implicitPrefs=False)
  model = als.fit(train1)
  predobs =
  model.transform(test1).select(prediction,rating).map(lambda p :
  (p.prediction,p.rating)).filter(lambda p: (math.isnan(p[0]) == False))
  metrics = RegressionMetrics(predobs)
  mycount = predobs.count()
  myr2 = metrics.r2
  myvar = metrics.explainedVariance
  print hooo,rank,  r2 =  ,myr2, explained var = , myvar,
 count
  = ,mycount
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-can-the-RegressionMetrics-produce-negative-R2-and-explained-variance-tp23779.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: how to use DoubleRDDFunctions on mllib Vector?

2015-07-08 Thread Feynman Liang
A RDD[Double] is an abstraction for a large collection of doubles, possibly
distributed across multiple nodes. The DoubleRDDFunctions are there for
performing mean and variance calculations across this distributed dataset.

In contrast, a Vector is not distributed and fits on your local machine.
You would be better off computing these quantities on the Vector directly
(see mllib.clustering.GaussianMixture#vectorMean for an example of how to
compute the mean of a vector).
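
A small sketch of computing these statistics directly on a local Vector rather
than converting it to an RDD[Double] (population variance is used here for
simplicity):

    import org.apache.spark.mllib.linalg.Vector

    def meanAndVariance(v: Vector): (Double, Double) = {
      val values = v.toArray                  // local copy of the vector's entries
      val mean = values.sum / values.length
      val variance = values.map(x => (x - mean) * (x - mean)).sum / values.length
      (mean, variance)
    }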

On Tue, Jul 7, 2015 at 8:26 PM, 诺铁 noty...@gmail.com wrote:

 hi,

 there are some useful functions in DoubleRDDFunctions, which I can use if
 I have RDD[Double], eg, mean, variance.

 Vector doesn't have such methods, how can I convert Vector to RDD[Double],
 or maybe better if I can call mean directly on a Vector?



Re: Getting started with spark-scala developemnt in eclipse.

2015-07-08 Thread Feynman Liang
Take a look at

https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse

On Wed, Jul 8, 2015 at 7:47 AM, Daniel Siegmann daniel.siegm...@teamaol.com
 wrote:

 To set up Eclipse for Spark you should install the Scala IDE plugins:
 http://scala-ide.org/download/current.html

 Define your project in Maven with Scala plugins configured (you should be
 able to find documentation online) and import as an existing Maven project.
 The source code should be in src/main/scala but otherwise the project
 structure will be the same as you'd expect in Java.

 Nothing special is needed for Spark. Just define the desired Spark jars (
 spark-core and possibly others, such as spark-sql) in your Maven POM as
 dependencies. You should scope these dependencies as provided, since they
 will automatically be on the classpath when you deploy your project to a
 Spark cluster.

 One thing to keep in mind is that Scala dependencies require separate jars
 for different versions of Scala, and it is convention to append the Scala
 version to the artifact ID. For example, if you are using Scala 2.11.x,
 your dependency will be spark-core_2.11 (look on search.maven.org if
 you're not sure). I think you can omit the Scala version if you're using
 SBT (not sure why you would, but some people seem to prefer it).

 Unit testing Spark is briefly explained in the programming guide
 https://spark.apache.org/docs/latest/programming-guide.html#unit-testing
 .

 To deploy using spark-submit you can build the jar using mvn package if
 and only if you don't have any non-Spark dependencies. Otherwise, the
 simplest thing is to build a jar with dependencies (typically using the
 assembly
 http://maven.apache.org/plugins/maven-assembly-plugin/single-mojo.html
 or shade https://maven.apache.org/plugins/maven-shade-plugin/ plugins).





 On Wed, Jul 8, 2015 at 9:38 AM, Prateek . prat...@aricent.com wrote:

  Hi



 I am beginner to scala and spark. I am trying to set up eclipse
 environment to develop spark program  in scala, then take it’s  jar  for
 spark-submit.

 How shall I start? To start my  task includes, setting up eclipse for
 scala and spark, getting dependencies resolved, building project using
 maven/sbt.

 Is there any good blog or documentation that is can follow.



 Thanks

  DISCLAIMER: This message is proprietary to Aricent and is intended
 solely for the use of the individual to whom it is addressed. It may
 contain privileged or confidential information and should not be circulated
 or used for any purpose other than for what it is intended. If you have
 received this message in error, please notify the originator immediately.
 If you are not the intended recipient, you are notified that you are
 strictly prohibited from using, copying, altering, or disclosing the
 contents of this message. Aricent accepts no responsibility for loss or
 damage arising from the use of the information transmitted by this email
 including damage from virus.





Re: Disable heartbeat messages in REPL

2015-07-08 Thread Feynman Liang
I was thinking the same thing! Try sc.setLogLevel("ERROR")

On Wed, Jul 8, 2015 at 2:01 PM, Lincoln Atkinson lat...@microsoft.com
wrote:

  “WARN Executor: Told to re-register on heartbeat” is logged repeatedly
 in the spark shell, which is very distracting and corrupts the display of
 whatever set of commands I’m currently typing out.



 Is there an option to disable the logging of this message?



 Thanks,

 -Lincoln



Re: How to write mapreduce programming in spark by using java on user-defined javaPairRDD?

2015-07-07 Thread Feynman Liang
Hi MIssie,

In the Java API, you should consider:

   1. RDD.map
   
https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#map(scala.Function1,%20scala.reflect.ClassTag)
to
   transform the text
   2. RDD.sortBy
   
https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#sortBy(scala.Function1,%20boolean,%20int,%20scala.math.Ordering,%20scala.reflect.ClassTag)
to
   order by LongWritable
   3. RDD.saveAsTextFile
   
https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#saveAsTextFile(java.lang.String)
to
   write to HDFS


On Tue, Jul 7, 2015 at 7:18 AM, 付雅丹 yadanfu1...@gmail.com wrote:

 Hi, everyone!

 I've got key,value pair in form of LongWritable, Text, where I used
 the following code:

 SparkConf conf = new SparkConf().setAppName("MapReduceFileInput");
 JavaSparkContext sc = new JavaSparkContext(conf);
 Configuration confHadoop = new Configuration();

 JavaPairRDD<LongWritable, Text> sourceFile = sc.newAPIHadoopFile(
 "hdfs://cMaster:9000/wcinput/data.txt",
 DataInputFormat.class, LongWritable.class, Text.class, confHadoop);

 Now I want to handle the javapairrdd data from LongWritable, Text to
 another LongWritable, Text, where the Text content is different. After
 that, I want to write Text into hdfs in order of LongWritable value. But I
 don't know how to write mapreduce function in spark using java language.
 Someone can help me?


 Sincerely,
 Missie.



Re: Random Forest in MLLib

2015-07-06 Thread Feynman Liang
Not yet, though work on this feature has begun (SPARK-5133
https://issues.apache.org/jira/browse/SPARK-5133)

On Mon, Jul 6, 2015 at 4:46 PM, Sourav Mazumder sourav.mazumde...@gmail.com
 wrote:

 Hi,

 Is there a way to get variable importance for RandomForest model created
 using MLLib ? This way one can know among multiple features which are the
 one contributing the most to the dependent variable.

 Regards,
 Sourav



Re: KMeans questions

2015-07-02 Thread Feynman Liang
SPARK-7879 https://issues.apache.org/jira/browse/SPARK-7879 seems to
address your use case (running KMeans on a dataframe and having the results
added as an additional column)
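
One possible workaround until then, sketched under the assumption that df1 and
the trained KMeansModel come from the original question and that zip ordering
of rows and vectors is preserved (names are illustrative):

    import org.apache.spark.mllib.clustering.KMeansModel
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row}

    def withClusterAssignments(df1: DataFrame, model: KMeansModel): RDD[(Row, Int)] = {
      val rows = df1.rdd.cache()
      // rebuild the feature vectors in the same row order used for training
      val vectors = rows.map(r => Vectors.dense(r.getDouble(0), r.getDouble(3),
        r.getDouble(4), r.getDouble(5), r.getDouble(6)))
      // zip each original row (with its domain columns) with its predicted cluster id
      rows.zip(model.predict(vectors))
    }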

On Wed, Jul 1, 2015 at 5:53 PM, Eric Friedman eric.d.fried...@gmail.com
wrote:

 In preparing a DataFrame (spark 1.4) to use with MLlib's kmeans.train
 method, is there a cleaner way to create the Vectors than this?

 data.map{r = Vectors.dense(r.getDouble(0), r.getDouble(3),
 r.getDouble(4), r.getDouble(5), r.getDouble(6))}


 Second, once I train the model and call predict on my vectorized dataset,
 what's the best way to relate the cluster assignments back to the original
 data frame?


 That is, I started with df1, which has a bunch of domain information in
 each row and also the doubles I use to cluster.  I vectorize the doubles
 and then train on them.  I use the resulting model to predict clusters for
 the vectors.  I'd like to look at the original domain information in light
 of the clusters to which they are now assigned.





Re: sliding

2015-07-02 Thread Feynman Liang
How about:

events.sliding(3).zipWithIndex.filter(_._2 % 3 == 0)

That would group the RDD into adjacent buckets of size 3.

On Thu, Jul 2, 2015 at 2:33 PM, tog guillaume.all...@gmail.com wrote:

 Was complaining about the Seq ...

 Moved it to
 val eventsfiltered = events.sliding(3).map(s => Event(s(0).time,
 (s(0).x+s(1).x+s(2).x)/3.0, (s(0).vztot+s(1).vztot+s(2).vztot)/3.0))

 and that is working.

 Anyway this is not what I wanted to do, my goal was more to implement
 bucketing to shorten the time serie.


 On 2 July 2015 at 18:25, Feynman Liang fli...@databricks.com wrote:

 What's the error you are getting?

 On Thu, Jul 2, 2015 at 9:37 AM, tog guillaume.all...@gmail.com wrote:

 Hi

 Sorry for this scala/spark newbie question. I am creating RDD which
 represent large time series this way:
 val data = sc.textFile(somefile.csv)

 case class Event(
 time:   Double,
 x:  Double,
 vztot:  Double
 )

 val events = data.filter(s = !s.startsWith(GMT)).map{s =
 val r = s.split(;)
 ...
 Event(time, x, vztot )
 }

 I would like to process those RDD in order to reduce them by some
 filtering. For this I noticed that sliding could help but I was not able to
 use it so far. Here is what I did:

 import org.apache.spark.mllib.rdd.RDDFunctions._

 val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2)  =
 Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))

 Thanks for your help


 --
 PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net





 --
 PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net



Re: sliding

2015-07-02 Thread Feynman Liang
Consider an example dataset [a, b, c, d, e, f]

After sliding(3), you get [(a,b,c), (b,c,d), (c, d, e), (d, e, f)]

After zipWithIndex: [((a,b,c), 0), ((b,c,d), 1), ((c, d, e), 2), ((d, e,
f), 3)]

After filter: [((a,b,c), 0), ((d, e, f), 3)], which is what I'm assuming
you want (non-overlapping buckets)? You can then do something like
.map(func(_._1)) to apply func (e.g. min, max, mean) to the 3-tuples.
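
Putting those pieces together, a minimal sketch (assuming the Event case class
and the events RDD from the original message) that reduces each disjoint bucket
of three to its mean:

    import org.apache.spark.mllib.rdd.RDDFunctions._

    val bucketed = events.sliding(3)
      .zipWithIndex()
      .filter { case (_, i) => i % 3 == 0 }    // keep every third window, i.e. disjoint buckets
      .map { case (w, _) =>
        Event(w(0).time,
          w.map(_.x).sum / 3.0,                // bucket mean of x; swap in min/max/etc. as needed
          w.map(_.vztot).sum / 3.0)            // bucket mean of vztot
      }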

On Thu, Jul 2, 2015 at 3:20 PM, tog guillaume.all...@gmail.com wrote:

 Well it did reduce the length of my serie of events. I will have to dig
 what it did actually ;-)

 I would assume that it took one out of 3 value, is that correct ?
 Would it be possible to control a bit more how the value assigned to the
 bucket is computed for example take the first element, the min, the max,
 mean ... any other function.

 Thanks for putting me on the right track

 On 2 July 2015 at 22:56, Feynman Liang fli...@databricks.com wrote:

 How about:

 events.sliding(3).zipWithIndex.filter(_._2 % 3 == 0)

 That would group the RDD into adjacent buckets of size 3.

 On Thu, Jul 2, 2015 at 2:33 PM, tog guillaume.all...@gmail.com wrote:

 Was complaining about the Seq ...

 Moved it to
 val eventsfiltered = events.sliding(3).map(s  = Event(s(0).time,
 (s(0).x+s(1).x+s(2).x)/3.0 (s(0).vztot+s(1).vztot+s(2).vztot)/3.0))

 and that is working.

 Anyway this is not what I wanted to do, my goal was more to implement
 bucketing to shorten the time serie.


 On 2 July 2015 at 18:25, Feynman Liang fli...@databricks.com wrote:

 What's the error you are getting?

 On Thu, Jul 2, 2015 at 9:37 AM, tog guillaume.all...@gmail.com wrote:

 Hi

 Sorry for this scala/spark newbie question. I am creating RDD which
 represent large time series this way:
 val data = sc.textFile(somefile.csv)

 case class Event(
 time:   Double,
 x:  Double,
 vztot:  Double
 )

 val events = data.filter(s = !s.startsWith(GMT)).map{s =
 val r = s.split(;)
 ...
 Event(time, x, vztot )
 }

 I would like to process those RDD in order to reduce them by some
 filtering. For this I noticed that sliding could help but I was not able 
 to
 use it so far. Here is what I did:

 import org.apache.spark.mllib.rdd.RDDFunctions._

 val eventsfiltered = events.sliding(3).map(Seq(e0, e1, e2)  =
 Event(e0.time, (e0.x+e1.x+e2.x)/3.0, (e0.vztot+e1.vztot+e2.vztot)/3.0))

 Thanks for your help


 --
 PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net





 --
 PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net





 --
 PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net



Re: custom RDD in java

2015-07-01 Thread Feynman Liang
AFAIK RDDs can only be created on the driver, not the executors. Also,
`saveAsTextFile(...)` is an action and hence can also only be executed on
the driver.

As Silvio already mentioned, Sqoop may be a good option.

On Wed, Jul 1, 2015 at 12:46 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 The list of tables is not large; the RDD is created on the table list to
 parallelise the work of fetching tables in multiple mappers at the same time.
 Since the time taken to fetch a table is significant, it can't be run sequentially.


 The content of a table fetched by a map job is large, so one option is to dump
 the content to HDFS using the filesystem API from inside the map function every few
 rows of the table fetched.

 I cannot keep a complete table in memory and then dump it to HDFS using the below
 map function -

 JavaRDD<Iterable<String>> tablecontent = tablelistrdd.map(new
 Function<String, Iterable<String>>() {
   public Iterable<String> call(String tablename) {
     // ..make jdbc connection, get table data, populate it in a list and return that..
   }
 });
 tablecontent.saveAsTextFile(hdfspath);

 Here I wanted to create a custom RDD whose partitions would live in memory on
 multiple executors and contain parts of the table data, and I would have
 called saveAsTextFile on the custom RDD directly to save to HDFS.
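
 Not the poster's actual code, but a rough Scala sketch of that "write a few rows
 at a time from inside the task" idea, using the Hadoop FileSystem API on the
 executors so that no table is ever held fully in memory (the JDBC URL, credentials,
 table names and HDFS paths below are made-up placeholders):

 import java.sql.DriverManager
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}

 val tables = sc.parallelize(Seq("dbname.table1", "dbname.table2"))

 tables.foreach { tableName =>
   // One JDBC connection and one HDFS output stream per table, opened on the executor.
   val conn = DriverManager.getConnection(
     "jdbc:sqlserver://host;databaseName=dbname", "user", "password")
   val rs = conn.createStatement().executeQuery(s"SELECT * FROM $tableName")
   val fs = FileSystem.get(new Configuration())
   val out = fs.create(new Path(s"/dumps/$tableName.txt"))
   try {
     val cols = rs.getMetaData.getColumnCount
     while (rs.next()) {
       // Each row is written as soon as it is read; nothing is buffered per table.
       val row = (1 to cols).map(i => rs.getString(i)).mkString(";")
       out.write((row + "\n").getBytes("UTF-8"))
     }
   } finally {
     out.close(); rs.close(); conn.close()
   }
 }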



 On Thu, Jul 2, 2015 at 12:59 AM, Feynman Liang fli...@databricks.com
 wrote:


 On Wed, Jul 1, 2015 at 7:19 AM, Shushant Arora shushantaror...@gmail.com
  wrote:

 JavaRDD<String> rdd = javasparkcontext.parallelize(tables);


 You are already creating an RDD in Java here ;)

 However, it's not clear to me why you'd want to make this an RDD. Is the
 list of tables so large that it doesn't fit on a single machine? If not,
 you may be better off spinning up one spark job for dumping each table in
 tables using a JDBC datasource
 https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
 .

 On Wed, Jul 1, 2015 at 12:00 PM, Silvio Fiorito 
 silvio.fior...@granturing.com wrote:

   Sure, you can create custom RDDs. Haven’t done so in Java, but in
 Scala absolutely.

   From: Shushant Arora
 Date: Wednesday, July 1, 2015 at 1:44 PM
 To: Silvio Fiorito
 Cc: user
 Subject: Re: custom RDD in java

   ok..will evaluate these options but is it possible to create RDD in
 java?


 On Wed, Jul 1, 2015 at 8:29 PM, Silvio Fiorito 
 silvio.fior...@granturing.com wrote:

  If all you’re doing is just dumping tables from SQLServer to HDFS,
 have you looked at Sqoop?

  Otherwise, if you need to run this in Spark could you just use the
 existing JdbcRDD?
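
 If the existing JdbcRDD route is taken, a minimal Scala sketch would look
 roughly like this (the URL, query, key bounds and partition count are
 placeholders; JdbcRDD wants a query with two '?' placeholders that it uses to
 split a numeric key range across partitions):

 import java.sql.{DriverManager, ResultSet}
 import org.apache.spark.rdd.JdbcRDD

 val rows = new JdbcRDD(
   sc,
   () => DriverManager.getConnection("jdbc:sqlserver://host;databaseName=dbname", "user", "password"),
   "SELECT * FROM dbname.table1 WHERE id >= ? AND id <= ?",
   1L, 1000000L, 10,                   // key range and number of partitions
   (r: ResultSet) => r.getString(1))   // map each row to whatever should be stored
 rows.saveAsTextFile("hdfs:///dumps/table1")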


   From: Shushant Arora
 Date: Wednesday, July 1, 2015 at 10:19 AM
 To: user
 Subject: custom RDD in java

   Hi

  Is it possible to write a custom RDD in Java?

  The requirement is: I have a list of SQL Server tables that need to be
 dumped to HDFS.

  So I have a
 List<String> tables = {dbname.tablename, dbname.tablename2, ..};

  then
 JavaRDD<String> rdd = javasparkcontext.parallelize(tables);

  JavaRDD<Iterable<String>> tablecontent = rdd.map(new
 Function<String, Iterable<String>>() { /* fetch the table and return a populated
 iterable */ });

  tablecontent.saveAsTextFile(hdfsPath);


  In rdd.map(new Function<String, ...>()) I cannot keep the complete table
 content in memory, so I want to create my own RDD to handle it.

  Thanks
 Shushant












Re: required: org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.linalg.Vector]

2015-06-28 Thread Feynman Liang
You are trying to predict on a DStream[LabeledPoint] (data + labels) but
predictOn expects a DStream[Vector] (just the data without the labels).

Try doing:

val unlabeledStream = labeledStream.map { x => x.features }
model.predictOn(unlabeledStream).print()
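
If the true labels should be kept next to the predictions (e.g. to eyeball the
error), a small sketch of the alternative using predictOnValues, which takes a
DStream of (key, Vector) pairs:

// Keep the label as the key so each prediction is printed alongside it.
val labeledVectors = labeledStream.map(lp => (lp.label, lp.features))
model.predictOnValues(labeledVectors).print()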

On Sun, Jun 28, 2015 at 6:03 PM, Arthur Chan arthur.hk.c...@gmail.com
wrote:

 also my Spark is 1.4

 On Mon, Jun 29, 2015 at 9:02 AM, Arthur Chan arthur.hk.c...@gmail.com
 wrote:



 Hi,


 line 99:model.trainOn(labeledStream)

 line 100: model.predictOn(labeledStream).print()

 line 101:ssc.start()

 line 102: ssc.awaitTermination()


 Regards

 On Sun, Jun 28, 2015 at 10:53 PM, Ted Yu yuzhih...@gmail.com wrote:

 Can you show us your code around line 100 ?

 Which Spark release are you compiling against ?

 Cheers

 On Sun, Jun 28, 2015 at 5:49 AM, Arthur Chan arthur.hk.c...@gmail.com
 wrote:

 Hi,

 I am trying Spark with some sample programs,


 In my code, the following items are imported:

 import
 org.apache.spark.mllib.regression.{StreamingLinearRegressionWithSGD,
 LabeledPoint}

 import
 org.apache.spark.mllib.regression.{StreamingLinearRegressionWithSGD}

 import org.apache.spark.streaming.{Seconds, StreamingContext}

 import scala.util.Random

 I got following error:

 [error] StreamingModel.scala:100: type mismatch;

 [error]  found   :
 org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.regression.LabeledPoint]

 [error]  required:
 org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.linalg.Vector]

 [error] model.predictOn(labeledStream).print()

 [error] ^

 [error] one error found

 [error] (compile:compile) Compilation failed


 Any idea?


 Regards