Re: Spark In Memory Shuffle / 5403

2018-10-19 Thread Peter Rudenko
Hi Peter, we're using a part of Crail - its core library, called DiSNI (
https://github.com/zrlio/disni/). We couldn't reproduce the results from that
blog post. In any case, Crail is a more platform-oriented approach (it comes
with its own file system), while SparkRDMA is a pluggable approach - it's just a
plugin that you can enable/disable for a particular workload, and you can use
any Hadoop vendor, etc.
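For illustration, here is a minimal sketch of how such a shuffle plugin is typically switched on purely through Spark configuration. The jar path is illustrative and the shuffle-manager class name is an assumption taken from the SparkRDMA README, so verify both against the release you deploy:

import org.apache.spark.SparkConf

// Hypothetical sketch: enable the RDMA shuffle manager for one particular workload.
// Removing these settings falls back to the default sort-based shuffle.
val conf = new SparkConf()
  .set("spark.driver.extraClassPath", "/opt/spark-rdma/spark-rdma.jar")   // illustrative path
  .set("spark.executor.extraClassPath", "/opt/spark-rdma/spark-rdma.jar") // illustrative path
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.rdma.RdmaShuffleManager")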

The best optimization for shuffle between local JVMs could be something like
HDFS short-circuit local reads (
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ShortCircuitLocalReads.html):
use a Unix domain socket for local communication, or read the needed part
directly from the other JVM's shuffle file. But yes, that's not available in
Spark out of the box.
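For reference, a hedged sketch of what enabling the HDFS feature referenced above usually looks like from a Spark job. These are standard HDFS client properties (not Spark shuffle settings), the socket path is illustrative, and the datanodes must be configured to match:

// Sketch only: short-circuit reads apply to HDFS reads, not to Spark's shuffle files.
sc.hadoopConfiguration.set("dfs.client.read.shortcircuit", "true")
sc.hadoopConfiguration.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket") // illustrative path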

Thanks,
Peter Rudenko

On Fri, Oct 19, 2018 at 16:54 Peter Liu wrote:

> Hi Peter,
>
> thank you for the reply and detailed information! Would this be something
> comparable with Crail? (
> http://crail.incubator.apache.org/blog/2017/11/rdmashuffle.html)
> I was more looking for something simple/quick to make the shuffle between
> the local JVMs faster (like the idea of using a local RAM disk) for my
> simple use case.
>
> Of course, a general and thorough implementation should cover the shuffle
> between the nodes as the major focus. Hmm, it looks like there is no such
> implementation within Spark itself yet.
>
> very much appreciated!
>
> Peter
>
> On Fri, Oct 19, 2018 at 9:38 AM Peter Rudenko 
> wrote:
>
>> Hey Peter, in the SparkRDMA shuffle plugin (
>> https://github.com/Mellanox/SparkRDMA) we're using mmap of the shuffle file
>> to do Remote Direct Memory Access. If the shuffle data is bigger than RAM,
>> Mellanox NICs support On-Demand Paging, where the OS invalidates translations
>> which are no longer valid due to either non-present pages or mapping
>> changes. So if you have an RDMA-capable NIC (or you can try it on the Azure cloud:
>>
>> https://azure.microsoft.com/en-us/blog/introducing-the-new-hb-and-hc-azure-vm-sizes-for-hpc/
>>  ), give it a try. For network-intensive apps you should get better
>> performance.
>>
>> Thanks,
>> Peter Rudenko
>>
>> On Thu, Oct 18, 2018 at 18:07 Peter Liu wrote:
>>
>>> I would be very interested in the initial question here:
>>>
>>> is there a production-level, configurable implementation of memory-only shuffle
>>> (similar to the MEMORY_ONLY and MEMORY_AND_DISK storage levels) as mentioned
>>> in this ticket, https://github.com/apache/spark/pull/5403 ?
>>>
>>> It would be a quite practical and useful option/feature. Not sure what
>>> the status of this ticket's implementation is?
>>>
>>> Thanks!
>>>
>>> Peter
>>>
>>> On Thu, Oct 18, 2018 at 6:51 AM ☼ R Nair 
>>> wrote:
>>>
>>>> Thanks..great info. Will try and let all know.
>>>>
>>>> Best
>>>>
>>>> On Thu, Oct 18, 2018, 3:12 AM onmstester onmstester <
>>>> onmstes...@zoho.com> wrote:
>>>>
>>>>> create the ramdisk:
>>>>> mount tmpfs /mnt/spark -t tmpfs -o size=2G
>>>>>
>>>>> then point spark.local.dir to the ramdisk, which depends on your
>>>>> deployment strategy, for me it was through SparkConf object before passing
>>>>> it to SparkContext:
>>>>> conf.set("spark.local.dir","/mnt/spark")
>>>>>
>>>>> To validate that spark is actually using your ramdisk (by default it
>>>>> uses /tmp), ls the ramdisk after running some jobs and you should see 
>>>>> spark
>>>>> directories (with date on directory name) on your ramdisk
>>>>>
>>>>>
>>>>> Sent using Zoho Mail <https://www.zoho.com/mail/>
>>>>>
>>>>>
>>>>>  On Wed, 17 Oct 2018 18:57:14 +0330 *☼ R Nair
>>>>> >* wrote 
>>>>>
>>>>> What are the steps to configure this? Thanks
>>>>>
>>>>> On Wed, Oct 17, 2018, 9:39 AM onmstester onmstester <
>>>>> onmstes...@zoho.com.invalid> wrote:
>>>>>
>>>>>
>>>>> Hi,
>>>>> I failed to config spark for in-memory shuffle so currently just
>>>>> using linux memory mapped directory (tmpfs) as working directory of spark,
>>>>> so everything is fast
>>>>>
>>>>> Sent using Zoho Mail <https://www.zoho.com/mail/>
>>>>>
>>>>>
>>>>>
>>>>>


Re: Spark In Memory Shuffle / 5403

2018-10-19 Thread Peter Rudenko
Hey Peter, in the SparkRDMA shuffle plugin (
https://github.com/Mellanox/SparkRDMA) we're using mmap of the shuffle file to
do Remote Direct Memory Access. If the shuffle data is bigger than RAM,
Mellanox NICs support On-Demand Paging, where the OS invalidates translations
which are no longer valid due to either non-present pages or mapping
changes. So if you have an RDMA-capable NIC (or you can try it on the Azure cloud:
https://azure.microsoft.com/en-us/blog/introducing-the-new-hb-and-hc-azure-vm-sizes-for-hpc/
 ), give it a try. For network-intensive apps you should get better
performance.

Thanks,
Peter Rudenko

On Thu, Oct 18, 2018 at 18:07 Peter Liu wrote:

> I would be very interested in the initial question here:
>
> is there a production-level, configurable implementation of memory-only shuffle
> (similar to the MEMORY_ONLY and MEMORY_AND_DISK storage levels) as mentioned
> in this ticket, https://github.com/apache/spark/pull/5403 ?
>
> It would be a quite practical and useful option/feature. Not sure what the
> status of this ticket's implementation is?
>
> Thanks!
>
> Peter
>
> On Thu, Oct 18, 2018 at 6:51 AM ☼ R Nair 
> wrote:
>
>> Thanks..great info. Will try and let all know.
>>
>> Best
>>
>> On Thu, Oct 18, 2018, 3:12 AM onmstester onmstester 
>> wrote:
>>
>>> create the ramdisk:
>>> mount tmpfs /mnt/spark -t tmpfs -o size=2G
>>>
>>> then point spark.local.dir to the ramdisk, which depends on your
>>> deployment strategy, for me it was through SparkConf object before passing
>>> it to SparkContext:
>>> conf.set("spark.local.dir","/mnt/spark")
>>>
>>> To validate that spark is actually using your ramdisk (by default it
>>> uses /tmp), ls the ramdisk after running some jobs and you should see spark
>>> directories (with date on directory name) on your ramdisk
>>>
>>>
>>> Sent using Zoho Mail <https://www.zoho.com/mail/>
>>>
>>>
>>>  On Wed, 17 Oct 2018 18:57:14 +0330 *☼ R Nair
>>> >* wrote 
>>>
>>> What are the steps to configure this? Thanks
>>>
>>> On Wed, Oct 17, 2018, 9:39 AM onmstester onmstester <
>>> onmstes...@zoho.com.invalid> wrote:
>>>
>>>
>>> Hi,
>>> I failed to config spark for in-memory shuffle so currently just
>>> using linux memory mapped directory (tmpfs) as working directory of spark,
>>> so everything is fast
>>>
>>> Sent using Zoho Mail <https://www.zoho.com/mail/>
>>>
>>>
>>>
>>>


Re: [Yarn] Spark AMs dead lock

2016-04-06 Thread Peter Rudenko
It doesn't matter - just an example. Imagine a YARN cluster with 100GB of
RAM where I simultaneously submit a lot of jobs in a loop.


Thanks,
Peter Rudenko

On 4/6/16 7:22 PM, Ted Yu wrote:

Which hadoop release are you using ?

bq. yarn cluster with 2GB RAM

I assume 2GB is per node. Isn't this too low for your use case ?

Cheers

On Wed, Apr 6, 2016 at 9:19 AM, Peter Rudenko wrote:


Hi, I have a situation: say I have a YARN cluster with 2GB RAM. I'm
submitting 2 Spark jobs with "--driver-memory 1GB --num-executors 2
--executor-memory 1GB". So I see 2 Spark AMs running, but they are
unable to allocate worker containers and start the actual job, and
they hang for a while. Is it possible to set some sort of
timeout for acquiring executors, and otherwise kill the application?

Thanks,
Peter Rudenko







[Yarn] Spark AMs dead lock

2016-04-06 Thread Peter Rudenko
Hi, I have a situation: say I have a YARN cluster with 2GB RAM. I'm
submitting 2 Spark jobs with "--driver-memory 1GB --num-executors 2
--executor-memory 1GB". So I see 2 Spark AMs running, but they are unable
to allocate worker containers and start the actual job, and they
hang for a while. Is it possible to set some sort of timeout for
acquiring executors, and otherwise kill the application?


Thanks,
Peter Rudenko




Re: spark.ml : eval model outside sparkContext

2016-03-16 Thread Peter Rudenko
Hi Emmanuel, I'm looking for a similar solution. For now I've found only:
https://github.com/truecar/mleap


Thanks,
Peter Rudenko

On 3/16/16 12:47 AM, Emmanuel wrote:

Hello,

In MLLib with Spark 1.4, I was able to eval a model by loading it and 
using `predict` on a vector of features.

I would train on Spark but use my model on my workflow.


In `spark.ml` it seems like the only way to eval is to use `transform` 
which only takes a DataFrame.
To build a DataFrame i need a sparkContext or SQLContext, so it 
doesn't seem to be possible to eval outside of Spark.



*Is there either a way to build a DataFrame without a sparkContext, or 
predict with a vector or list of features without a DataFrame?*

Thanks




Re: [Yarn] Executor cores isolation

2015-11-10 Thread Peter Rudenko
As I've tried cgroups, it seems the isolation is done by percentage, not by
core count. E.g. I've set the min share to 256 - I still see all 8 cores,
but I could only load about 20% of each core.


Thanks,
Peter Rudenko
On 2015-11-10 15:52, Saisai Shao wrote:
From my understanding, it depends on whether you enabled CGroup 
isolation or not in Yarn. By default it is not, which means you could 
allocate one core but bump a lot of thread in your task to occupy the 
CPU resource, this is just a logic limitation. For Yarn CPU isolation 
you may refer to this post 
(http://hortonworks.com/blog/apache-hadoop-yarn-in-hdp-2-2-isolation-of-cpu-resources-in-your-hadoop-yarn-clusters/). 



Thanks
Jerry

On Tue, Nov 10, 2015 at 9:33 PM, Peter Rudenko wrote:


Hi, I have a question: how does core isolation work for Spark
on YARN? E.g. I have a machine with 8 cores, but launched a worker
with --executor-cores 1, and then do something like:

rdd.foreachPartition(_ => /* for all visible cores: burn a core in a new thread */)

Will it see 1 core or all 8 cores?

    Thanks,
Peter Rudenko






[Yarn] Executor cores isolation

2015-11-10 Thread Peter Rudenko
Hi, I have a question: how does core isolation work for Spark on
YARN? E.g. I have a machine with 8 cores, but launched a worker with
--executor-cores 1, and then do something like:


rdd.foreachPartition(_ => /* for all visible cores: burn a core in a new thread */)

Will it see 1 core or all 8 cores?
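For what it's worth, a minimal sketch of the experiment described above, assuming rdd is any RDD with at least one partition. availableProcessors() reports what the JVM sees; how much CPU the busy loops actually get depends on whether CGroup isolation is enabled (see the reply above):

rdd.foreachPartition { _ =>
  val visibleCores = Runtime.getRuntime.availableProcessors()
  println(s"JVM sees $visibleCores cores")
  val deadline = System.currentTimeMillis() + 10000L  // burn CPU for ~10 seconds
  val threads = (1 to visibleCores).map { _ =>
    new Thread(new Runnable {
      def run(): Unit = {
        var x = 0L
        while (System.currentTimeMillis() < deadline) { x += 1 }  // busy loop on one core
      }
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join())
}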

Thanks,
Peter Rudenko



[Yarn] How to set user in ContainerLaunchContext?

2015-11-02 Thread Peter Rudenko
Hi, I have an ApplicationMaster which accepts requests and launches a
container on which it runs spark-submit --master yarn. In the request I
have a field "username" - the user I want to launch the job as. How can
I set the user that the command will run as on the container? Currently they
are all running as the yarn user, even though the AM is running as the root user.


Here's my code:


private def setupTokens(username: String): ByteBuffer = {
  // Build delegation tokens for a proxy user impersonating the requested username
  val credentials = UserGroupInformation.createProxyUser(username,
    UserGroupInformation.getCurrentUser).getCredentials
  val dob = new DataOutputBuffer()
  credentials.writeTokenStorageToStream(dob)
  ByteBuffer.wrap(dob.getData(), 0, dob.getLength()).duplicate()
}

val cCLC = Records.newRecord(classOf[ContainerLaunchContext])

cCLC.setCommands(List("spark-submit --master yarn ..."))

cCLC.setTokens(setupTokens(user))

Thanks, Peter Rudenko


input file from tar.gz

2015-09-29 Thread Peter Rudenko
Hi, I have a huge tar.gz file on DFS. The archive contains several files,
but I want to use only one of them as input. Is it possible to somehow filter
within a tar.gz, something like this:

sc.textFile("hdfs:///data/huge.tar.gz#input.txt")

Thanks,
Peter Rudenko





Re: Input size increasing every iteration of gradient boosted trees [1.4]

2015-09-03 Thread Peter Rudenko
Confirmed, I'm having the same issue (1.4.1 mllib package). For a smaller
dataset the accuracy degraded as well. I haven't tested yet on 1.5 with the ml
package implementation.


val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.setNumIterations(30)
boostingStrategy.setLearningRate(1.0)
boostingStrategy.treeStrategy.setMaxDepth(3)
boostingStrategy.treeStrategy.setMaxBins(128)
boostingStrategy.treeStrategy.setSubsamplingRate(1.0)
boostingStrategy.treeStrategy.setMinInstancesPerNode(1)
boostingStrategy.treeStrategy.setUseNodeIdCache(true)
boostingStrategy.treeStrategy.setCategoricalFeaturesInfo(
  mapAsJavaMap(categoricalFeatures).asInstanceOf[java.util.Map[java.lang.Integer, java.lang.Integer]])
val model = GradientBoostedTrees.train(instances, boostingStrategy)


Thanks,
Peter Rudenko

On 2015-08-14 00:33, Sean Owen wrote:


Not that I have any answer at this point, but I was discussing this
exact same problem with Johannes today. An input size of ~20K records
was growing each iteration by ~15M records. I could not see why on a
first look.

@jkbradley I know it's not much info but does that ring any bells? I
think Johannes even has an instance of this up and running for
examination.

On Thu, Aug 13, 2015 at 10:04 PM, Matt Forbes
 wrote:

I am training a boosted trees model on a couple million input samples (with
around 300 features) and am noticing that the input size of each stage is
increasing each iteration. For each new tree, the first step seems to be
building the decision tree metadata, which does a .count() on the input
data, so this is the step I've been using to track the input size changing.
Here is what I'm seeing:

count at DecisionTreeMetadata.scala:111
1. Input Size / Records: 726.1 MB / 1295620
2. Input Size / Records: 106.9 GB / 64780816
3. Input Size / Records: 160.3 GB / 97171224
4. Input Size / Records: 214.8 GB / 129680959
5. Input Size / Records: 268.5 GB / 162533424

Input Size / Records: 1912.6 GB / 1382017686


This step goes from taking less than 10s up to 5 minutes by the 15th or so
iteration. I'm not quite sure what could be causing this. I am passing a
memory-only cached RDD[LabeledPoint] to GradientBoostedTrees.train

Does anybody have some insight? Is this a bug or could it be an error on my
part?





Re: StringIndexer + VectorAssembler equivalent to HashingTF?

2015-08-07 Thread Peter Rudenko

No, here's an example:

COL1  COL2
a one
b two
a two
c three


StringIndexer.setInputCol(COL1).setOutputCol(SI1) ->

(0 -> a, 1 -> b, 2 -> c)
SI1
0
1
0
2

StringIndexer.setInputCol(COL2).setOutputCol(SI2) ->
(0 -> one, 1 -> two, 2 -> three)
SI2
0
1
1
2

VectorAssembler.setInputCols(SI1, SI2).setOutputCol(features) ->
features
[0, 0]
[1, 1]
[0, 1]
[2, 2]


HashingTF.setNumFeatures(2).setInputCol(COL1).setOutputCol(HT1)

bucket1 bucket2
a,a,b   c

HT1
3 //Hash collision
3
3
1
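To make the comparison concrete, here is a hedged sketch of the StringIndexer + VectorAssembler side of the example in the spark.ml API (assuming Spark 1.4+ and an existing sqlContext; column names match the example above):

import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

// The four rows from the example above
val df = sqlContext.createDataFrame(Seq(
  ("a", "one"), ("b", "two"), ("a", "two"), ("c", "three")
)).toDF("COL1", "COL2")

// Index each string column separately
val si1 = new StringIndexer().setInputCol("COL1").setOutputCol("SI1").fit(df)
val si2 = new StringIndexer().setInputCol("COL2").setOutputCol("SI2").fit(df)
val indexed = si2.transform(si1.transform(df))

// Assemble the two index columns into a single feature vector
val assembled = new VectorAssembler()
  .setInputCols(Array("SI1", "SI2"))
  .setOutputCol("features")
  .transform(indexed)

assembled.select("COL1", "COL2", "features").show()

Note that StringIndexer assigns indices by label frequency, so the exact index values may differ from the hand-worked mapping above.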

Thanks,
Peter Rudenko
On 2015-08-07 09:55, praveen S wrote:


Is StringIndexer + VectorAssembler equivalent to HashingTF while 
converting the document for analysis?








Re: Delete NA in a dataframe

2015-08-04 Thread Peter Rudenko

Hi Clark,
the problem is that in this dataset null values are represented by an "NA"
marker. Spark-csv doesn't have a configurable null-value marker (I made
a PR for it some time ago:
https://github.com/databricks/spark-csv/pull/76).


So one option for you is to do post-filtering, something like this:

val rv = allyears2k.filter("COLUMN != 'NA'")

Thanks,
Peter Rudenko
On 2015-08-04 15:03, clark djilo kuissu wrote:

Hello,

I am trying to manage NA values in this dataset. I import my dataset with the
com.databricks.spark.csv package.


When I do this: allyears2k.na.drop() I have no result.

Can you help me please ?

Regards,

--- The dataset -

dataset: https://s3.amazonaws.com/h2o-airlines-unpacked/allyears2k.csv

---   The code -

// Prepare environment
import sys.process._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._


val allyears2k = 
sqlContext.read.format("com.databricks.spark.csv").option("header", 
"true").load("/home/clark/allyears2k.csv")

allyears2k.registerTempTable("allyears2k")

val rv = allyears2k.na.drop()





Re: what is metadata in StructField ?

2015-07-15 Thread Peter Rudenko

Hi Mathieu,
metadata is very useful if you need to save some data about a column
(e.g. count of null values, cardinality, domain, min/max/std, etc.).
It's currently used in the ml package in attributes:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/attribute/attributes.scala


Take a look at how I'm using metadata to get summary statistics from H2O:
https://github.com/h2oai/sparkling-water/pull/17/files
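For illustration, a small hedged sketch of attaching and reading back column metadata; the keys and values here are made up for the example:

import org.apache.spark.sql.types.{DoubleType, Metadata, MetadataBuilder, StructField, StructType}

// Attach some per-column statistics as metadata
val priceMeta: Metadata = new MetadataBuilder()
  .putLong("num_nulls", 42L)
  .putDouble("max", 199.99)
  .putString("domain", "price")
  .build()

val schema = StructType(Seq(
  StructField("price", DoubleType, nullable = true, metadata = priceMeta)
))

// Later, anything that sees the schema can read the statistics back
val numNulls = schema("price").metadata.getLong("num_nulls")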


Let me know if you'll have questions.

Thanks,
Peter Rudenko

On 2015-07-15 12:48, matd wrote:

I see in StructField that we can provide metadata.

What is it meant for ?  How is it used by Spark later on ?
Are there any rules on what we can/cannot do with it ?

I'm building some DataFrame processing, and I need to maintain a set of
(meta)data along with the DF. I was wondering if I can use
StructField.metadata for this use, or if I should build my own structure.

Mathieu






Re: How to restrict disk space for spark caches on yarn?

2015-07-13 Thread Peter Rudenko
Hi Andrew, here's what I found. Maybe it will be relevant for people with
the same issue:


1) There are 3 types of local resources in YARN (public, private,
application). More about it here:
http://hortonworks.com/blog/management-of-application-dependencies-in-yarn/


2) Spark cache is of application type of resource.

3) Currently it's not possible to specify a quota for application
resources (https://issues.apache.org/jira/browse/YARN-882).


4) The only thing you can do is specify these 2 settings:
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage
- The maximum percentage of disk space utilization allowed after which a
disk is marked as bad. Values can range from 0.0 to 100.0. If the value
is greater than or equal to 100, the nodemanager will check for a full
disk. This applies to yarn.nodemanager.local-dirs and
yarn.nodemanager.log-dirs.


yarn.nodemanager.disk-health-checker.min-free-space-per-disk-mb - The
minimum space that must be available on a disk for it to be used. This
applies to yarn.nodemanager.local-dirs and yarn.nodemanager.log-dirs.


5) YARN's cache cleanup doesn't clean app resources:
https://github.com/apache/hadoop/blob/8d58512d6e6d9fe93784a9de2af0056bcc316d96/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java#L511


As I understand it, application resources are cleaned up when the Spark application
terminates correctly (using sc.stop()). But in my case, when it filled all the
disk space it got stuck and couldn't stop correctly. After I restarted
YARN, I don't know how to easily trigger the cache cleanup except manually on
all the nodes.


Thanks,
Peter Rudenko

On 2015-07-10 20:07, Andrew Or wrote:

Hi Peter,

AFAIK Spark assumes infinite disk space, so there isn't really a way 
to limit how much space it uses. Unfortunately I'm not aware of a 
simpler workaround than to simply provision your cluster with more 
disk space. By the way, are you sure that it's disk space that 
exceeded the limit, but not the number of inodes? If it's the latter, 
maybe you could control the ulimit of the container.


To answer your other question: if it can't persist to disk then yes it 
will fail. It will only recompute from the data source if for some 
reason someone evicted our blocks from memory, but that shouldn't 
happen in your case since you're using MEMORY_AND_DISK_SER.


-Andrew


2015-07-10 3:51 GMT-07:00 Peter Rudenko:


Hi, I have a Spark ML workflow. It uses some persist calls. When
I launch it with a 1 TB dataset it takes down the whole cluster, because
it fills all disk space at /yarn/nm/usercache/root/appcache:
http://i.imgur.com/qvRUrOp.png

I found a YARN setting:
yarn.nodemanager.localizer.cache.target-size-mb - Target size
of localizer cache in MB, per nodemanager. It is a target
retention size that only includes resources with PUBLIC and
PRIVATE visibility and excludes resources with APPLICATION visibility.

But it excludes resources with APPLICATION visibility, and the Spark
cache, as I understand it, is of APPLICATION type.

Is it possible to restrict disk space for a Spark application?
Will Spark fail if it isn't able to persist to disk
(StorageLevel.MEMORY_AND_DISK_SER), or will it recompute from the data
source?

Thanks,
Peter Rudenko









How to restrict disk space for spark caches on yarn?

2015-07-10 Thread Peter Rudenko
Hi, I have a Spark ML workflow. It uses some persist calls. When I
launch it with a 1 TB dataset it takes down the whole cluster, because it
fills all disk space at /yarn/nm/usercache/root/appcache:
http://i.imgur.com/qvRUrOp.png


I found a YARN setting:
yarn.nodemanager.localizer.cache.target-size-mb - Target size of
localizer cache in MB, per nodemanager. It is a target retention size
that only includes resources with PUBLIC and PRIVATE visibility and
excludes resources with APPLICATION visibility.


But it excludes resources with APPLICATION visibility, and the Spark cache,
as I understand it, is of APPLICATION type.


Is it possible to restrict disk space for a Spark application? Will
Spark fail if it isn't able to persist to disk
(StorageLevel.MEMORY_AND_DISK_SER), or will it recompute from the data source?


Thanks,
Peter Rudenko






Re: MLLib- Probabilities with LogisticRegression

2015-06-30 Thread Peter Rudenko

Hi Klaus, you can use the new ml API with DataFrames:

val model = new LogisticRegression()
  .setFeaturesCol("features")
  .setProbabilityCol("probability")
  .setPredictionCol("prediction")
  .fit(data)
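A hedged usage sketch, assuming test is a DataFrame with the same "features" vector column the model was trained on; the probability column then carries the per-class probabilities, similar to sklearn's predict_proba:

model.transform(test)
  .select("features", "probability", "prediction")
  .show()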


Thanks,
Peter Rudenko

On 2015-06-30 14:00, Klaus Schaefers wrote:

Hello,

is there a way to get the during the predict() phase also the class 
probabilities like I would get in sklearn?



Cheers,

Klaus

--

--

Klaus Schaefers
Senior Optimization Manager

Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln

Tel.:  +49 (0) 221 / 56939 -784
Fax:  +49 (0) 221 / 56 939 - 599
E-Mail: klaus.schaef...@ligatus.com <mailto:klaus.schaef...@ligatus.com>
Web: www.ligatus.de <http://www.ligatus.de/>

HRB Köln 56003
Geschäftsführung:
Dipl.-Kaufmann Lars Hasselbach, Dipl.-Kaufmann Klaus Ludemann, 
Dipl.-Wirtschaftsingenieur Arne Wolter




Re: Using Spark on Azure Blob Storage

2015-06-25 Thread Peter Rudenko
Hi Daniel, yes it's supported; however, you need to add hadoop-azure.jar to
the classpath of the Spark shell
(http://search.maven.org/#search%7Cga%7C1%7Chadoop-azure - it's
available only for hadoop-2.7.0). Try to find it on your node and run:
export CLASSPATH=$CLASSPATH:hadoop-azure.jar && spark-shell


Thanks,
Peter Rudenko

On 2015-06-25 20:37, Daniel Haviv wrote:

Hi,
I'm trying to use spark over Azure's HDInsight but the spark-shell 
fails when starting:

java.io.IOException: No FileSystem for scheme: wasb
at 
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)


Is Azure's blob storage supported ?

Thanks,
Daniel






Re: Parallel parameter tuning: distributed execution of MLlib algorithms

2015-06-17 Thread Peter Rudenko

Hi, here's how to get a parallel grid-search pipeline:

package org.apache.spark.ml.pipeline

import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql._

class ParralelGridSearchPipeline extends Pipeline {

  override def fit(dataset: DataFrame, paramMaps: Array[ParamMap]): Seq[PipelineModel] = {
    paramMaps.par.map(fit(dataset, _)).toVector
  }

}
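A hedged usage sketch against the same Spark 1.3-era API the class above targets (assembler, lr and trainingDF are assumed to exist; the FAIR scheduler setting is explained in the notes that follow):

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.pipeline.ParralelGridSearchPipeline

// conf.set("spark.scheduler.mode", "FAIR")  // see note 2 below

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1, 1.0))
  .build()

val pipeline = new ParralelGridSearchPipeline().setStages(Array(assembler, lr))
val models: Seq[PipelineModel] = pipeline.fit(trainingDF, paramGrid)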

For this you need:

1) Make sure you have a lot of RAM, since for each parameter you need to cache
the label points in LR (I did it a bit differently - first run sequentially for the
first param to cache the instances, and after that run in parallel. You can check the
prototype here:
https://issues.apache.org/jira/browse/SPARK-5844?focusedCommentId=14323253&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14323253)

2) Set spark.scheduler.mode="FAIR", otherwise tasks submitted within the same
context execute in FIFO mode - so no parallelism.

3) You would probably also need to configure a pool to use the FAIR scheduler
(http://spark.apache.org/docs/latest/job-scheduling.html#configuring-pool-properties).

What I'm currently looking at is having a fork/join pipeline. I have 2
separate branches in my DAG pipeline - one to process numeric columns and one to
process categorical columns - and afterwards everything is merged together with
VectorAssembler. But I want these 2 branches to be processed in parallel. I'm also
looking to define a bunch of cross-validators that use techniques other than grid
search (random-search cross-validator, Bayesian optimization CV, etc.).

Thanks,
Peter Rudenko

On 2015-06-18 01:58, Xiangrui Meng wrote:

On Fri, May 22, 2015 at 6:15 AM, Hugo Ferreira  wrote:

Hi,

I am currently experimenting with linear regression (SGD) (Spark + MLlib,
ver. 1.2). At this point in time I need to fine-tune the hyper-parameters. I
do this (for now) by an exhaustive grid search of the step size and the
number of iterations. Currently I am on a dual core that acts as a master
(local mode for now but will be adding spark worker later). In order to
maximize throughput I need to execute each execution of the linear
regression algorithm in parallel.


How big is your dataset? If it is small or medium-sized, you might get better
performance by broadcasting the entire dataset and use a single machine solver
on each workers.


According to the documentation it seems like parallel jobs may be scheduled
if they are executed in separate threads [1]. So this brings me to my first
question: does this mean I am CPU bound by the Spark master? In other words
the maximum number of jobs = maximum number of threads of the OS?


We use the driver to collect model updates. Increasing the number of
parallel jobs
also increasing the driver load for both communication and computation. I don't
think you need to worry much about the max number of threads, which is usually
much larger than the number of parallel jobs we can actually run.


I searched the mailing list but did not find anything regarding MLlib
itself. I even peaked into the new MLlib API that uses pipelines and has
support for parameter tuning. However, it looks like each job (instance of
the learning algorithm) is executed in sequence. Can anyone confirm this?
This brings me to my 2ndo question: is their any example that shows how one
can execute MLlib algorithms as parallel jobs?


The new API is not optimized for performance yet. There is an example
here for k-means:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L393


Finally, is their any general technique I can use to execute an algorithm in
a distributed manner using Spark? More specifically I would like to have
several MLlib algorithms run in parallel. Can anyone show me an example of
sorts to do this?

TIA.
Hugo F.







[1] https://spark.apache.org/docs/1.2.0/job-scheduling.html







Re: Embedding your own transformer in Spark.ml Pipleline

2015-06-04 Thread Peter Rudenko
Hi Brandon, they are available, but private to the ml package. They are now
public in 1.4. For 1.3.1 you can define your transformer in the
org.apache.spark.ml package - then you could use these traits.


Thanks,
Peter Rudenko

On 2015-06-04 20:28, Brandon Plaster wrote:
Is "HasInputCol" and "HasOutputCol" available in 1.3.1? I'm getting 
the following message when I'm trying to implement a Transformer and 
importing org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}:


error: object shared is not a member of package org.apache.spark.ml.param


and

error: trait HasInputCol in package param cannot be accessed in 
package org.apache.spark.ml.param



On Tue, Jun 2, 2015 at 1:51 PM, Peter Rudenko wrote:


Hi Dimple,
take a look to existing transformers:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
(*it's for spark-1.4)

The idea is just to implement a class that extends Transformer
with HasInputCol with HasOutputCol (if your transformer is a 1:1 column
transformer) and has a

def transform(dataset: DataFrame): DataFrame

method.

Thanks,
Peter

On 2015-06-02 20:19, dimple wrote:

Hi,
I would like to embed my own transformer in the Spark.ml Pipleline but do
not see an example of it. Can someone share an example of which
classes/interfaces I need to extend/implement in order to do so. Thanks.

Dimple











Re: Embedding your own transformer in Spark.ml Pipleline

2015-06-02 Thread Peter Rudenko
I'm afraid there's no such class for 1.2.1. This API was added to 1.3.0 
AFAIK.


On 2015-06-02 21:40, Dimp Bhat wrote:

Thanks Peter. Can you share the Tokenizer.java class for Spark 1.2.1.

Dimple

On Tue, Jun 2, 2015 at 10:51 AM, Peter Rudenko wrote:


Hi Dimple,
take a look to existing transformers:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
(*it's for spark-1.4)

The idea is just to implement a class that extends Transformer
with HasInputCol with HasOutputCol (if your transformer is a 1:1 column
transformer) and has a

def transform(dataset: DataFrame): DataFrame

method.

Thanks,
Peter
On 2015-06-02 20:19, dimple wrote:

Hi,
I would like to embed my own transformer in the Spark.ml Pipleline but do
not see an example of it. Can someone share an example of which
classes/interfaces I need to extend/implement in order to do so. Thanks.

Dimple











Re: Embedding your own transformer in Spark.ml Pipleline

2015-06-02 Thread Peter Rudenko

Hi Dimple,
take a look to existing transformers:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
(*it's for spark-1.4)

The idea is just to implement a class that extends Transformer
with HasInputCol with HasOutputCol (if your transformer is a 1:1 column
transformer) and has a


def transform(dataset: DataFrame): DataFrame

method.
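To make that concrete, here is a rough, hedged skeleton of such a transformer. It is written against the newer public version of this API (Spark 2.x-style, where uid, copy and transformSchema are public and required); the exact required overrides differ slightly between 1.3/1.4 and later releases, so treat it as a sketch rather than a drop-in class:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class UpperCaser(override val uid: String) extends Transformer with HasInputCol with HasOutputCol {

  def this() = this(Identifiable.randomUID("upperCaser"))

  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)

  // 1:1 column transform: derive the output column from the input column
  override def transform(dataset: Dataset[_]): DataFrame = {
    val toUpper = udf { s: String => if (s == null) null else s.toUpperCase }
    dataset.withColumn($(outputCol), toUpper(dataset($(inputCol))))
  }

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields :+ StructField($(outputCol), StringType, nullable = true))

  override def copy(extra: ParamMap): UpperCaser = defaultCopy(extra)
}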

Thanks,
Peter
On 2015-06-02 20:19, dimple wrote:

Hi,
I would like to embed my own transformer in the Spark.ml Pipleline but do
not see an example of it. Can someone share an example of which
classes/interfaces I need to extend/implement in order to do so. Thanks.

Dimple








Re: Dataframe random permutation?

2015-06-01 Thread Peter Rudenko

Hi Cesar,
try to do:

hc.createDataFrame(df.rdd.coalesce(NUM_PARTITIONS, shuffle = true), df.schema)

It's a bit inefficient, but it should shuffle the whole DataFrame.

Thanks,
Peter Rudenko
On 2015-06-01 22:49, Cesar Flores wrote:


I would like to know what will be the best approach to randomly 
permute a Data Frame. I have tried:


df.sample(false,1.0,x).show(100)

where x is the seed. However, it gives the same result no matter the 
value of x (it only gives different values when the fraction is 
smaller than 1.0) . I have tried also:


hc.createDataFrame(df.rdd.repartition(100),df.schema)

which appears to be a random permutation. Can some one confirm me that 
the last line is in fact a random permutation, or point me out to a 
better approach?



Thanks
--
Cesar Flores




Re: [SQL][Dataframe] Change data source after saveAsParquetFile

2015-05-08 Thread Peter Rudenko

Hm, thanks.
Do you know what this setting means:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L1178 
?


Thanks,
Peter Rudenko

On 2015-05-08 17:48, ayan guha wrote:


From S3. As the dependency of df will be on s3. And because rdds are 
not replicated.


On 8 May 2015 23:02, "Peter Rudenko" <mailto:petro.rude...@gmail.com>> wrote:


Hi, I have a question:

val data = sc.textFile("s3:///")
val df = data.toDF
df.saveAsParquetFile("hdfs://")
df.someAction(...)

If some workers die during someAction, would the recomputation
download the files from S3 or from the HDFS parquet?

Thanks,
Peter Rudenko






[SQL][Dataframe] Change data source after saveAsParquetFile

2015-05-08 Thread Peter Rudenko

Hi, I have a question:

val data = sc.textFile("s3:///")
val df = data.toDF
df.saveAsParquetFile("hdfs://")
df.someAction(...)


If some workers die during someAction, would the recomputation
download the files from S3 or from the HDFS parquet?


Thanks,
Peter Rudenko



[Ml][Dataframe] Ml pipeline & dataframe repartitioning

2015-04-24 Thread Peter Rudenko
 exceeded 15/04/24 18:04:55 
ERROR Executor: Exception in task 1.0 in stage 52.0 (TID 2237) 
java.lang.OutOfMemoryError: GC overhead limit exceeded at 
scala.collection.mutable.HashMap.createNewEntry(HashMap.scala:131) at 
scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142) 
at 
scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142) 
at scala.collection.mutable.HashTable$class.init(HashTable.scala:105) at 
scala.collection.mutable.HashMap.init(HashMap.scala:39) at 
scala.collection.mutable.HashMap.readObject(HashMap.scala:142) at 
sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) 
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at 
java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
15/04/24 18:04:55 INFO ActorSystemImpl: starting new LARS thread 
15/04/24 18:04:55 ERROR ActorSystemImpl: Uncaught fatal error from 
thread [sparkDriver-scheduler-1] shutting down ActorSystem [sparkDriver] 
java.lang.OutOfMemoryError: GC overhead limit exceeded at 
akka.dispatch.AbstractNodeQueue.(AbstractNodeQueue.java:22) at 
akka.actor.LightArrayRevolverScheduler$TaskQueue.(Scheduler.scala:443) 
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:409) 
at 
akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) 
at java.lang.Thread.run(Thread.java:745) |


It’s done with 4GB RAM on a 2GB file in a local context with 4 threads
(the label and vector columns serialized to parquet are about 500 MB).
I’ve tried to increase the default parallelism, but my transformations are
linear: take a column and produce another column. What’s the best
practice for handling partitions in dataframes with a lot of columns?
Should I repartition manually after adding columns? What’s better and
faster: applying 30 transformers, one per numeric column, or combining
these columns into 1 vector column and applying 1 transformer?


Thanks,
Peter Rudenko



Re: How to merge two dataframes with same schema

2015-04-22 Thread Peter Rudenko

Just use the unionAll method:

df1.show()
name  id
a     1
b     2

df2.show()
name  id
c     3
d     4

df1.unionAll(df2).show()
name  id
a     1
b     2
c     3
d     4

Thanks,
Peter Rudenko

On 2015-04-22 16:38, bipin wrote:

I have looked into sqlContext documentation but there is nothing on how to
merge two data-frames. How can I do this ?

Thanks
Bipin






Reading files from http server

2015-04-13 Thread Peter Rudenko
Hi, I want to play with the Criteo 1 TB dataset. The files are located on Azure
storage. Here's a command to download them:
curl -O http://azuremlsampleexperiments.blob.core.windows.net/criteo/day_{`seq -s ',' 0 23`}.gz
Is there any way to read files over the HTTP protocol with Spark without
downloading them first to HDFS? Something like this:
sc.textFile("http://azuremlsampleexperiments.blob.core.windows.net/criteo/day_{0-23}.gz"),
so it will have 24 partitions.


Thanks,
Peter Rudenko



Re: From DataFrame to LabeledPoint

2015-04-02 Thread Peter Rudenko

Hi, try the following code:

val labeledPoints: RDD[LabeledPoint] = features.rdd.zip(labels.rdd).map {
  case (Row(feature1: Double, feature2: Double /*, ... */), Row(label: Double)) =>
    LabeledPoint(label, Vectors.dense(feature1, feature2 /*, ... */))
}


Thanks,
Peter Rudenko

On 2015-04-02 17:17, drarse wrote:


Hello!,

I have had a question for days now. I am working with DataFrames and with
Spark SQL I imported a jsonFile:

/val df = sqlContext.jsonFile("file.json")/

In this json I have the label and de features. I selected it:

/
val features = df.select ("feature1","feature2","feature3",...);

val labels = df.select ("cassification")/

But now I don't know how to create a LabeledPoint for RandomForest. I tried some
solutions without success. Can you help me?

Thanks for all!







Re: Spark ML Pipeline inaccessible types

2015-03-25 Thread Peter Rudenko

Hi Martin, here are 2 possibilities to overcome this:

1) Put your logic into the org.apache.spark package in your project - then
everything would be accessible.

2) Dirty trick:

object SparkVector extends HashingTF {
  val VectorUDT: DataType = outputDataType
}

then you can do something like this:

StructField("vectorTypeColumn", SparkVector.VectorUDT, false)

Thanks,
Peter Rudenko

On 2015-03-25 13:14, zapletal-mar...@email.cz wrote:


Sean,

thanks for your response. I am familiar with /NoSuchMethodException/ 
in general, but I think it is not the case this time. The code 
actually attempts to get parameter by name using /val m = 
this.getClass.getMethodName(paramName)./


This may be a bug, but it is only a side effect caused by the real 
problem I am facing. My issue is that VectorUDT is not accessible by 
user code and therefore it is not possible to use custom ML pipeline 
with the existing Predictors (see the last two paragraphs in my first 
email).


Best Regards,
Martin

-- Original message --
From: Sean Owen
To: zapletal-mar...@email.cz
Date: 25. 3. 2015 11:05:54
Subject: Re: Spark ML Pipeline inaccessible types


NoSuchMethodError in general means that your runtime and compile-time
environments are different. I think you need to first make sure you
don't have mismatching versions of Spark.

On Wed, Mar 25, 2015 at 11:00 AM,  wrote:
> Hi,
>
> I have started implementing a machine learning pipeline using
Spark 1.3.0
> and the new pipelining API and DataFrames. I got to a point
where I have my
> training data set prepared using a sequence of Transformers, but
I am
> struggling to actually train a model and use it for predictions.
>
> I am getting a java.lang.NoSuchMethodException:
>
org.apache.spark.ml.regression.LinearRegression.myFeaturesColumnName()
> exception thrown at checkInputColumn method in Params trait when
using a
> Predictor (LinearRegression in my case, but that should not
matter). This
> looks like a bug - the exception is thrown when executing
getParam(colName)
> when the require(actualDataType.equals(datatype), ...)
requirement is not
> met so the expected requirement failed exception is not thrown
and is hidden
> by the unexpected NoSuchMethodException instead. I can raise a
bug if this
> really is an issue and I am not using something incorrectly.
>
> The problem I am facing however is that the Predictor expects
features to
> have VectorUDT type as defined in Predictor class (protected def
> featuresDataType: DataType = new VectorUDT). But since this type is
> private[spark] my Transformer can not prepare features with this
type which
> then correctly results in the exception above when I use a
different type.
>
> Is there a way to define a custom Pipeline that would be able to
use the
> existing Predictors without having to bypass the access modifiers or
> reimplement something or is the pipelining API not yet expected
to be used
> in this way?
>
> Thanks,
> Martin
>
>





Re: ML Pipeline question about caching

2015-03-17 Thread Peter Rudenko

Hi Cesar,
I had a similar issue. Yes, for now it’s better to do A, B, C outside the
cross-validator. Take a look at my comment
<https://issues.apache.org/jira/browse/SPARK-4766?focusedCommentId=14320038&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14320038>
and this JIRA <https://issues.apache.org/jira/browse/SPARK-5844>. The
problem is that transformers could also have hyperparameters in the
future (like a word2vec transformer). Then the cross-validator would need to
find the best parameters for both the transformer and the estimator.
It will blow up the number of combinations (number of transformer parameters
× number of estimator parameters × number of folds).
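For illustration, a hedged sketch of that advice, assuming transformerA/B/C, an lr estimator, an evaluator and a paramGrid already exist: run the parameter-free stages once, cache the result, and cross-validate only the estimator:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.tuning.CrossValidator

// Apply the parameter-free feature stages (A, B, C) once and cache the result
val prepared = new Pipeline()
  .setStages(Array(transformerA, transformerB, transformerC))
  .fit(trainingDF)
  .transform(trainingDF)
  .cache()

// Cross-validate only the estimator (D) on the prepared data
val cv = new CrossValidator()
  .setEstimator(lr)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setNumFolds(3)
val cvModel = cv.fit(prepared)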


Thanks,
Peter Rudenko

On 2015-03-18 00:26, Cesar Flores wrote:



Hello all:

I am using the ML Pipeline, which I consider very powerful. I have the 
next use case:


  * I have three transformers, which I will call A,B,C, that basically
extract features from text files, with no parameters.
  * I have a final stage D, which is the logistic regression estimator.
  * I am creating a pipeline with the sequence A,B,C,D.
  * Finally, I am using this pipeline as estimator parameter of the
CrossValidator class.

I have some concerns about how data persistence inside the cross
validator works. For example, if only D has multiple parameters to
tune using the cross validator, my concern is that the transformation
A->B->C is being performed multiple times. Is that the case, or is
Spark smart enough to realize that it is possible to persist the
output of C? Would it be better to leave A, B, and C outside the
cross validator pipeline?


Thanks a lot
--
Cesar Flores




Re: Workflow layer for Spark

2015-03-13 Thread Peter Rudenko
Take a look at the new Spark ML API
<http://spark.apache.org/docs/latest/ml-guide.html> with Pipeline
functionality, and also at spark-dataflow
<https://github.com/cloudera/spark-dataflow> - a Google Cloud Dataflow API
implementation on top of Spark.


Thanks,
Peter Rudenko
On 2015-03-13 17:46, Karthikeyan Muthukumar wrote:


Hi,
We are building a machine learning platform based on ML-Lib in Spark. 
We would be using Scala for the development.
We need a thin workflow layer where we can easily configure the 
different actions to be done, configuration for the actions (like 
load-data, clean-data, split-data etc), and the order in which the 
actions are to be executed. We will implement the actions themselves 
in Scala.
Is there any open source library that could be used as the thin
workflow layer here?

Thanks & Regards
MK




Re: Is there any Sparse Matrix implementation in Spark/MLib?

2015-02-27 Thread Peter Rudenko
Yes, it's called CoordinateMatrix
(http://spark.apache.org/docs/latest/mllib-data-types.html#coordinatematrix).
You need to fill it with elements of type MatrixEntry (Long, Long, Double).
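A small hedged sketch of building one, assuming an existing SparkContext sc:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Build a sparse matrix from (row, col, value) entries; unlisted cells are implicitly zero
val entries = sc.parallelize(Seq(
  MatrixEntry(0, 0, 1.0),
  MatrixEntry(1, 2, 5.0),
  MatrixEntry(2, 1, -3.0)
))
val sparse = new CoordinateMatrix(entries)
println(s"${sparse.numRows()} x ${sparse.numCols()}")  // 3 x 3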



Thanks,
Peter Rudenko
On 2015-02-27 14:01, shahab wrote:

Hi,

I just wonder if there is any Sparse Matrix implementation available 
 in Spark, so it can be used in spark application?


best,
/Shahab




Re: ML Transformer

2015-02-19 Thread Peter Rudenko

Hi Cesar,
these methods will stay private until the new ml API stabilizes (approx.
in Spark 1.4). My solution for the same issue was to create an
org.apache.spark.ml package in my project and extend/implement
everything there.


Thanks,
Peter Rudenko


On 2015-02-18 22:17, Cesar Flores wrote:


I am working right now with the ML pipeline, which I really like.
However, in order to make real use of it, I would like to create my own
transformers that implement org.apache.spark.ml.Transformer. In order
to do that, a method from PipelineStage needs to be implemented.
But this method is private to the ml package:


private[ml] def transformSchema(schema: StructType, paramMap: ParamMap): StructType


Can users create their own transformers? If not, will this
functionality be added in the future?


Thanks
--
Cesar Flores




Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-04 Thread Peter Rudenko
Hi, if I have a 10GB file on S3 and set 10 partitions, would the whole file
be downloaded on the master first and broadcast, or would each worker
just read its range from the file?


Thanks,
Peter

On 2015-02-03 23:30, Sven Krasser wrote:

Hey Joe,

With the ephemeral HDFS, you get the instance store of your worker 
nodes. For m3.xlarge that will be two 40 GB SSDs local to each 
instance, which are very fast.


For the persistent HDFS, you get whatever EBS volumes the launch 
script configured. EBS volumes are always network drives, so the usual 
limitations apply. To optimize throughput, you can use EBS volumes 
with provisioned IOPS and you can use EBS optimized instances. I don't 
have hard numbers at hand, but I'd expect this to be noticeably slower 
than using local SSDs.


As far as only using S3 goes, it depends on your use case (i.e. what 
you plan on doing with the data while it is there). If you store it 
there in between running different applications, you can likely work 
around consistency issues.


Also, if you use Amazon's EMRFS to access data in S3, you can use 
their new consistency feature 
(https://aws.amazon.com/blogs/aws/emr-consistent-file-system/).


Hope this helps!
-Sven


On Tue, Feb 3, 2015 at 9:32 AM, Joe Wass wrote:


The data is coming from S3 in the first place, and the results
will be uploaded back there. But even in the same availability
zone, fetching 170 GB (that's gzipped) is slow. From what I
understand of the pipelines, multiple transforms on the same RDD
might involve re-reading the input, which very quickly add up in
comparison to having the data locally. Unless I persisted the data
(which I am in fact doing) but that would involve storing
approximately the same amount of data in HDFS, which wouldn't fit.

Also, I understood that S3 was unsuitable for practical use? See "Why
you cannot use S3 as a replacement for HDFS"[0]. I'd love to be
proved wrong, though; that would make things a lot easier.

[0] http://wiki.apache.org/hadoop/AmazonS3



On 3 February 2015 at 16:45, David Rosenstrauch wrote:

You could also just push the data to Amazon S3, which would
un-link the size of the cluster needed to process the data
from the size of the data.

DR


On 02/03/2015 11:43 AM, Joe Wass wrote:

I want to process about 800 GB of data on an Amazon EC2
cluster. So, I need
to store the input in HDFS somehow.

I currently have a cluster of 5 x m3.xlarge, each of which
has 80GB disk.
Each HDFS node reports 73 GB, and the total capacity is
~370 GB.

If I want to process 800 GB of data (assuming I can't
split the jobs up),
I'm guessing I need to get persistent-hdfs involved.

1 - Does persistent-hdfs have noticeably different
performance than
ephemeral-hdfs?
2 - If so, is there a recommended configuration (like
storing input and
output on persistent, but persisted RDDs on ephemeral?)

This seems like a common use-case, so sorry if this has
already been
covered.

Joe









--
http://sites.google.com/site/krasser/?utm_source=sig