Re: Design patterns for Spark implementation

2016-12-08 Thread Peter Figliozzi
Keep in mind that Spark is a parallel computing engine; it does not change
your data infrastructure or data architecture.  These days it's relatively
convenient to read data from a variety of sources (S3, HDFS, Cassandra,
...), and the same goes for the output side.

For example, for one of my use-cases, I store 10's of gigs of time-series
data in Cassandra.  It just so happens I like to analyze all of it at once
using Spark, which writes a very nice, small text file table of results I
look at using Python/Pandas, in a Jupyter notebook, on a laptop.
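
For concreteness, here is a minimal sketch of that pattern.  The keyspace,
table, column names, output path, and the aggregation itself are all made up,
and it assumes the spark-cassandra-connector is on the classpath:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("timeseries-summary").getOrCreate()

// Read the full time series from Cassandra (hypothetical keyspace/table)
val ts = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "sensors", "table" -> "readings"))
  .load()

// Boil it down to a small table of results and write it out as one CSV file
ts.groupBy("sensor_id").count()
  .coalesce(1)
  .write.option("header", "true")
  .csv("/home/peter/results/summary")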

If we didn't have Spark, I'd still be doing the input side (Cassandra) and
output side (small text file, ingestible by a laptop) the same way.  The
only difference would be, instead of importing and processing in Spark, my
fictional group of 5,000 assistants would each download a portion of the
data into their Excel spreadsheet, then have a big meeting to produce my
small text file.

So my view is the nature of your data and specific objectives determine
your infrastructure and architecture, not the presence or absence of Spark.





On Sat, Dec 3, 2016 at 10:59 AM, Vasu Gourabathina 
wrote:

> Hi,
>
> I know this is a broad question. If this is not the right forum,
> appreciate if you can point to other sites/areas that may be helpful.
>
> Before posing this question, I did use our friend Google, but sanitizing
> the query results from my need angle hasn't been easy.
>
> Who I am:
>- Have done data processing and analytics, but relatively new to Spark
> world
>
> What I am looking for:
>   - Architecture/Design of a ML system using Spark
>   - In particular, looking for best practices that can support/bridge both
> Engineering and Data Science teams
>
> Engineering:
>- Build a system that has typical engineering needs, data processing,
> scalability, reliability, availability, fault-tolerance etc.
>- System monitoring etc.
> Data Science:
>- Build a system for Data Science team to do data exploration activities
>- Develop models using supervised learning and tweak models
>
> Data:
>   - Batch and incremental updates - mostly structured or semi-structured
> (some data from transaction systems, weblogs, click stream etc.)
>   - Streaming, in the near term, but not to begin with
>
> Data Storage:
>   - Data is expected to grow on a daily basis...so, system should be able
> to support and handle big data
>   - May be, after further analysis, there might be a possibility/need to
> archive some of the data...it all depends on how the ML models were built
> and results were stored/used for future usage
>
> Data Analysis:
>   - Obvious data related aspects, such as data cleansing, data
> transformation, data partitioning etc
>   - May be run models on windows of data. For example: last 1-year,
> 2-years etc.
>
> ML models:
>   - Ability to store model versions and previous results
>   - Compare results of different variants of models
>
> Consumers:
>   - RESTful webservice clients to look at the results
>
> *So, the questions I have are:*
> 1) Are there architectural and design patterns that I can use based on
> industry best-practices. In particular:
>   - data ingestion
>   - data storage (for eg. go with HDFS or not)
>   - data partitioning, especially in Spark world
>   - running parallel ML models and combining results etc.
>   - consumption of final results by clients (for eg. by pushing
> results to Cassandra, NoSQL dbs etc.)
>
> Again, I know this is a broad question. Pointers to some best-practices
> in some of the areas, if not all, would be highly appreciated. Open to
> purchase any books that may have relevant information.
>
> Thanks much folks,
> Vasu.
>
>


Re: Parsing XML

2016-10-04 Thread Peter Figliozzi
It's pretty clear that df.col(xpath) is looking for a column named xpath in
your df, not executing an xpath over an XML document as you wish.  Try
constructing a UDF which applies your xpath query, and give that as the
second argument to withColumn.
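
A rough sketch of that approach, using the JDK's built-in XPath engine.  The
"xml" column name is an assumption here; it presumes the raw XML string for
each row is available as a column:

import org.apache.spark.sql.functions.{col, udf}
import javax.xml.xpath.XPathFactory
import org.xml.sax.InputSource
import java.io.StringReader

// Evaluate the XPath expression against the XML string in each row
val firstOption = udf { xml: String =>
  val xp = XPathFactory.newInstance().newXPath()
  xp.evaluate("//FulfillmentOption[1]/text()",
    new InputSource(new StringReader(xml)))
}

val withOption = df.withColumn("FulfillmentOption1", firstOption(col("xml")))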

On Tue, Oct 4, 2016 at 4:35 PM, Jean Georges Perrin  wrote:

> Spark 2.0.0
> XML parser 0.4.0
> Java
>
> Hi,
>
> I am trying to create a new column in my data frame, based on a value of a
> sub element. I have done that several time with JSON, but not very
> successful in XML.
>
> (I know a world with less format would be easier :) )
>
> Here is the code:
> df.withColumn("FulfillmentOption1", df.col("//FulfillmentOption[1]
> /text()"));
>
> And here is the error:
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot
> resolve column name "//FulfillmentOption[1]/text()" among (x, xx, xxx,
> , a, b, FulfillmentOption, c, d, e, f, g);
> at org.apache.spark.sql.Dataset$$anonfun$resolve$1.apply(
> Dataset.scala:220)
> at org.apache.spark.sql.Dataset$$anonfun$resolve$1.apply(
> Dataset.scala:220)
> ...
>
> The XPath is valid...
>
> Thanks!
>
> jg
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark ML Decision Trees Algorithm

2016-09-30 Thread Peter Figliozzi
It's a good question.  People have been publishing papers on decision trees
and various methods of constructing and pruning them for over 30 years.  I
think it's rather a question for a historian at this point.

On Fri, Sep 30, 2016 at 5:08 PM, janardhan shetty 
wrote:

> Read this explanation but wondering if this algorithm has the base from a
> research paper for detail understanding.
>
> On Fri, Sep 30, 2016 at 1:36 PM, Kevin Mellott 
> wrote:
>
>> The documentation details the algorithm being used at
>> http://spark.apache.org/docs/latest/mllib-decision-tree.html
>>
>> Thanks,
>> Kevin
>>
>> On Fri, Sep 30, 2016 at 1:14 AM, janardhan shetty > > wrote:
>>
>>> Hi,
>>>
>>> Any help here is appreciated ..
>>>
>>> On Wed, Sep 28, 2016 at 11:34 AM, janardhan shetty <
>>> janardhan...@gmail.com> wrote:
>>>
 Is there a reference to the research paper which is implemented in
 spark 2.0 ?

 On Wed, Sep 28, 2016 at 9:52 AM, janardhan shetty <
 janardhan...@gmail.com> wrote:

> Which algorithm is used under the covers while doing decision trees
> FOR SPARK ?
> for example: scikit-learn (python) uses an optimised version of the
> CART algorithm.
>


>>>
>>
>


Re: Treating NaN fields in Spark

2016-09-29 Thread Peter Figliozzi
"isnan" ends up using a case class, subclass of UnaryExpression, called
"IsNaN" which evaluates each row of the column like this:

   - *False* if the value is Null
   - Check the "Expression.Type" (apparently a Spark thing, not a Scala
   thing.. still learning here)
   - DoubleType:  cast to Double and retrieve .isNaN
   - FloatType: cast to Float and retrieve .isNaN
   - Casting done by value.asInstanceOf[T]

What's interesting is the "inputTypes" for this class are only DoubleType
and FloatType.  Unfortunately, I haven't figured out how the code would
handle a String.  Maybe someone could tell us how these Expressions work?

In any case, we're not getting *True* back unless the value, cast to a
Double, actually is Double.NaN.  A String cast to Double throws an error
(not Double.NaN), and the '-' character cast to Double gives 45 (!).
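
A quick way to see this from spark-shell (where spark.implicits._ is already
in scope):

import org.apache.spark.sql.functions.isnan

val doubles = Seq(Double.NaN, 1.0, 45.0).toDF("open")
doubles.filter(isnan($"open")).show()   // only the NaN row comes back

// A String column never yields NaN, so filtering on the literal value is
// the way to catch the rogue "-" rows:
val strings = Seq("-", "40.56").toDF("open")
strings.filter($"open" === "-").show()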

On Thu, Sep 29, 2016 at 7:45 AM, Michael Segel 
wrote:

> Hi,
>
> Just a few thoughts so take it for what its worth…
>
> Databases have static schemas and will reject a row’s column on insert.
>
> In your case… you have one data set where you have a column which is
> supposed to be a number but you have it as a string.
> You want to convert this to a double in your final data set.
>
>
> It looks like your problem is that your original data set that you
> ingested used a ‘-‘ (dash) to represent missing data, rather than a NULL
> value.
> In fact, looking at the rows… you seem to have a stock that didn’t trade
> for a given day. (All have Volume as 0. ) Why do you need this?  Wouldn’t
> you want to represent this as null or no row for a given date?
>
> The reason your ‘-‘ check failed when isnan() is that ‘-‘ actually could
> be represented as a number.
>
> If you replaced the ‘-‘ with a String that is wider than the width of a
> double … the isnan should flag the row.
>
> (I still need more coffee, so I could be wrong) ;-)
>
> HTH
>
> -Mike
>
> On Sep 28, 2016, at 5:56 AM, Mich Talebzadeh 
> wrote:
>
>
> This is an issue in most databases. Specifically if a field is NaN.. --> (
> *NaN*, standing for not a number, is a numeric data type value
> representing an undefined or unrepresentable value, especially in
> floating-point calculations)
>
> There is a method called isnan() in Spark that is supposed to handle this
> scenario . However, it does not return correct values! For example I
> defined column "Open" as String  (it should be Float) and it has the
> following 7 rogue entries out of 1272 rows in a csv
>
> df2.filter( $"OPen" === 
> "-").select((changeToDate("TradeDate").as("TradeDate")),
> 'Open, 'High, 'Low, 'Close, 'Volume).show
>
> +----------+----+----+---+-----+------+
> | TradeDate|Open|High|Low|Close|Volume|
> +----------+----+----+---+-----+------+
> |2011-12-23|   -|   -|  -|40.56|     0|
> |2011-04-21|   -|   -|  -|45.85|     0|
> |2010-12-30|   -|   -|  -|38.10|     0|
> |2010-12-23|   -|   -|  -|38.36|     0|
> |2008-04-30|   -|   -|  -|32.39|     0|
> |2008-04-29|   -|   -|  -|33.05|     0|
> |2008-04-28|   -|   -|  -|32.60|     0|
> +----------+----+----+---+-----+------+
>
> However, the following does not work!
>
>  df2.filter(isnan($"Open")).show
> +-----+------+---------+----+----+---+-----+------+
> |Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
> +-----+------+---------+----+----+---+-----+------+
> +-----+------+---------+----+----+---+-----+------+
>
> Any suggestions?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>


Re: Treating NaN fields in Spark

2016-09-28 Thread Peter Figliozzi
In Scala, x.isNaN returns true for Double.NaN, but false for any
character.  I guess the `isnan` function you are using works by ultimately
looking at x.isNaN.
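
A quick check in the Scala REPL illustrates the point:

scala> Double.NaN.isNaN
res0: Boolean = true

scala> '-'.toDouble    // a Char widens to its code point, not NaN
res1: Double = 45.0

scala> "-".toDouble    // a non-numeric String throws instead of producing NaN
java.lang.NumberFormatException: For input string: "-"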

On Wed, Sep 28, 2016 at 5:56 AM, Mich Talebzadeh 
wrote:

>
> This is an issue in most databases. Specifically if a field is NaN.. --> (
> *NaN*, standing for not a number, is a numeric data type value
> representing an undefined or unrepresentable value, especially in
> floating-point calculations)
>
> There is a method called isnan() in Spark that is supposed to handle this
> scenario . However, it does not return correct values! For example I
> defined column "Open" as String  (it should be Float) and it has the
> following 7 rogue entries out of 1272 rows in a csv
>
> df2.filter( $"OPen" === 
> "-").select((changeToDate("TradeDate").as("TradeDate")),
> 'Open, 'High, 'Low, 'Close, 'Volume).show
>
> +----------+----+----+---+-----+------+
> | TradeDate|Open|High|Low|Close|Volume|
> +----------+----+----+---+-----+------+
> |2011-12-23|   -|   -|  -|40.56|     0|
> |2011-04-21|   -|   -|  -|45.85|     0|
> |2010-12-30|   -|   -|  -|38.10|     0|
> |2010-12-23|   -|   -|  -|38.36|     0|
> |2008-04-30|   -|   -|  -|32.39|     0|
> |2008-04-29|   -|   -|  -|33.05|     0|
> |2008-04-28|   -|   -|  -|32.60|     0|
> +----------+----+----+---+-----+------+
>
> However, the following does not work!
>
>  df2.filter(isnan($"Open")).show
> +-----+------+---------+----+----+---+-----+------+
> |Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
> +-----+------+---------+----+----+---+-----+------+
> +-----+------+---------+----+----+---+-----+------+
>
> Any suggestions?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>


Re: read multiple files

2016-09-27 Thread Peter Figliozzi
If you're up for a fancy but excellent solution:

   - Store your data in Cassandra.
   - Use the expiring data feature (TTL) so data will automatically be
   removed a month later.
   - Now in your Spark process, just read from the database and you don't
   have to worry about the timestamp (see the sketch below).
   - You'll still have all your old files if you need to refer back to them.

Pete
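
A rough sketch of the idea.  The keyspace, table, and column names are
hypothetical, and the Spark side assumes a SparkSession called `spark` plus
the spark-cassandra-connector:

// CQL side: write each batch with a 30-day TTL so rows expire on their own
//   INSERT INTO metrics.batches (batch_id, ts, payload)
//   VALUES (?, ?, ?) USING TTL 2592000;

// Spark side: read whatever is currently live -- no timestamp bookkeeping
val live = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "metrics", "table" -> "batches"))
  .load()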

On Tue, Sep 27, 2016 at 2:52 AM, Divya Gehlot 
wrote:

> Hi,
> The input data files for my spark job generated at every five minutes file
> name follows epoch time convention  as below :
>
> InputFolder/batch-147495960
> InputFolder/batch-147495990
> InputFolder/batch-147496020
> InputFolder/batch-147496050
> InputFolder/batch-147496080
> InputFolder/batch-147496110
> InputFolder/batch-147496140
> InputFolder/batch-147496170
> InputFolder/batch-147496200
> InputFolder/batch-147496230
>
> As per requirement I need to read one month of data from current timestamp.
>
> Would really appreciate if anybody could help me .
>
> Thanks,
> Divya
>


median of groups

2016-09-26 Thread Peter Figliozzi
I'm trying to figure out a nice way to get the median of a DataFrame
column *once it is grouped*.

It's easy enough now to get the min, max, mean, and other things that are
part of spark.sql.functions:

df.groupBy("foo", "bar").agg(mean($"column1"))

And it's easy enough to get the median of a column before grouping, using
approxQuantile.
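
For reference, the ungrouped call is a one-liner (the last argument is the
relative error):

// Median of column1 over the whole DataFrame, via DataFrameStatFunctions
val Array(median) = df.stat.approxQuantile("column1", Array(0.5), 0.001)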

However approxQuantile is part of DataFrame.stat i.e. a
DataFrameStatFunctions.

Is there a way to use it inside the .agg?

Or do we need a user defined aggregation function?

Or some other way?
I've also posted a Stack Overflow version of this question.

Thanks,

Pete


Re: Writing Dataframe to CSV yields blank file called "_SUCCESS"

2016-09-26 Thread Peter Figliozzi
Thanks again Piotr.  It's good to know there are a number of options.  Once
again I'm glad I put all my workers on the same ethernet switch, as
unanticipated shuffling isn't so bad.
Sincerely,
Pete
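
For reference, a minimal sketch of the options Piotr describes below (the
output path and partition counts are arbitrary):

// Fewer output files: shrink the number of partitions without a full shuffle
df.coalesce(4).write.csv("/path/to/foo")

// Exactly one output file: collapse to a single partition first
// (repartition is an explicit shuffle, and that one partition must then fit
// on a single executor)
df.repartition(1).write.csv("/path/to/foo")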

On Mon, Sep 26, 2016 at 8:35 AM, Piotr Smoliński <
piotr.smolinski...@gmail.com> wrote:

> Best, you should write to HDFS or when you test the product with no HDFS
> available just create a shared
> filesystem (windows shares, nfs, etc.) where the data will be written.
>
> You'll still end up with many files, but this time there will be only one
> directory tree.
>
> You may reduce the number of files by:
> * combining partitions on the same executor with coalesce call
> * repartitioning the RDD (DataFrame or DataSet depending on the API you
> use)
>
> The latter one is useful when you write the data to a partitioned
> structure. Note that repartitioning
> is explicit shuffle.
>
> If you want to have only single file you need to repartition the whole RDD
> to single partition.
> Depending on the result data size it may be something that you want or do
> not want to do ;-)
>
> Regards,
> Piotr
>
>
>
> On Mon, Sep 26, 2016 at 2:30 PM, Peter Figliozzi  > wrote:
>
>> Thank you Piotr, that's what happened.  In fact, there are about 100
>> files on each worker node in a directory corresponding to the write.
>>
>> Any way to tone that down a bit (maybe 1 file per worker)?  Or, write a
>> single file somewhere?
>>
>>
>> On Mon, Sep 26, 2016 at 12:44 AM, Piotr Smoliński <
>> piotr.smolinski...@gmail.com> wrote:
>>
>>> Hi Peter,
>>>
>>> The blank file _SUCCESS indicates properly finished output operation.
>>>
>>> What is the topology of your application?
>>> I presume, you write to local filesystem and have more than one worker
>>> machine.
>>> In such case Spark will write the result files for each partition (in
>>> the worker which
>>> holds it) and complete operation writing the _SUCCESS in the driver node.
>>>
>>> Cheers,
>>> Piotr
>>>
>>>
>>> On Mon, Sep 26, 2016 at 4:56 AM, Peter Figliozzi <
>>> pete.figlio...@gmail.com> wrote:
>>>
>>>> Both
>>>>
>>>> df.write.csv("/path/to/foo")
>>>>
>>>> and
>>>>
>>>> df.write.format("com.databricks.spark.csv").save("/path/to/foo")
>>>>
>>>> results in a *blank* file called "_SUCCESS" under /path/to/foo.
>>>>
>>>> My df has stuff in it.. tried this with both my real df, and a quick df
>>>> constructed from literals.
>>>>
>>>> Why isn't it writing anything?
>>>>
>>>> Thanks,
>>>>
>>>> Pete
>>>>
>>>
>>>
>>
>


Re: Writing Dataframe to CSV yields blank file called "_SUCCESS"

2016-09-26 Thread Peter Figliozzi
Thank you Piotr, that's what happened.  In fact, there are about 100 files
on each worker node in a directory corresponding to the write.

Any way to tone that down a bit (maybe 1 file per worker)?  Or, write a
single file somewhere?


On Mon, Sep 26, 2016 at 12:44 AM, Piotr Smoliński <
piotr.smolinski...@gmail.com> wrote:

> Hi Peter,
>
> The blank file _SUCCESS indicates properly finished output operation.
>
> What is the topology of your application?
> I presume, you write to local filesystem and have more than one worker
> machine.
> In such case Spark will write the result files for each partition (in the
> worker which
> holds it) and complete operation writing the _SUCCESS in the driver node.
>
> Cheers,
> Piotr
>
>
> On Mon, Sep 26, 2016 at 4:56 AM, Peter Figliozzi  > wrote:
>
>> Both
>>
>> df.write.csv("/path/to/foo")
>>
>> and
>>
>> df.write.format("com.databricks.spark.csv").save("/path/to/foo")
>>
>> results in a *blank* file called "_SUCCESS" under /path/to/foo.
>>
>> My df has stuff in it.. tried this with both my real df, and a quick df
>> constructed from literals.
>>
>> Why isn't it writing anything?
>>
>> Thanks,
>>
>> Pete
>>
>
>


Writing Dataframe to CSV yields blank file called "_SUCCESS"

2016-09-25 Thread Peter Figliozzi
Both

df.write.csv("/path/to/foo")

and

df.write.format("com.databricks.spark.csv").save("/path/to/foo")

results in a *blank* file called "_SUCCESS" under /path/to/foo.

My df has stuff in it.. tried this with both my real df, and a quick df
constructed from literals.

Why isn't it writing anything?

Thanks,

Pete


Re: Is executor computing time affected by network latency?

2016-09-23 Thread Peter Figliozzi
See the reference on shuffles
<http://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/programming-guide.html#shuffle-operations>,
"Spark’s mechanism for re-distributing data so that it’s grouped
differently across partitions. This typically involves copying data across
executors and machines, making the shuffle a complex and costly operation."



On Thu, Sep 22, 2016 at 4:14 PM, Soumitra Johri <
soumitra.siddha...@gmail.com> wrote:

> If your job involves a shuffle then the compute for the entire batch will
> increase with network latency. What would be interesting is to see how much
> time each task/job/stage takes.
>
> On Thu, Sep 22, 2016 at 5:11 PM Peter Figliozzi 
> wrote:
>
>> It seems to me they must communicate for joins, sorts, grouping, and so
>> forth, where the original data partitioning needs to change.  You could
>> repeat your experiment for different code snippets.  I'll bet it depends on
>> what you do.
>>
>> On Thu, Sep 22, 2016 at 8:54 AM, gusiri  wrote:
>>
>>> Hi,
>>>
>>> When I increase the network latency among spark nodes,
>>>
>>> I see compute time (=executor computing time in Spark Web UI) also
>>> increases.
>>>
>>> In the graph attached, left = latency 1ms vs right = latency 500ms.
>>>
>>> Is there any communication between worker and driver/master even 'during'
>>> executor computing? or any idea on this result?
>>>
>>>
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/
>>> file/n27779/Screen_Shot_2016-09-21_at_5.png>
>>>
>>>
>>>
>>>
>>>
>>> Thank you very much in advance.
>>>
>>> //gusiri
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/Is-executor-computing-time-
>>> affected-by-network-latency-tp27779.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>


Re: Is executor computing time affected by network latency?

2016-09-22 Thread Peter Figliozzi
It seems to me they must communicate for joins, sorts, grouping, and so
forth, where the original data partitioning needs to change.  You could
repeat your experiment for different code snippets.  I'll bet it depends on
what you do.

On Thu, Sep 22, 2016 at 8:54 AM, gusiri  wrote:

> Hi,
>
> When I increase the network latency among spark nodes,
>
> I see compute time (=executor computing time in Spark Web UI) also
> increases.
>
> In the graph attached, left = latency 1ms vs right = latency 500ms.
>
> Is there any communication between worker and driver/master even 'during'
> executor computing? or any idea on this result?
>
>
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n27779/Screen_Shot_2016-09-21_at_5.png>
>
>
>
>
>
> Thank you very much in advance.
>
> //gusiri
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Is-executor-computing-time-
> affected-by-network-latency-tp27779.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Dataframe, Java: How to convert String to Vector ?

2016-09-21 Thread Peter Figliozzi
I'm sure there's another way to do it; I hope someone can show us.  I
couldn't figure out how to use `map` either.
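
One possibility (an untested sketch, not something from this thread) is to
supply an explicit encoder so that the Dataset map compiles.  The Kryo encoder
stores the parsed vectors as a binary column, so the UDF-plus-withColumn route
quoted below is usually more convenient for ML pipelines:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Encoders

// With an implicit Encoder[Vector] in scope, map no longer complains about
// a missing encoder (dataStr is the DataFrame from the question below)
implicit val vectorEncoder = Encoders.kryo[Vector]
val parsed = dataStr.map(row => Vectors.parse(row.getString(1)))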

On Wed, Sep 21, 2016 at 3:32 AM, 颜发才(Yan Facai)  wrote:

> Thanks, Peter.
> It works!
>
> Why udf is needed?
>
>
>
>
> On Wed, Sep 21, 2016 at 12:00 AM, Peter Figliozzi <
> pete.figlio...@gmail.com> wrote:
>
>> Hi Yan, I agree, it IS really confusing.  Here is the technique for
>> transforming a column.  It is very general because you can make "myConvert"
>> do whatever you want.
>>
>> import org.apache.spark.mllib.linalg.Vectors
>> val df = Seq((0, "[1,3,5]"), (1, "[2,4,6]")).toDF
>>
>> df.show()
>> // The columns were named "_1" and "_2"
>> // Very confusing, because it looks like a Scala wildcard when we refer
>> to it in code
>>
>> val myConvert = (x: String) => { Vectors.parse(x) }
>> val myConvertUDF = udf(myConvert)
>>
>> val newDf = df.withColumn("parsed", myConvertUDF(col("_2")))
>>
>> newDf.show()
>>
>> On Mon, Sep 19, 2016 at 3:29 AM, 颜发才(Yan Facai)  wrote:
>>
>>> Hi, all.
>>> I find that it's really confuse.
>>>
>>> I can use Vectors.parse to create a DataFrame contains Vector type.
>>>
>>> scala> val dataVec = Seq((0, Vectors.parse("[1,3,5]")), (1,
>>> Vectors.parse("[2,4,6]"))).toDF
>>> dataVec: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]
>>>
>>>
>>> But using map to convert String to Vector throws an error:
>>>
>>> scala> val dataStr = Seq((0, "[1,3,5]"), (1, "[2,4,6]")).toDF
>>> dataStr: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
>>>
>>> scala> dataStr.map(row => Vectors.parse(row.getString(1)))
>>> <console>:30: error: Unable to find encoder for type stored in a
>>> Dataset.  Primitive types (Int, String, etc) and Product types (case
>>> classes) are supported by importing spark.implicits._  Support for
>>> serializing other types will be added in future releases.
>>>   dataStr.map(row => Vectors.parse(row.getString(1)))
>>>
>>>
>>> Dose anyone can help me,
>>> thanks very much!
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Sep 6, 2016 at 9:58 PM, Peter Figliozzi <
>>> pete.figlio...@gmail.com> wrote:
>>>
>>>> Hi Yan, I think you'll have to map the features column to a new
>>>> numerical features column.
>>>>
>>>> Here's one way to do the individual transform:
>>>>
>>>> scala> val x = "[1, 2, 3, 4, 5]"
>>>> x: String = [1, 2, 3, 4, 5]
>>>>
>>>> scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "")
>>>> split(" ") map(_.toInt)
>>>> y: Array[Int] = Array(1, 2, 3, 4, 5)
>>>>
>>>> If you don't know about the Scala command line, just type "scala" in a
>>>> terminal window.  It's a good place to try things out.
>>>>
>>>> You can make a function out of this transformation and apply it to your
>>>> features column to make a new column.  Then add this with
>>>> Dataset.withColumn.
>>>>
>>>> See here
>>>> <http://stackoverflow.com/questions/35227568/applying-function-to-spark-dataframe-column>
>>>> on how to apply a function to a Column to make a new column.
>>>>
>>>> On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai) 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I have a csv file like:
>>>>> uid  mid  features   label
>>>>> 1235231[0, 1, 3, ...]True
>>>>>
>>>>> Both  "features" and "label" columns are used for GBTClassifier.
>>>>>
>>>>> However, when I read the file:
>>>>> Dataset<Row> samples = sparkSession.read().csv(file);
>>>>> The type of samples.select("features") is String.
>>>>>
>>>>> My question is:
>>>>> How to map samples.select("features") to Vector or any appropriate
>>>>> type,
>>>>> so I can use it to train like:
>>>>> GBTClassifier gbdt = new GBTClassifier()
>>>>> .setLabelCol("label")
>>>>> .setFeaturesCol("features")
>>>>> .setMaxIter(2)
>>>>> .setMaxDepth(7);
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Similar Items

2016-09-20 Thread Peter Figliozzi
Related question: is there anything that does scalable matrix
multiplication on Spark?  For example, we have that long list of vectors
and want to construct the similarity matrix:  v * T(v).  In R it would be: v
%*% t(v)
Thanks,
Pete
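
One route worth noting (a sketch only, not tested here) is spark.mllib's
distributed matrix types; `rows` below is assumed to be an RDD[IndexedRow]
built from the item vectors:

import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix

val mat = new IndexedRowMatrix(rows)

// v %*% t(v) as a distributed block-matrix multiplication
val blocks = mat.toBlockMatrix()
val gram = blocks.multiply(blocks.transpose)

// RowMatrix also has a dedicated (sampling-based) cosine similarity method,
// though it compares columns, so the matrix may need transposing first
val sims = mat.toRowMatrix().columnSimilarities(0.1)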



On Mon, Sep 19, 2016 at 3:49 PM, Kevin Mellott 
wrote:

> Hi all,
>
> I'm trying to write a Spark application that will detect similar items (in
> this case products) based on their descriptions. I've got an ML pipeline
> that transforms the product data to TF-IDF representation, using the
> following components.
>
>- *RegexTokenizer* - strips out non-word characters, results in a list
>of tokens
>- *StopWordsRemover* - removes common "stop words", such as "the",
>"and", etc.
>- *HashingTF* - assigns a numeric "hash" to each token and calculates
>the term frequency
>- *IDF* - computes the inverse document frequency
>
> After this pipeline evaluates, I'm left with a SparseVector that
> represents the inverse document frequency of tokens for each product. As a
> next step, I'd like to be able to compare each vector to one another, to
> detect similarities.
>
> Does anybody know of a straightforward way to do this in Spark? I tried
> creating a UDF (that used the Breeze linear algebra methods internally);
> however, that did not scale well.
>
> Thanks,
> Kevin
>


Re: Dataframe, Java: How to convert String to Vector ?

2016-09-20 Thread Peter Figliozzi
Hi Yan, I agree, it IS really confusing.  Here is the technique for
transforming a column.  It is very general because you can make "myConvert"
do whatever you want.

import org.apache.spark.mllib.linalg.Vectors
val df = Seq((0, "[1,3,5]"), (1, "[2,4,6]")).toDF

df.show()
// The columns were named "_1" and "_2"
// Very confusing, because it looks like a Scala wildcard when we refer to
it in code

val myConvert = (x: String) => { Vectors.parse(x) }
val myConvertUDF = udf(myConvert)

val newDf = df.withColumn("parsed", myConvertUDF(col("_2")))

newDf.show()

On Mon, Sep 19, 2016 at 3:29 AM, 颜发才(Yan Facai)  wrote:

> Hi, all.
> I find that it's really confuse.
>
> I can use Vectors.parse to create a DataFrame contains Vector type.
>
> scala> val dataVec = Seq((0, Vectors.parse("[1,3,5]")), (1,
> Vectors.parse("[2,4,6]"))).toDF
> dataVec: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]
>
>
> But using map to convert String to Vector throws an error:
>
> scala> val dataStr = Seq((0, "[1,3,5]"), (1, "[2,4,6]")).toDF
> dataStr: org.apache.spark.sql.DataFrame = [_1: int, _2: string]
>
> scala> dataStr.map(row => Vectors.parse(row.getString(1)))
> <console>:30: error: Unable to find encoder for type stored in a
> Dataset.  Primitive types (Int, String, etc) and Product types (case
> classes) are supported by importing spark.implicits._  Support for
> serializing other types will be added in future releases.
>   dataStr.map(row => Vectors.parse(row.getString(1)))
>
>
> Dose anyone can help me,
> thanks very much!
>
>
>
>
>
>
>
> On Tue, Sep 6, 2016 at 9:58 PM, Peter Figliozzi 
> wrote:
>
>> Hi Yan, I think you'll have to map the features column to a new numerical
>> features column.
>>
>> Here's one way to do the individual transform:
>>
>> scala> val x = "[1, 2, 3, 4, 5]"
>> x: String = [1, 2, 3, 4, 5]
>>
>> scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "")
>> split(" ") map(_.toInt)
>> y: Array[Int] = Array(1, 2, 3, 4, 5)
>>
>> If you don't know about the Scala command line, just type "scala" in a
>> terminal window.  It's a good place to try things out.
>>
>> You can make a function out of this transformation and apply it to your
>> features column to make a new column.  Then add this with
>> Dataset.withColumn.
>>
>> See here
>> <http://stackoverflow.com/questions/35227568/applying-function-to-spark-dataframe-column>
>> on how to apply a function to a Column to make a new column.
>>
>> On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai)  wrote:
>>
>>> Hi,
>>> I have a csv file like:
>>> uid  mid  features   label
>>> 1235231[0, 1, 3, ...]True
>>>
>>> Both  "features" and "label" columns are used for GBTClassifier.
>>>
>>> However, when I read the file:
>>> Dataset<Row> samples = sparkSession.read().csv(file);
>>> The type of samples.select("features") is String.
>>>
>>> My question is:
>>> How to map samples.select("features") to Vector or any appropriate type,
>>> so I can use it to train like:
>>> GBTClassifier gbdt = new GBTClassifier()
>>> .setLabelCol("label")
>>> .setFeaturesCol("features")
>>> .setMaxIter(2)
>>> .setMaxDepth(7);
>>>
>>> Thanks.
>>>
>>
>>
>


Re: distribute work (files)

2016-09-07 Thread Peter Figliozzi
It works!  Hmm, smells like some kind of Linux permissions issue. Checking
this, the owner & group are the same all around, and there is global read
permission as well.  So I have no clue why it would not work with an
sshfs-mounted volume.

Back to the OP's question: use Spark's CSV data source instead of calling
textFile like I originally suggested.  See this Stack Overflow post:
<http://stackoverflow.com/questions/37639956/how-to-import-multiple-csv-files-in-a-single-load>.
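
A minimal sketch of that data source with a wildcard path (Spark 2.x; the
header option and .csv extension are assumptions, and the path must be
reachable at the same location on every worker):

val df = spark.read
  .option("header", "true")
  .csv("file:///home/peter/datashare/*.csv")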


Good to know this is an option.  I use Cassandra for my data source and am
not running Hadoop (no reason to thus far).

Can anyone get this to work with an sshfs mounted share?

On Wed, Sep 7, 2016 at 8:48 PM, ayan guha  wrote:

> So, can you try to simulate the same without sshfs? ie, create a folder on
> /tmp/datashare and copy your files on all the machines and point
> sc.textFiles to that folder?
>
>
> On Thu, Sep 8, 2016 at 11:26 AM, Peter Figliozzi  > wrote:
>
>> All (three) of them.  It's kind of cool-- when I re-run collect() a different
>> executor will show up as first to encounter the error.
>>
>> On Wed, Sep 7, 2016 at 8:20 PM, ayan guha  wrote:
>>
>>> Hi
>>>
>>> Is it happening on all executors or one?
>>>
>>> On Thu, Sep 8, 2016 at 10:46 AM, Peter Figliozzi <
>>> pete.figlio...@gmail.com> wrote:
>>>
>>>>
>>>> Yes indeed (see below).  Just to reiterate, I am not running Hadoop.
>>>> The "curly" node name mentioned in the stacktrace is the name of one of the
>>>> worker nodes.  I've mounted the same directory "datashare" with two text
>>>> files to all worker nodes with sshfs.  The Spark documentation suggests
>>>> that this should work:
>>>>
>>>> *If using a path on the local filesystem, the file must also be
>>>> accessible at the same path on worker nodes. Either copy the file to all
>>>> workers or use a network-mounted shared file system.*
>>>>
>>>> I was hoping someone else could try this and see if it works.
>>>>
>>>> Here's what I did to generate the error:
>>>>
>>>> val data = sc.textFile("file:///home/peter/datashare/*.txt")
>>>> data.collect()
>>>>
>>>> It's working to some extent because if I put a bogus path in, I'll get
>>>> a different (correct) error (InvalidInputException: Input Pattern
>>>> file:/home/peter/ddatashare/*.txt matches 0 files).
>>>>
>>>> Here's the stack trace when I use a valid path:
>>>>
>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>> Task 1 in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in
>>>> stage 18.0 (TID 792, curly): java.io.FileNotFoundException: File
>>>> file:/home/peter/datashare/f1.txt does not exist
>>>> at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileSta
>>>> tus(RawLocalFileSystem.java:609)
>>>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInt
>>>> ernal(RawLocalFileSystem.java:822)
>>>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLoc
>>>> alFileSystem.java:599)
>>>> at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFi
>>>> leSystem.java:421)
>>>> at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputCheck
>>>> er.(ChecksumFileSystem.java:140)
>>>> at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSys
>>>> tem.java:341)
>>>> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>>>> at org.apache.hadoop.mapred.LineRecordReader.(LineRecordR
>>>> eader.java:109)
>>>> at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(Tex
>>>> tInputFormat.java:67)
>>>> at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:246)
>>>> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
>>>> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>>>> DD.scala:38)
>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>>> at org.apache.spark.scheduler.Task.ru

Re: distribute work (files)

2016-09-07 Thread Peter Figliozzi
All (three) of them.  It's kind of cool-- when I re-run collect() a different
executor will show up as first to encounter the error.

On Wed, Sep 7, 2016 at 8:20 PM, ayan guha  wrote:

> Hi
>
> Is it happening on all executors or one?
>
> On Thu, Sep 8, 2016 at 10:46 AM, Peter Figliozzi  > wrote:
>
>>
>> Yes indeed (see below).  Just to reiterate, I am not running Hadoop.  The
>> "curly" node name mentioned in the stacktrace is the name of one of the
>> worker nodes.  I've mounted the same directory "datashare" with two text
>> files to all worker nodes with sshfs.  The Spark documentation suggests
>> that this should work:
>>
>> *If using a path on the local filesystem, the file must also be
>> accessible at the same path on worker nodes. Either copy the file to all
>> workers or use a network-mounted shared file system.*
>>
>> I was hoping someone else could try this and see if it works.
>>
>> Here's what I did to generate the error:
>>
>> val data = sc.textFile("file:///home/peter/datashare/*.txt")
>> data.collect()
>>
>> It's working to some extent because if I put a bogus path in, I'll get a
>> different (correct) error (InvalidInputException: Input Pattern
>> file:/home/peter/ddatashare/*.txt matches 0 files).
>>
>> Here's the stack trace when I use a valid path:
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 1 in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in stage
>> 18.0 (TID 792, curly): java.io.FileNotFoundException: File
>> file:/home/peter/datashare/f1.txt does not exist
>> at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileSta
>> tus(RawLocalFileSystem.java:609)
>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInt
>> ernal(RawLocalFileSystem.java:822)
>> at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLoc
>> alFileSystem.java:599)
>> at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFi
>> leSystem.java:421)
>> at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputCheck
>> er.(ChecksumFileSystem.java:140)
>> at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSys
>> tem.java:341)
>> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>> at org.apache.hadoop.mapred.LineRecordReader.(LineRecordR
>> eader.java:109)
>> at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(Tex
>> tInputFormat.java:67)
>> at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:246)
>> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
>> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> On Wed, Sep 7, 2016 at 9:50 AM, Yong Zhang  wrote:
>>
>>> What error do you get? FileNotFoundException?
>>>
>>>
>>> Please paste the stacktrace here.
>>>
>>>
>>> Yong
>>>
>>>
>>> --
>>> *From:* Peter Figliozzi 
>>> *Sent:* Wednesday, September 7, 2016 10:18 AM
>>> *To:* ayan guha
>>> *Cc:* Lydia Ickler; user.spark
>>> *Subject:* Re: distribute work (files)
>>>
>>> That's failing for me.  Can someone please try this-- is this even
>>> supposed to work:
>>>
>>>- create a directory somewhere and add two text files to it
>>>- mount that directory on the Spark worker machines with sshfs
>>>- read the textfiles into one datas structure using a file URL with
>>>a wildcard
>>>
>>> Thanks,
>>>
>>> Pete
>>>
>>> On Tue, Sep 6, 2016 at 11:20 PM, ayan guha  wrote:
>>>
>>>> To access local file, try with file:// URI.
>>>>
>>>

Fwd: distribute work (files)

2016-09-07 Thread Peter Figliozzi
Yes indeed (see below).  Just to reiterate, I am not running Hadoop.  The
"curly" node name mentioned in the stacktrace is the name of one of the
worker nodes.  I've mounted the same directory "datashare" with two text
files to all worker nodes with sshfs.  The Spark documentation suggests
that this should work:

*If using a path on the local filesystem, the file must also be accessible
at the same path on worker nodes. Either copy the file to all workers or
use a network-mounted shared file system.*

I was hoping someone else could try this and see if it works.

Here's what I did to generate the error:

val data = sc.textFile("file:///home/peter/datashare/*.txt")
data.collect()

It's working to some extent because if I put a bogus path in, I'll get a
different (correct) error (InvalidInputException: Input Pattern
file:/home/peter/ddatashare/*.txt matches 0 files).

Here's the stack trace when I use a valid path:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in stage
18.0 (TID 792, curly): java.io.FileNotFoundException: File
file:/home/peter/datashare/f1.txt does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(
RawLocalFileSystem.java:609)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(
RawLocalFileSystem.java:822)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(
RawLocalFileSystem.java:599)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(
FilterFileSystem.java:421)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(
ChecksumFileSystem.java:140)
at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
at org.apache.hadoop.mapred.LineRecordReader.<init>(
LineRecordReader.java:109)
at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(
TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:246)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


On Wed, Sep 7, 2016 at 9:50 AM, Yong Zhang  wrote:

> What error do you get? FileNotFoundException?
>
>
> Please paste the stacktrace here.
>
>
> Yong
>
>
> --
> *From:* Peter Figliozzi 
> *Sent:* Wednesday, September 7, 2016 10:18 AM
> *To:* ayan guha
> *Cc:* Lydia Ickler; user.spark
> *Subject:* Re: distribute work (files)
>
> That's failing for me.  Can someone please try this-- is this even
> supposed to work:
>
>- create a directory somewhere and add two text files to it
>- mount that directory on the Spark worker machines with sshfs
>- read the textfiles into one datas structure using a file URL with a
>wildcard
>
> Thanks,
>
> Pete
>
> On Tue, Sep 6, 2016 at 11:20 PM, ayan guha  wrote:
>
>> To access local file, try with file:// URI.
>>
>> On Wed, Sep 7, 2016 at 8:52 AM, Peter Figliozzi > > wrote:
>>
>>> This is a great question.  Basically you don't have to worry about the
>>> details-- just give a wildcard in your call to textFile.  See the 
>>> Programming
>>> Guide <http://spark.apache.org/docs/latest/programming-guide.html> section
>>> entitled "External Datasets".  The Spark framework will distribute your
>>> data across the workers.  Note that:
>>>
>>> *If using a path on the local filesystem, the file must also be
>>>> accessible at the same path on worker nodes. Either copy the file to all
>>>> workers or use a network-mounted shared file system.*
>>>
>>>
>>> In your case this would mean the directory of files.
>>>
>>> Curiously, I cannot get this to work when I mount a directory with sshfs
>>> on all of my worker nodes.  It says "file not found" even though the file
>>> clearly exists in the specified path on all workers.   Anyone care to try
>>> and comment on this?
&

Re: distribute work (files)

2016-09-07 Thread Peter Figliozzi
That's failing for me.  Can someone please try this-- is this even supposed
to work:

   - create a directory somewhere and add two text files to it
   - mount that directory on the Spark worker machines with sshfs
   - read the text files into one data structure using a file URL with a
   wildcard

Thanks,

Pete

On Tue, Sep 6, 2016 at 11:20 PM, ayan guha  wrote:

> To access local file, try with file:// URI.
>
> On Wed, Sep 7, 2016 at 8:52 AM, Peter Figliozzi 
> wrote:
>
>> This is a great question.  Basically you don't have to worry about the
>> details-- just give a wildcard in your call to textFile.  See the Programming
>> Guide <http://spark.apache.org/docs/latest/programming-guide.html> section
>> entitled "External Datasets".  The Spark framework will distribute your
>> data across the workers.  Note that:
>>
>> *If using a path on the local filesystem, the file must also be
>>> accessible at the same path on worker nodes. Either copy the file to all
>>> workers or use a network-mounted shared file system.*
>>
>>
>> In your case this would mean the directory of files.
>>
>> Curiously, I cannot get this to work when I mount a directory with sshfs
>> on all of my worker nodes.  It says "file not found" even though the file
>> clearly exists in the specified path on all workers.   Anyone care to try
>> and comment on this?
>>
>> Thanks,
>>
>> Pete
>>
>> On Tue, Sep 6, 2016 at 9:51 AM, Lydia Ickler 
>> wrote:
>>
>>> Hi,
>>>
>>> maybe this is a stupid question:
>>>
>>> I have a list of files. Each file I want to take as an input for a
>>> ML-algorithm. All files are independent from another.
>>> My question now is how do I distribute the work so that each worker
>>> takes a block of files and just runs the algorithm on them one by one.
>>> I hope somebody can point me in the right direction! :)
>>>
>>> Best regards,
>>> Lydia
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Dataframe, Java: How to convert String to Vector ?

2016-09-07 Thread Peter Figliozzi
Here's a decent GitHub book: Mastering Apache Spark
<https://www.gitbook.com/book/jaceklaskowski/mastering-apache-spark/details>.

I'm new at Scala too.  I found it very helpful to study the Scala language
without Spark.  The documentation found here
<http://docs.scala-lang.org/index.html> is excellent.

Pete

On Wed, Sep 7, 2016 at 1:39 AM, 颜发才(Yan Facai)  wrote:

> Hi Peter,
> I'm familiar with Pandas / Numpy in python,  while spark / scala is
> totally new for me.
> Pandas provides a detailed document, like how to slice data, parse file,
> use apply and filter function.
>
> Do spark have some more detailed document?
>
>
>
> On Tue, Sep 6, 2016 at 9:58 PM, Peter Figliozzi 
> wrote:
>
>> Hi Yan, I think you'll have to map the features column to a new numerical
>> features column.
>>
>> Here's one way to do the individual transform:
>>
>> scala> val x = "[1, 2, 3, 4, 5]"
>> x: String = [1, 2, 3, 4, 5]
>>
>> scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "")
>> split(" ") map(_.toInt)
>> y: Array[Int] = Array(1, 2, 3, 4, 5)
>>
>> If you don't know about the Scala command line, just type "scala" in a
>> terminal window.  It's a good place to try things out.
>>
>> You can make a function out of this transformation and apply it to your
>> features column to make a new column.  Then add this with
>> Dataset.withColumn.
>>
>> See here
>> <http://stackoverflow.com/questions/35227568/applying-function-to-spark-dataframe-column>
>> on how to apply a function to a Column to make a new column.
>>
>> On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai)  wrote:
>>
>>> Hi,
>>> I have a csv file like:
>>> uid  mid  features   label
>>> 1235231[0, 1, 3, ...]True
>>>
>>> Both  "features" and "label" columns are used for GBTClassifier.
>>>
>>> However, when I read the file:
>>> Dataset<Row> samples = sparkSession.read().csv(file);
>>> The type of samples.select("features") is String.
>>>
>>> My question is:
>>> How to map samples.select("features") to Vector or any appropriate type,
>>> so I can use it to train like:
>>> GBTClassifier gbdt = new GBTClassifier()
>>> .setLabelCol("label")
>>> .setFeaturesCol("features")
>>> .setMaxIter(2)
>>> .setMaxDepth(7);
>>>
>>> Thanks.
>>>
>>
>>
>


Re: distribute work (files)

2016-09-06 Thread Peter Figliozzi
This is a great question.  Basically you don't have to worry about the
details-- just give a wildcard in your call to textFile.  See the Programming
Guide  section
entitled "External Datasets".  The Spark framework will distribute your
data across the workers.  Note that:

*If using a path on the local filesystem, the file must also be accessible
> at the same path on worker nodes. Either copy the file to all workers or
> use a network-mounted shared file system.*


In your case this would mean the directory of files.

Curiously, I cannot get this to work when I mount a directory with sshfs on
all of my worker nodes.  It says "file not found" even though the file
clearly exists in the specified path on all workers.   Anyone care to try
and comment on this?

Thanks,

Pete

On Tue, Sep 6, 2016 at 9:51 AM, Lydia Ickler 
wrote:

> Hi,
>
> maybe this is a stupid question:
>
> I have a list of files. Each file I want to take as an input for a
> ML-algorithm. All files are independent from another.
> My question now is how do I distribute the work so that each worker takes
> a block of files and just runs the algorithm on them one by one.
> I hope somebody can point me in the right direction! :)
>
> Best regards,
> Lydia
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Dataframe, Java: How to convert String to Vector ?

2016-09-06 Thread Peter Figliozzi
Hi Yan, I think you'll have to map the features column to a new numerical
features column.

Here's one way to do the individual transform:

scala> val x = "[1, 2, 3, 4, 5]"
x: String = [1, 2, 3, 4, 5]

scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "") split("
") map(_.toInt)
y: Array[Int] = Array(1, 2, 3, 4, 5)

If you don't know about the Scala command line, just type "scala" in a
terminal window.  It's a good place to try things out.

You can make a function out of this transformation and apply it to your
features column to make a new column.  Then add this with
Dataset.withColumn.

See here

on how to apply a function to a Column to make a new column.

On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai)  wrote:

> Hi,
> I have a csv file like:
> uid  mid  features   label
> 1235231[0, 1, 3, ...]True
>
> Both  "features" and "label" columns are used for GBTClassifier.
>
> However, when I read the file:
> Dataset<Row> samples = sparkSession.read().csv(file);
> The type of samples.select("features") is String.
>
> My question is:
> How to map samples.select("features") to Vector or any appropriate type,
> so I can use it to train like:
> GBTClassifier gbdt = new GBTClassifier()
> .setLabelCol("label")
> .setFeaturesCol("features")
> .setMaxIter(2)
> .setMaxDepth(7);
>
> Thanks.
>


Re: What do I lose if I run Spark without using HDFS or Zookeeper?

2016-08-25 Thread Peter Figliozzi
Spark is a parallel computing framework.  There are many ways to give it
data to chomp down on.  If you don't know why you would need HDFS, then you
don't need it.  Same goes for Zookeeper.  Spark works fine without either.

Much of what we read online comes from people with specialized problems and
requirements (such as maintaining a 'highly available' service, or
accessing an existing HDFS).  It can be extremely confusing to the dude who
just needs to do some parallel computing.

Pete

On Wed, Aug 24, 2016 at 3:54 PM, kant kodali  wrote:

> What do I lose if I run Spark without using HDFS or Zookeeper? Which of
> them is almost a must in practice?
>