LynxKite is now open-source

2020-06-24 Thread Daniel Darabos
Hi,
LynxKite is a graph analytics application built on Apache Spark. (From very
early on, like Spark 0.9.) We have talked about it on occasion at Spark
Summits.

So I wanted to let you know that it's now open-source!

https://github.com/lynxkite/lynxkite

You should totally check it out if you work with graphs. But even if you
only use Spark SQL, you may find it a nice (graphical) environment for data
exploration.

We're going to do development completely in the open and with the community
from now on, so any bug reports and pull requests are extremely welcome.
Cheers,
Daniel


Re: Quick one on evaluation

2017-08-04 Thread Daniel Darabos
On Fri, Aug 4, 2017 at 4:36 PM, Jean Georges Perrin  wrote:

> Thanks Daniel,
>
> I like your answer for #1. It makes sense.
>
> However, I don't get why you say that there are always pending
> transformations... After you call an action, you should be "clean" from
> pending transformations, no?
>

Nope. Say you have val df = spark.read.csv("data.csv"); println(df.count +
df.count). The first "df.count" reads in the file and counts the rows. The
action was executed, but "df" still represents the same pending
transformations. The second "df.count" again reads in the file and counts
the rows. Actions do not modify DataFrames/RDDs. (The only exception is
"cache()".)


Re: Quick one on evaluation

2017-08-03 Thread Daniel Darabos
On Wed, Aug 2, 2017 at 2:16 PM, Jean Georges Perrin  wrote:

> Hi Sparkians,
>
> I understand the lazy evaluation mechanism with transformations and
> actions. My question is simpler: 1) are show() and/or printSchema()
> actions? I would assume so...
>

show() is an action (it prints data) but printSchema() is not an action.
Spark can tell you the schema of the result without computing the result.

and optional question: 2) is there a way to know if there are
> transformations "pending"?
>

There are always transformations pending :). An RDD or DataFrame is a
series of pending transformations. If you say val df =
spark.read.csv("foo.csv"), that is a pending transformation. Even
spark.emptyDataFrame is best understood as a pending transformation: it
does not do anything on the cluster, but records locally what it will have
to do on the cluster.
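
A quick spark-shell illustration (Spark 2.x API) of the difference:

// printSchema() only consults the analyzed plan; show() actually runs a job.
val df = spark.range(10).selectExpr("id", "id * 2 AS doubled")
df.printSchema()   // no job: the schema is derived without touching the data
df.show()          // action: launches a job and prints the first rows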


Re: Strongly Connected Components

2016-11-11 Thread Daniel Darabos
Hi Shreya,
GraphFrames just calls the GraphX strongly connected components code. (
https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51
)

For choosing the number of iterations: If the number of iterations is less
than the diameter of the graph, you may get an incorrect result. But
running for more iterations than that buys you nothing. The algorithm is
basically to broadcast your ID to all your neighbors in the first round,
and then broadcast the smallest ID that you have seen so far in the next
rounds. So with only 1 round you will get a wrong result unless each vertex
is connected to the vertex with the lowest ID in that component. (Unlikely
in a real graph.)

See
https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
for the actual implementation.

A better algorithm exists for this problem that only requires O(log(N))
iterations, where N is the largest component diameter. (It is described in "A
Model of Computation for MapReduce",
http://www.sidsuri.com/Publications_files/mrc.pdf.) It outperforms
GraphX's implementation immensely. (See the last slide of
http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
The large advantage comes from the much lower number of iterations required.

As for why this fails even with one iteration: I would first check your
partitioning. Too many or too few partitions could equally cause the issue.
If you are lucky, there is no overlap between the "too many" and "too few"
domains :).
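
To make the algorithm concrete, here is a rough RDD-only sketch of the
"broadcast the smallest ID you have seen" idea -- it is not the GraphX
implementation, and the edge-list representation and fixed iteration count are
assumptions for illustration:

import org.apache.spark.rdd.RDD

// edges: an undirected edge list given as (vertexId, vertexId) pairs.
def connectedComponents(edges: RDD[(Long, Long)], numIterations: Int): RDD[(Long, Long)] = {
  // Duplicate each edge in both directions so labels can travel either way.
  val sym = edges.flatMap { case (a, b) => Seq((a, b), (b, a)) }.cache()
  // Round 0: every vertex is labeled with its own ID.
  var labels = sym.keys.distinct().map(v => (v, v))
  for (_ <- 1 to numIterations) {
    // Send the current label to every neighbor, then keep the smallest label seen so far.
    val fromNeighbors = sym.join(labels).map { case (_, (neighbor, label)) => (neighbor, label) }
    labels = labels.union(fromNeighbors).reduceByKey(math.min(_, _))
  }
  labels // (vertexId, smallest ID seen); identical within a component after enough rounds
}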

On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal 
wrote:

> Tried GraphFrames. Still faced the same – job died after a few hours . The
> errors I see (And I see tons of them) are –
>
> (I ran with 3 times the partitions as well, which was 12 times number of
> executors , but still the same.)
>
>
>
> -
>
> ERROR NativeAzureFileSystem: Encountered Storage Exception for write on
> Blob : hdp/spark2-events/application_1478717432179_0021.inprogress
> Exception details: null Error Code : RequestBodyTooLarge
>
>
>
> -
>
>
>
> 16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests
> outstanding when connection from /10.0.0.95:43301 is closed
>
> 16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2
> outstanding blocks after 5000 ms
>
> 16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty
> blocks out of 1500 blocks
>
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.io.IOException: Connection from /10.0.0.95:43301 closed
>
>
>
> -
>
>
>
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/
> appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-
> a5e138a52360/37/shuffle_1346_21_0.index (No such file or directory)
>
>
>
> -
>
>
>
> org.apache.spark.SparkException: Exception thrown in awaitResult
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.
> applyOrElse(RpcTimeout.scala:77)
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.
> applyOrElse(RpcTimeout.scala:75)
>
> at scala.runtime.AbstractPartialFunction.apply(
> AbstractPartialFunction.scala:36)
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:59)
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:59)
>
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>
> at org.apache.spark.rpc.RpcTimeout.awaitResult(
> RpcTimeout.scala:83)
>
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(
> RpcEndpointRef.scala:102)
>
> at org.apache.spark.executor.Executor.org$apache$spark$
> executor$Executor$$reportHeartBeat(Executor.scala:518)
>
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply$mcV$sp(Executor.scala:547)
>
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply(Executor.scala:547)
>
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply(Executor.scala:547)
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.
> scala:1857)
>
> at org.apache.spark.executor.Executor$$anon$1.run(Executor.
> scala:547)
>
> at java.util.concurrent.Executo

Re: Spark 2.0 - DataFrames vs Dataset performance

2016-10-24 Thread Daniel Darabos
Hi Antoaneta,
I believe the difference is not due to Datasets being slower (DataFrames
are just an alias for Datasets now), but rather to using a user-defined
function for filtering instead of Spark's built-ins. The built-in can use tricks
from Project Tungsten, such as only deserializing the "event_type" column.
The user-defined function, on the other hand, has to be called with a full
case class, so the whole object needs to be deserialized for each row.

Well, that is my understanding from reading some slides. Hopefully someone
with a more direct knowledge of the code will correct me if I'm wrong.
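
One way to see this for yourself (a rough suggestion, reusing the names from
the quoted code below) is to compare the physical plans of the two filters:

import org.apache.spark.sql.functions.col  // already imported in spark-shell

// The column-based filter lets the Parquet scan prune/push down "event_type";
// the typed lambda forces full deserialization of CustomEvent objects.
data.filter(col("event_type").isin(eventNames: _*)).explain()
data.as[CustomEvent].filter(e => eventNames.contains(e.event_type.get)).explain()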

On Mon, Oct 24, 2016 at 2:50 PM, Antoaneta Marinova <
antoaneta.vmarin...@gmail.com> wrote:

> Hello,
>
> I am using Spark 2.0 for performing filtering, grouping and counting
> operations on events data saved in parquet files. As the events schema has
> very nested structure I wanted to read them as scala beans to simplify the
> code but I noticed a severe performance degradation. Below you can find
> simple comparison of the same operation using DataFrame and Dataset.
>
> val data = session.read.parquet("events_data/")
>
> // Using Datasets with custom class
>
> //Case class matching the events schema
>
> case class CustomEvent(event_id: Option[String],
>
> event_type: Option[String]
>context : Option[Context],
>
> ….
>time: Option[BigInt]) extends Serializable {}
>
> scala> val start = System.currentTimeMillis ;
>
>   val count = data.as[CustomEvent].filter(event =>
> eventNames.contains(event.event_type.get)).count ;
>
>  val time =  System.currentTimeMillis - start
>
> count: Long = 5545
>
> time: Long = 11262
>
> // Using DataFrames
>
> scala>
>
> val start = System.currentTimeMillis ;
>
> val count = data.filter(col("event_type").isin(eventNames:_*)).count ;
>
> val time =  System.currentTimeMillis - start
>
> count: Long = 5545
>
> time: Long = 147
>
> The schema of the events is something like this:
>
> //events schma
>
> schemaroot
>
> |-- event_id: string (nullable = true)
>
> |-- event_type: string (nullable = true)
>
> |-- context: struct (nullable = true)
>
> ||-- environment_1: struct (nullable = true)
>
> |||-- filed1: integer (nullable = true)
>
> |||-- filed2: integer (nullable = true)
>
> |||-- filed3: integer (nullable = true)
>
> ||-- environment_2: struct (nullable = true)
>
> |||-- filed_1: string (nullable = true)
>
> 
>
> |||-- filed_n: string (nullable = true)
>
> |-- metadata: struct (nullable = true)
>
> ||-- elements: array (nullable = true)
>
> |||-- element: struct (containsNull = true)
>
> ||||-- config: string (nullable = true)
>
> ||||-- tree: array (nullable = true)
>
> |||||-- element: struct (containsNull = true)
>
> ||||||-- path: array (nullable = true)
>
> |||||||-- element: struct (containsNull = true)
>
> ||||||||-- key: string (nullable = true)
>
> ||||||||-- name: string (nullable = true)
>
> ||||||||-- level: integer (nullable = true)
>
> |-- time: long (nullable = true)
>
> Could you please advise me on the usage of the different abstractions and
> help me understand why using datasets with user defined class is so much
> slower.
>
> Thank you,
> Antoaneta
>


Re: how to run local[k] threads on a single core

2016-08-04 Thread Daniel Darabos
You could run the application in a Docker container constrained to one CPU
with --cpuset-cpus (
https://docs.docker.com/engine/reference/run/#/cpuset-constraint).

On Thu, Aug 4, 2016 at 8:51 AM, Sun Rui  wrote:

> I don’t think it possible as Spark does not support thread to CPU affinity.
> > On Aug 4, 2016, at 14:27, sujeet jog  wrote:
> >
> > Is there a way we can run multiple tasks concurrently on a single core
> in local mode.
> >
> > for ex :- i have 5 partition ~ 5 tasks, and only a single core , i want
> these tasks to run concurrently, and specifiy them to use /run on a single
> core.
> >
> > The machine itself is say 4 core, but i want to utilize only 1 core out
> of it,.
> >
> > Is it possible ?
> >
> > Thanks,
> > Sujeet
> >
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark 1.6.2 version displayed as 1.6.1

2016-07-25 Thread Daniel Darabos
Another possible explanation is that you are accidentally still running
Spark 1.6.1. Which download are you using? This is what I see:

$ ~/spark-1.6.2-bin-hadoop2.6/bin/spark-shell
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
Using Spark's repl log4j profile:
org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.2
  /_/
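
Another quick check, independent of the startup banner, is to ask the running
context itself from spark-shell:

// Reports the Spark version that is actually on the classpath.
sc.version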


On Mon, Jul 25, 2016 at 7:45 AM, Sean Owen  wrote:

> Are you certain? looks like it was correct in the release:
>
>
> https://github.com/apache/spark/blob/v1.6.2/core/src/main/scala/org/apache/spark/package.scala
>
>
>
> On Mon, Jul 25, 2016 at 12:33 AM, Ascot Moss  wrote:
> > Hi,
> >
> > I am trying to upgrade spark from 1.6.1 to 1.6.2, from 1.6.2
> spark-shell, I
> > found the version is still displayed 1.6.1
> >
> > Is this a minor typo/bug?
> >
> > Regards
> >
> >
> >
> > ###
> >
> > Welcome to
> >
> >     __
> >
> >  / __/__  ___ _/ /__
> >
> > _\ \/ _ \/ _ `/ __/  '_/
> >
> >/___/ .__/\_,_/_/ /_/\_\   version 1.6.1
> >
> >   /_/
> >
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: spark.executor.cores

2016-07-15 Thread Daniel Darabos
Mich's invocation is for starting a Spark application against an already
running Spark standalone cluster. It will not start the cluster for you.

We also used to avoid "spark-submit", but we started using it when it solved
a problem for us. Perhaps that day has come for you too? :)

On Fri, Jul 15, 2016 at 5:14 PM, Jean Georges Perrin  wrote:

> I don't use submit: I start my standalone cluster and connect to it
> remotely. Is that a bad practice?
>
> I'd like to be able to it dynamically as the system knows whether it needs
> more or less resources based on its own  context
>
> On Jul 15, 2016, at 10:55 AM, Mich Talebzadeh 
> wrote:
>
> Hi,
>
> You can also do all this at env or submit time with spark-submit which I
> believe makes it more flexible than coding in.
>
> Example
>
> ${SPARK_HOME}/bin/spark-submit \
> --packages com.databricks:spark-csv_2.11:1.3.0 \
> --driver-memory 2G \
> --num-executors 2 \
> --executor-cores 3 \
> --executor-memory 2G \
> --master spark://50.140.197.217:7077 \
> --conf "spark.scheduler.mode=FAIR" \
> --conf
> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps" \
> --jars
> /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
> --class "${FILE_NAME}" \
> --conf "spark.ui.port=${SP}" \
>
> HTH
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 15 July 2016 at 13:48, Jean Georges Perrin  wrote:
>
>> Merci Nihed, this is one of the tests I did :( still not working
>>
>>
>>
>> On Jul 15, 2016, at 8:41 AM, nihed mbarek  wrote:
>>
>> can you try with :
>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>> "spark.executor.memory", "4g")
>> .setMaster("spark://10.0.100.120:7077");
>> if (restId == 0) {
>> conf = conf.set("spark.executor.cores", "22");
>> } else {
>> conf = conf.set("spark.executor.cores", "2");
>> }
>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>
>> On Fri, Jul 15, 2016 at 2:31 PM, Jean Georges Perrin  wrote:
>>
>>> Hi,
>>>
>>> Configuration: standalone cluster, Java, Spark 1.6.2, 24 cores
>>>
>>> My process uses all the cores of my server (good), but I am trying to
>>> limit it so I can actually submit a second job.
>>>
>>> I tried
>>>
>>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>>> "spark.executor.memory", "4g")
>>> .setMaster("spark://10.0.100.120:7077");
>>> if (restId == 0) {
>>> conf = conf.set("spark.executor.cores", "22");
>>> } else {
>>> conf = conf.set("spark.executor.cores", "2");
>>> }
>>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>>
>>> and
>>>
>>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>>> "spark.executor.memory", "4g")
>>> .setMaster("spark://10.0.100.120:7077");
>>> if (restId == 0) {
>>> conf.set("spark.executor.cores", "22");
>>> } else {
>>> conf.set("spark.executor.cores", "2");
>>> }
>>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>>
>>> but it does not seem to take it. Any hint?
>>>
>>> jg
>>>
>>>
>>>
>>
>>
>> --
>>
>> M'BAREK Med Nihed,
>> Fedora Ambassador, TUNISIA, Northern Africa
>> http://www.nihed.com
>>
>> 
>>
>>
>>
>
>


Re: How to Register Permanent User-Defined-Functions (UDFs) in SparkSQL

2016-07-12 Thread Daniel Darabos
Hi Lokesh,
There is no way to do that. The SQLContext.newSession documentation says:

Returns a SQLContext as new session, with separated SQL configurations,
temporary tables, registered functions, but sharing the same SparkContext,
CacheManager, SQLListener and SQLTab.

You have two options: either use the same SQLContext instead of creating
new SQLContexts, or have a function for creating SQLContexts, and this
function can also register the UDFs in every created SQLContext.
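
A minimal sketch of that second option (the helper name and the example UDF
are made up for illustration):

import org.apache.spark.sql.SQLContext

// Hand out sessions through one helper so every SQLContext gets the UDFs.
def newSessionWithUdfs(base: SQLContext): SQLContext = {
  val session = base.newSession()
  session.udf.register("sample_fn", (s: String) => s.toUpperCase) // illustrative UDF
  // ...register the rest of your UDFs here...
  session
}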

On Sun, Jul 10, 2016 at 6:14 PM, Lokesh Yadav 
wrote:

> Hi
> with sqlContext we can register a UDF like
> this: sqlContext.udf.register("sample_fn", sample_fn _ )
> But this UDF is limited to that particular sqlContext only. I wish to make
> the registration persistent, so that I can access the same UDF in any
> subsequent sqlcontext.
> Or is there any other way to register UDFs in sparkSQL so that they remain
> persistent?
>
> Regards
> Lokesh
>


Re: What is the interpretation of Cores in Spark doc

2016-06-12 Thread Daniel Darabos
Spark is a software product. In software a "core" is something that a
process can run on. So it's a "virtual core". (Do not call these "threads".
A "thread" is not something a process can run on.)

local[*] uses java.lang.Runtime.availableProcessors().
Since Java is software, this also returns the number of virtual cores. (You
can test this easily.)
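
For example, this is what local[*] effectively resolves to:

// The JVM's count of logical ("virtual") cores -- what local[*] uses.
val n = Runtime.getRuntime.availableProcessors()
println(s"local[*] would use $n threads")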


On Sun, Jun 12, 2016 at 9:23 PM, Mich Talebzadeh 
wrote:

>
> Hi,
>
> I was writing some docs on Spark P&T and came across this.
>
> It is about the terminology or interpretation of that in Spark doc.
>
> This is my understanding of cores and threads.
>
>  Cores are physical cores. Threads are virtual cores. Cores with 2 threads
> is called hyper threading technology so 2 threads per core makes the core
> work on two loads at same time. In other words, every thread takes care of
> one load.
>
> Core has its own memory. So if you have a dual core with hyper threading,
> the core works with 2 loads each at same time because of the 2 threads per
> core, but this 2 threads will share memory in that core.
>
> Some vendors as I am sure most of you aware charge licensing per core.
>
> For example on the same host that I have Spark, I have a SAP product that
> checks the licensing and shuts the application down if the license does not
> agree with the cores speced.
>
> This is what it says
>
> ./cpuinfo
> License hostid:00e04c69159a 0050b60fd1e7
> Detected 12 logical processor(s), 6 core(s), in 1 chip(s)
>
> So here I have 12 logical processors  and 6 cores and 1 chip. I call
> logical processors as threads so I have 12 threads?
>
> Now if I go and start worker process ${SPARK_HOME}/sbin/start-slaves.sh, I
> see this in GUI page
>
> [image: Inline images 1]
>
> it says 12 cores but I gather it is threads?
>
>
> Spark document
>  states
> and I quote
>
>
> [image: Inline images 2]
>
>
>
> OK the line local[k] adds  ..  *set this to the number of cores on your
> machine*
>
>
> But I know that it means threads. Because if I went and set that to 6, it
> would be only 6 threads as opposed to 12 threads.
>
>
> the next line local[*] seems to indicate it correctly as it refers to
> "logical cores" that in my understanding it is threads.
>
>
> I trust that I am not nitpicking here!
>
>
> Cheers,
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: [REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-06-07 Thread Daniel Darabos
On Sun, Jun 5, 2016 at 9:51 PM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> If you fill up the cache, 1.6.0+ will suffer performance degradation from
> GC thrashing. You can set spark.memory.useLegacyMode to true, or
> spark.memory.fraction to 0.66, or spark.executor.extraJavaOptions to
> -XX:NewRatio=3 to avoid this issue.
>
> I think my colleague filed a ticket for this issue, but I can't find it
> now. So treat it like unverified rumor for now, and try it for yourself if
> you're out of better ideas :). Good luck!
>

FYI there is a ticket for this issue now, with much more detail:
https://issues.apache.org/jira/browse/SPARK-15796

On Sat, Jun 4, 2016 at 11:49 AM, Cosmin Ciobanu  wrote:
>
>> Microbatch is 20 seconds. We’re not using window operations.
>>
>>
>>
>> The graphs are for a test cluster, and the entire load is artificially
>> generated by load tests (100k / 200k generated sessions).
>>
>>
>>
>> We’ve performed a few more performance tests. On the same 5 node cluster,
>> with the same application:
>>
>> · Spark 1.5.1 handled 170k+ generated sessions for 24hours with
>> no scheduling delay – the limit seems to be around 180k, above which
>> scheduling delay starts to increase;
>>
>> · Spark 1.6.1 had constant upward-trending scheduling delay from
>> the beginning for 100k+ generated sessions (this is also mentioned in the
>> initial post) – the load test was stopped after 25 minutes as scheduling
>> delay reached 3,5 minutes.
>>
>>
>>
>> P.S. Florin and I will be in SF next week, attending the Spark Summit on
>> Tuesday and Wednesday. We can meet and go into more details there - is
>> anyone working on Spark Streaming available?
>>
>>
>>
>> Cosmin
>>
>>
>>
>>
>>
>> *From: *Mich Talebzadeh 
>> *Date: *Saturday 4 June 2016 at 12:33
>> *To: *Florin Broască 
>> *Cc: *David Newberger , Adrian Tanase <
>> atan...@adobe.com>, "user@spark.apache.org" ,
>> ciobanu 
>> *Subject: *Re: [REPOST] Severe Spark Streaming performance degradation
>> after upgrading to 1.6.1
>>
>>
>>
>> batch interval I meant
>>
>>
>>
>> thx
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn  
>> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>>
>> On 4 June 2016 at 10:32, Mich Talebzadeh 
>> wrote:
>>
>> I may have missed these but:
>>
>>
>>
>> What is the windows interval, windowsLength and SlidingWindow
>>
>>
>>
>> Has the volume of ingest data (Kafka streaming) changed recently that you
>> may not be aware of?
>>
>>
>>
>> HTH
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn  
>> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>>
>> On 4 June 2016 at 09:50, Florin Broască  wrote:
>>
>> Hi David,
>>
>>
>>
>> Thanks for looking into this. This is how the processing time looks like:
>>
>>
>>
>> [image: nline image 1]
>>
>>
>>
>> Appreciate any input,
>>
>> Florin
>>
>>
>>
>>
>>
>> On Fri, Jun 3, 2016 at 3:22 PM, David Newberger <
>> david.newber...@wandcorp.com> wrote:
>>
>> What does your processing time look like. Is it consistently within that
>> 20sec micro batch window?
>>
>>
>>
>> *David Newberger*
>>
>>
>>
>> *From:* Adrian Tanase [mailto:atan...@adobe.com]
>> *Sent:* Friday, June 3, 2016 8:14 AM
>> *To:* user@spark.apache.org
>> *Cc:* Cosmin Ciobanu
>> *Subject:* [REPOST] Severe Spark Streaming performance degradation after
>> upgrading to 1.6.1
>>
>>
>>
>> Hi all,
>>
>>
>>
>> Trying to repost this question from a colleague on my team, somehow his
>> subscription is not active:
>>
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-td27056.html
>>
>>
>>
>> Appreciate any thoughts,
>>
>> -adrian
>>
>>
>>
>>
>>
>>
>>
>
>


Re: [REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-06-05 Thread Daniel Darabos
If you fill up the cache, 1.6.0+ will suffer performance degradation from
GC thrashing. You can set spark.memory.useLegacyMode to true, or
spark.memory.fraction to 0.66, or spark.executor.extraJavaOptions to
-XX:NewRatio=3 to avoid this issue.

I think my colleague filed a ticket for this issue, but I can't find it
now. So treat it like unverified rumor for now, and try it for yourself if
you're out of better ideas :). Good luck!
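
For reference, those three workarounds expressed as configuration look roughly
like this (they are alternatives -- pick one):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.useLegacyMode", "true")                      // pre-1.6 memory manager
  // .set("spark.memory.fraction", "0.66")                        // or shrink the unified memory region
  // .set("spark.executor.extraJavaOptions", "-XX:NewRatio=3")    // or give the old generation more room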

On Sat, Jun 4, 2016 at 11:49 AM, Cosmin Ciobanu  wrote:

> Microbatch is 20 seconds. We’re not using window operations.
>
>
>
> The graphs are for a test cluster, and the entire load is artificially
> generated by load tests (100k / 200k generated sessions).
>
>
>
> We’ve performed a few more performance tests. On the same 5 node cluster,
> with the same application:
>
> · Spark 1.5.1 handled 170k+ generated sessions for 24hours with
> no scheduling delay – the limit seems to be around 180k, above which
> scheduling delay starts to increase;
>
> · Spark 1.6.1 had constant upward-trending scheduling delay from
> the beginning for 100k+ generated sessions (this is also mentioned in the
> initial post) – the load test was stopped after 25 minutes as scheduling
> delay reached 3,5 minutes.
>
>
>
> P.S. Florin and I will be in SF next week, attending the Spark Summit on
> Tuesday and Wednesday. We can meet and go into more details there - is
> anyone working on Spark Streaming available?
>
>
>
> Cosmin
>
>
>
>
>
> *From: *Mich Talebzadeh 
> *Date: *Saturday 4 June 2016 at 12:33
> *To: *Florin Broască 
> *Cc: *David Newberger , Adrian Tanase <
> atan...@adobe.com>, "user@spark.apache.org" ,
> ciobanu 
> *Subject: *Re: [REPOST] Severe Spark Streaming performance degradation
> after upgrading to 1.6.1
>
>
>
> batch interval I meant
>
>
>
> thx
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>
> On 4 June 2016 at 10:32, Mich Talebzadeh 
> wrote:
>
> I may have missed these but:
>
>
>
> What is the windows interval, windowsLength and SlidingWindow
>
>
>
> Has the volume of ingest data (Kafka streaming) changed recently that you
> may not be aware of?
>
>
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>
> On 4 June 2016 at 09:50, Florin Broască  wrote:
>
> Hi David,
>
>
>
> Thanks for looking into this. This is how the processing time looks like:
>
>
>
> [image: nline image 1]
>
>
>
> Appreciate any input,
>
> Florin
>
>
>
>
>
> On Fri, Jun 3, 2016 at 3:22 PM, David Newberger <
> david.newber...@wandcorp.com> wrote:
>
> What does your processing time look like. Is it consistently within that
> 20sec micro batch window?
>
>
>
> *David Newberger*
>
>
>
> *From:* Adrian Tanase [mailto:atan...@adobe.com]
> *Sent:* Friday, June 3, 2016 8:14 AM
> *To:* user@spark.apache.org
> *Cc:* Cosmin Ciobanu
> *Subject:* [REPOST] Severe Spark Streaming performance degradation after
> upgrading to 1.6.1
>
>
>
> Hi all,
>
>
>
> Trying to repost this question from a colleague on my team, somehow his
> subscription is not active:
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-td27056.html
>
>
>
> Appreciate any thoughts,
>
> -adrian
>
>
>
>
>
>
>


Re: best way to do deep learning on spark ?

2016-03-19 Thread Daniel Darabos
On Thu, Mar 17, 2016 at 3:51 AM, charles li  wrote:

> Hi, Alexander,
>
> that's awesome, and when will that feature be released ? Since I want to
> know the opportunity cost between waiting for that release and use caffe or
> tensorFlow ?
>

I don't expect MLlib will be able to compete with major players like Caffe
or TensorFlow, at least not in the short term.

Check out https://github.com/amplab/SparkNet. It's from AMPLab (like
Spark), and it runs Caffe or TensorFlow on Spark. I think it's the state of
the art for deep learning on Spark.

great thanks again
>
> On Thu, Mar 17, 2016 at 10:32 AM, Ulanov, Alexander <
> alexander.ula...@hpe.com> wrote:
>
>> Hi Charles,
>>
>>
>>
>> There is an implementation of multilayer perceptron in Spark (since 1.5):
>>
>>
>> https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
>>
>>
>>
>> Other features such as autoencoder, convolutional layers, etc. are
>> currently under development. Please refer to
>> https://issues.apache.org/jira/browse/SPARK-5575
>>
>>
>>
>> Best regards, Alexander
>>
>>
>>
>> *From:* charles li [mailto:charles.up...@gmail.com]
>> *Sent:* Wednesday, March 16, 2016 7:01 PM
>> *To:* user 
>> *Subject:* best way to do deep learning on spark ?
>>
>>
>>
>>
>>
>> Hi, guys, I'm new to MLlib on spark, after reading the document, it seems
>> that MLlib does not support deep learning, I want to know is there any way
>> to implement deep learning on spark ?
>>
>>
>>
>> *Do I must use 3-party package like caffe or tensorflow ?*
>>
>>
>>
>> or
>>
>>
>>
>> *Does deep learning module list in the MLlib development plan?*
>>
>>
>>
>>
>> great thanks
>>
>>
>>
>> --
>>
>> *--*
>>
>> a spark lover, a quant, a developer and a good man.
>>
>>
>>
>> http://github.com/litaotao
>>
>
>
>
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>


Re: SPARK-9559

2016-02-18 Thread Daniel Darabos
YARN may be a workaround.

On Thu, Feb 18, 2016 at 4:13 PM, Ashish Soni  wrote:

> Hi All ,
>
> Just wanted to know if there is any work around or resolution for below
> issue in Stand alone mode
>
> https://issues.apache.org/jira/browse/SPARK-9559
>
> Ashish
>


Re: coalesce and executor memory

2016-02-13 Thread Daniel Darabos
On Fri, Feb 12, 2016 at 11:10 PM, Koert Kuipers  wrote:

> in spark, every partition needs to fit in the memory available to the core
> processing it.
>

That does not agree with my understanding of how it works. I think you
could do sc.textFile("input").coalesce(1).map(_.replace("A",
"B")).saveAsTextFile("output") on a 1 TB local file and it would work fine.
(I just tested this with a 3 GB file with a 1 GB executor.)
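
Spelled out as a spark-shell snippet (the paths are placeholders):

sc.textFile("input")            // "input" / "output" are placeholder paths
  .coalesce(1)                  // a single partition, handled by a single task
  .map(_.replace("A", "B"))     // streams through the iterator line by line
  .saveAsTextFile("output")     // writes records as they are produced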

RDDs are mostly implemented using iterators. For example map() operates
line-by-line, never pulling in the whole partition into memory. coalesce()
also just concatenates the iterators of the parent partitions into a new
iterator.

Some operations, like reduceByKey(), need to have the whole contents of the
partition to work. But they typically use ExternalAppendOnlyMap, so they
spill to disk instead of filling up the memory.

I know I'm not helping to answer Christopher's issue. Christopher, can you
perhaps give us an example that we can easily try in spark-shell to see the
same problem?


Re: Dataframe, Spark SQL - Drops First 8 Characters of String on Amazon EMR

2016-01-29 Thread Daniel Darabos
Hi Andrew,

If you still see this with Spark 1.6.0, it would be very helpful if you
could file a bug about it at https://issues.apache.org/jira/browse/SPARK with
as much detail as you can. This issue could be a nasty source of silent
data corruption in a case where some intermediate data loses 8 characters
but it is not obvious in the final output. Thanks!

On Fri, Jan 29, 2016 at 7:53 AM, Jonathan Kelly 
wrote:

> Just FYI, Spark 1.6 was released on emr-4.3.0 a couple days ago:
> https://aws.amazon.com/blogs/aws/emr-4-3-0-new-updated-applications-command-line-export/
>
> On Thu, Jan 28, 2016 at 7:30 PM Andrew Zurn  wrote:
>
>> Hey Daniel,
>>
>> Thanks for the response.
>>
>> After playing around for a bit, it looks like it's probably the something
>> similar to the first situation you mentioned, with the Parquet format
>> causing issues. Both programmatically created dataset and a dataset pulled
>> off the internet (rather than out of S3 and put into HDFS/Hive) acted with
>> DataFrames as one would expect (printed out everything, grouped properly,
>> etc.)
>>
>> It looks like there is more than likely an outstanding bug that causes
>> issues with data coming from S3 and is converted in the parquet format
>> (found an article here highlighting it was around in 1.4, and I guess it
>> wouldn't be out of the realm of things for it still to exist. Link to
>> article:
>> https://www.appsflyer.com/blog/the-bleeding-edge-spark-parquet-and-s3/
>>
>> Hopefully a little more stability will come out with the upcoming Spark
>> 1.6 release on EMR (I think that is happening sometime soon).
>>
>> Thanks again for the advice on where to dig further into. Much
>> appreciated.
>>
>> Andrew
>>
>> On Tue, Jan 26, 2016 at 9:18 AM, Daniel Darabos <
>> daniel.dara...@lynxanalytics.com> wrote:
>>
>>> Have you tried setting spark.emr.dropCharacters to a lower value? (It
>>> defaults to 8.)
>>>
>>> :) Just joking, sorry! Fantastic bug.
>>>
>>> What data source do you have for this DataFrame? I could imagine for
>>> example that it's a Parquet file and on EMR you are running with two wrong
>>> version of the Parquet library and it messes up strings. It should be easy
>>> enough to try a different data format. You could also try what happens if
>>> you just create the DataFrame programmatically, e.g.
>>> sc.parallelize(Seq("asdfasdfasdf")).toDF.
>>>
>>> To understand better at which point the characters are lost you could
>>> try grouping by a string attribute. I see "education" ends up either as ""
>>> (empty string) or "y" in the printed output. But are the characters already
>>> lost when you try grouping by the attribute? Will there be a single ""
>>> category, or will you have separate categories for "primary" and "tertiary"?
>>>
>>> I think the correct output through the RDD suggests that the issue
>>> happens at the very end. So it will probably happen also with different
>>> data sources, and grouping will create separate groups for "primary" and
>>> "tertiary" even though they are printed as the same string at the end. You
>>> should also check the data from "take(10)" to rule out any issues with
>>> printing. You could try the same "groupBy" trick after "take(10)". Or you
>>> could print the lengths of the strings.
>>>
>>> Good luck!
>>>
>>> On Tue, Jan 26, 2016 at 3:53 AM, awzurn  wrote:
>>>
>>>> Sorry for the bump, but wondering if anyone else has seen this before.
>>>> We're
>>>> hoping to either resolve this soon, or move on with further steps to
>>>> move
>>>> this into an issue.
>>>>
>>>> Thanks in advance,
>>>>
>>>> Andrew Zurn
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Dataframe-Spark-SQL-Drops-First-8-Characters-of-String-on-Amazon-EMR-tp26022p26065.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>


Re: Dataframe, Spark SQL - Drops First 8 Characters of String on Amazon EMR

2016-01-26 Thread Daniel Darabos
Have you tried setting spark.emr.dropCharacters to a lower value? (It
defaults to 8.)

:) Just joking, sorry! Fantastic bug.

What data source do you have for this DataFrame? I could imagine for
example that it's a Parquet file and on EMR you are running with the wrong
version of the Parquet library and it messes up strings. It should be easy
enough to try a different data format. You could also try what happens if
you just create the DataFrame programmatically, e.g.
sc.parallelize(Seq("asdfasdfasdf")).toDF.

To understand better at which point the characters are lost you could try
grouping by a string attribute. I see "education" ends up either as ""
(empty string) or "y" in the printed output. But are the characters already
lost when you try grouping by the attribute? Will there be a single ""
category, or will you have separate categories for "primary" and "tertiary"?

I think the correct output through the RDD suggests that the issue happens
at the very end. So it will probably happen also with different data
sources, and grouping will create separate groups for "primary" and
"tertiary" even though they are printed as the same string at the end. You
should also check the data from "take(10)" to rule out any issues with
printing. You could try the same "groupBy" trick after "take(10)". Or you
could print the lengths of the strings.

Good luck!
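
As a concrete sketch of those checks in spark-shell (the column name and the
values are only illustrative):

import org.apache.spark.sql.functions.length
import sqlContext.implicits._

// 1) A DataFrame built in memory, bypassing S3 and Parquet entirely.
val probe = sc.parallelize(Seq("primary", "tertiary", "asdfasdfasdf")).toDF("education")

// 2) Group by the string column: distinct groups for "primary" and "tertiary"
//    would mean the values are intact and only the final printing is suspect.
probe.groupBy("education").count().show()

// 3) Check lengths instead of trusting the printed strings.
probe.select(length($"education")).show()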

On Tue, Jan 26, 2016 at 3:53 AM, awzurn  wrote:

> Sorry for the bump, but wondering if anyone else has seen this before.
> We're
> hoping to either resolve this soon, or move on with further steps to move
> this into an issue.
>
> Thanks in advance,
>
> Andrew Zurn
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Dataframe-Spark-SQL-Drops-First-8-Characters-of-String-on-Amazon-EMR-tp26022p26065.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark 1.6.0 on ec2 doesn't work

2016-01-18 Thread Daniel Darabos
On Mon, Jan 18, 2016 at 5:24 PM, Oleg Ruchovets 
wrote:

> I thought script tries to install hadoop / hdfs also. And it looks like it
> failed. Installation is only standalone spark without hadoop. Is it correct
> behaviour?
>

Yes, it also sets up two HDFS clusters. Are they not working? Try to see if
Spark is working by running some simple jobs on it. (See
http://spark.apache.org/docs/latest/ec2-scripts.html.)

There is no program called Hadoop. If you mean YARN, then indeed the script
does not set up YARN. It sets up standalone Spark.
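
For example, a minimal smoke test from spark-shell on the master:

// A trivial distributed job; it should return 100000 almost immediately.
sc.parallelize(1 to 100000).count()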


> Also errors in the log:
>ERROR: Unknown Tachyon version
>Error: Could not find or load main class crayondata.com.log
>

As long as Spark is working fine, you can ignore all output from the EC2
script :).


Re: spark 1.6.0 on ec2 doesn't work

2016-01-18 Thread Daniel Darabos
Hi,

How do you know it doesn't work? The log looks roughly normal to me. Is
Spark not running at the printed address? Can you not start jobs?

On Mon, Jan 18, 2016 at 11:51 AM, Oleg Ruchovets 
wrote:

> Hi ,
>I try to follow the spartk 1.6.0 to install spark on EC2.
>
> It doesn't work properly -  got exceptions and at the end standalone spark
> cluster installed.
>

The purpose of the script is to install a standalone Spark cluster. So
that's not an error :).


> here is log information:
>
> Any suggestions?
>
> Thanks
> Oleg.
>
> oleg@robinhood:~/install/spark-1.6.0-bin-hadoop2.6/ec2$ ./spark-ec2
> --key-pair=CC-ES-Demo
>  
> --identity-file=/home/oleg/work/entity_extraction_framework/ec2_pem_key/CC-ES-Demo.pem
> --region=us-east-1 --zone=us-east-1a --spot-price=0.05   -s 5
> --spark-version=1.6.0launch entity-extraction-spark-cluster
> Setting up security groups...
> Searching for existing cluster entity-extraction-spark-cluster in region
> us-east-1...
> Spark AMI: ami-5bb18832
> Launching instances...
> Requesting 5 slaves as spot instances with price $0.050
> Waiting for spot instances to be granted...
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> All 5 slaves granted
> Launched master in us-east-1a, regid = r-9384033f
> Waiting for AWS to propagate instance metadata...
> Waiting for cluster to enter 'ssh-ready' state..
>
> Warning: SSH connection error. (This could be temporary.)
> Host: ec2-52-90-186-83.compute-1.amazonaws.com
> SSH return code: 255
> SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com
> port 22: Connection refused
>
> .
>
> Warning: SSH connection error. (This could be temporary.)
> Host: ec2-52-90-186-83.compute-1.amazonaws.com
> SSH return code: 255
> SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com
> port 22: Connection refused
>
> .
>
> Warning: SSH connection error. (This could be temporary.)
> Host: ec2-52-90-186-83.compute-1.amazonaws.com
> SSH return code: 255
> SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com
> port 22: Connection refused
>
> .
> Cluster is now in 'ssh-ready' state. Waited 442 seconds.
> Generating cluster's SSH key on master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed.
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Transferring cluster's SSH key to slaves...
> ec2-54-165-243-74.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-165-243-74.compute-1.amazonaws.com,54.165.243.74'
> (ECDSA) to the list of known hosts.
> ec2-54-88-245-107.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-88-245-107.compute-1.amazonaws.com,54.88.245.107'
> (ECDSA) to the list of known hosts.
> ec2-54-172-29-47.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-172-29-47.compute-1.amazonaws.com,54.172.29.47'
> (ECDSA) to the list of known hosts.
> ec2-54-165-131-210.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-165-131-210.compute-1.amazonaws.com,54.165.131.210'
> (ECDSA) to the list of known hosts.
> ec2-54-172-46-184.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-172-46-184.compute-1.amazonaws.com,54.172.46.184'
> (ECDSA) to the list of known hosts.
> Cloning spark-ec2 scripts from
> https://github.com/amplab/spark-ec2/tree/branch-1.5 on master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Cloning into 'spark-ec2'...
> remote: Counting objects: 2068, done.
> remote: Total 2068 (delta 0), reused 0 (delta 0), pack-reused 2068
> Receiving objects: 100% (2068/2068), 349.76 KiB, done.
> Resolving deltas: 100% (796/796), done.
> Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed.
> Deploying files to master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> sending incremental file list
> root/spark-ec2/ec2-variables.sh
>
> sent 1,835 bytes  received 40 bytes  416.67 bytes/sec
> total size is 1,684  speedup is 0.90
> Running setup on master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed.
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Setting up Spark on ip-172-31-24-124.ec2.internal...

Re: Using JDBC clients with "Spark on Hive"

2016-01-15 Thread Daniel Darabos
Does Hive JDBC work if you are not using Spark as a backend? I just had a
very bad experience with Hive JDBC in general. E.g. half the JDBC protocol
is not implemented (https://issues.apache.org/jira/browse/HIVE-3175, filed
in 2012).

On Fri, Jan 15, 2016 at 2:15 AM, sdevashis  wrote:

> Hello Experts,
>
> I am getting started with Hive with Spark as the query engine. I built the
> package from sources. I am able to invoke Hive CLI and run queries and see
> in Ambari that Spark application are being created confirming hive is using
> Spark as the engine.
>
> However other than Hive CLI, I am not able to run queries from any other
> clients that use the JDBC to connect to hive through thrift. I tried
> Squirrel, Aginity Netezza workbench, and even Hue.
>
> No yarn applications are getting created, the query times out after
> sometime. Nothing gets into /tmp/user/hive.log Am I missing something?
>
> Again I am using Hive on Spark and not spark SQL.
>
> Version Info:
> Spark 1.4.1 built for Hadoop 2.4
>
>
> Thank you in advance for any pointers.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-JDBC-clients-with-Spark-on-Hive-tp25976.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: stopping a process usgin an RDD

2016-01-04 Thread Daniel Darabos
You can cause a failure by throwing an exception in the code running on the
executors. The task will be retried (if spark.task.maxFailures > 1), and
once the retries are exhausted the stage fails. No further tasks are
processed after that, and an
exception is thrown on the driver. You could catch the exception and see if
it was caused by your own special exception.
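
A rough sketch of that approach (the exception name and the stop condition are
made up for illustration):

// Thrown on the executors when the stop condition is met.
class StopProcessing(msg: String) extends Exception(msg)

val rdd = sc.parallelize(1 to 1000000)
try {
  rdd.foreach { x =>
    // Note: the failing task is retried spark.task.maxFailures times first.
    if (x == 424242) throw new StopProcessing(s"found $x, no need to look further")
  }
} catch {
  // The driver sees a SparkException wrapping the task failure; check whether
  // it was our own exception before treating it as an early stop.
  case e: org.apache.spark.SparkException if e.getMessage.contains("StopProcessing") =>
    println("Stopped early: " + e.getMessage)
}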

On Mon, Jan 4, 2016 at 1:05 PM, domibd  wrote:

> Hello,
>
> Is there a way to stop under a condition a process (like map-reduce) using
> an RDD ?
>
> (this could be use if the process does not always need to
>  explore all the RDD)
>
> thanks
>
> Dominique
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/stopping-a-process-usgin-an-RDD-tp25870.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Is coalesce smart while merging partitions?

2015-10-08 Thread Daniel Darabos
> For example does spark try to merge the small partitions first or the
election of partitions to merge is random?

It is quite smart, as Iulian has pointed out. But it does not try to merge
small partitions first. Spark doesn't know the size of the partitions. (The
partitions are represented as Iterators, and you cannot know an iterator's
size without consuming it.)


Re: Meaning of local[2]

2015-08-17 Thread Daniel Darabos
Hi Praveen,

On Mon, Aug 17, 2015 at 12:34 PM, praveen S  wrote:

> What does this mean in .setMaster("local[2]")
>
Local mode (executor in the same JVM) with 2 executor threads.

> Is this applicable only for standalone Mode?
>
It is not applicable for standalone mode, only for local.

> Can I do this in a cluster setup, eg:
> . setMaster("[2]")..
>
No. It's faster to try than to ask a mailing list, actually. Also it's
documented at
http://spark.apache.org/docs/latest/submitting-applications.html#master-urls
.

> Is it number of threads per worker node?
>
You can control the number of total threads with
spark-submit's --total-executor-cores parameter, if that's what you're
looking for.


Re: Research ideas using spark

2015-07-14 Thread Daniel Darabos
Hi Shahid,
To be honest I think this question is better suited for Stack Overflow than
for a PhD thesis.

On Tue, Jul 14, 2015 at 7:42 AM, shahid ashraf  wrote:

> hi
>
> I have a 10 node cluster  i loaded the data onto hdfs, so the no. of
> partitions i get is 9. I am running a spark application , it gets stuck on
> one of tasks, looking at the UI it seems application is not using all nodes
> to do calculations. attached is the screen shot of tasks, it seems tasks
> are put on each node more then once. looking at tasks 8 tasks get completed
> under 7-8 minutes and one task takes around 30 minutes so causing the delay
> in results.
>
>
> On Tue, Jul 14, 2015 at 10:48 AM, Shashidhar Rao <
> raoshashidhar...@gmail.com> wrote:
>
>> Hi,
>>
>> I am doing my PHD thesis on large scale machine learning e.g  Online
>> learning, batch and mini batch learning.
>>
>> Could somebody help me with ideas especially in the context of Spark and
>> to the above learning methods.
>>
>> Some ideas like improvement to existing algorithms, implementing new
>> features especially the above learning methods and algorithms that have not
>> been implemented etc.
>>
>> If somebody could help me with some ideas it would really accelerate my
>> work.
>>
>> Plus few ideas on research papers regarding Spark or Mahout.
>>
>> Thanks in advance.
>>
>> Regards
>>
>
>
>
> --
> with Regards
> Shahid Ashraf
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: S3 vs HDFS

2015-07-09 Thread Daniel Darabos
I recommend testing it for yourself. Even if you have no application, you
can just run the spark-ec2 script, log in, run spark-shell and try reading
files from an S3 bucket and from hdfs://:9000/. (This is the
ephemeral HDFS cluster, which uses SSD.)

I just tested our application this way yesterday and found the SSD-based
HDFS to outperform S3 by a factor of 2. I don't know the cause. It may be
locality like Akhil suggests, or SSD vs HDD (assuming S3 is HDD-backed). Or
the HDFS client library and protocol are just better than the S3 versions
(which is HTTP-based and uses some 6-year-old libraries).

On Thu, Jul 9, 2015 at 9:54 AM, Sujee Maniyam  wrote:

> latency is much bigger for S3 (if that matters)
> And with HDFS you'd get data-locality that will boost your app performance.
>
> I did some light experimenting on this.
> see my presentation here for some benchmark numbers ..etc
> http://www.slideshare.net/sujee/hadoop-to-sparkv2
> from slide# 34
>
> cheers
> Sujee Maniyam (http://sujee.net | http://www.linkedin.com/in/sujeemaniyam
> )
> teaching Spark
> 
>
>
> On Wed, Jul 8, 2015 at 11:35 PM, Brandon White 
> wrote:
>
>> Are there any significant performance differences between reading text
>> files from S3 and hdfs?
>>
>
>


Re: Split RDD into two in a single pass

2015-07-06 Thread Daniel Darabos
This comes up so often. I wonder if the documentation or the API could be
changed to answer this question.

The solution I found is from
http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job.
You basically write the items into two directories in a single pass through
the RDD. Then you read back the two directories as two RDDs.

It avoids traversing the RDD twice, but writing to and reading from the file
system is also costly. It may not always be worth it.
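
Roughly, the approach from that answer looks like this (the paths and the
predicate are made up for illustration, and it leans on Hadoop's
MultipleTextOutputFormat rather than anything Spark-specific):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Routes each record into a subdirectory named after its key ("true"/"false").
class SplitByKeyOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get() // do not write the key into the file itself
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
}

val rdd = sc.parallelize(1 to 100)
val f = (x: Int) => x % 2 == 0                 // the boolean predicate
rdd.map(x => (f(x).toString, x.toString))      // single pass: tag each element
   .saveAsHadoopFile("/tmp/split", classOf[String], classOf[String],
     classOf[SplitByKeyOutputFormat])

// Read the two halves back as separate RDDs.
val rdd1 = sc.textFile("/tmp/split/true")
val rdd2 = sc.textFile("/tmp/split/false")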


On Mon, Jul 6, 2015 at 9:32 AM, Anand Nalya  wrote:

> Hi,
>
> I've a RDD which I want to split into two disjoint RDDs on with a boolean
> function. I can do this with the following
>
> val rdd1 = rdd.filter(f)
> val rdd2 = rdd.filter(fnot)
>
> I'm assuming that each of the above statement will traverse the RDD once
> thus resulting in 2 passes.
>
> Is there a way of doing this in a single pass over the RDD so that when f
> returns true, the element goes to rdd1 and to rdd2 otherwise.
>
> Regards,
> Anand
>


Re: .NET on Apache Spark?

2015-07-02 Thread Daniel Darabos
Indeed Spark does not have .NET bindings.

On Thu, Jul 2, 2015 at 10:33 AM, Zwits 
wrote:

> I'm currently looking into a way to run a program/code (DAG) written in
> .NET
> on a cluster using Spark. However I ran into problems concerning the coding
> language, Spark has no .NET API.
> I tried looking into IronPython because Spark does have a Python API, but i
> couldn't find a way to use this.
>
> Is there a way to implement a DAG of jobs on a cluster using .NET
> programming language?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/NET-on-Apache-Spark-tp23578.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: map vs mapPartitions

2015-06-25 Thread Daniel Darabos
Spark creates a RecordReader and uses next() on it when you call
input.next(). (See
https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L215)
How the RecordReader works is an HDFS question, but it's safe to say there is
no difference between using map and mapPartitions.
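
For contrast with the ArrayList-building version quoted below, here is a
mapPartitions call that stays lazy (a rough Scala sketch; the path is a
placeholder):

// Mapping the iterator itself keeps the partition streaming; nothing forces
// the whole partition into memory.
val rdd = sc.textFile("path")
val lengths = rdd.mapPartitions(lines => lines.map(_.length))
lengths.take(5)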

On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora 
wrote:

> say source is HDFS,And file is divided in 10 partitions. so what will be
>  input contains.
>
> public Iterable call(Iterator input)
>
> say I have 10 executors in job each having single partition.
>
> will it have some part of partition or complete. And if some when I call
> input.next() - it will fetch rest or how is it handled ?
>
>
>
>
>
> On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen  wrote:
>
>> No, or at least, it depends on how the source of the partitions was
>> implemented.
>>
>> On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora
>>  wrote:
>> > Does mapPartitions keep complete partitions in memory of executor as
>> > iterable.
>> >
>> > JavaRDD rdd = jsc.textFile("path");
>> > JavaRDD output = rdd.mapPartitions(new
>> > FlatMapFunction, Integer>() {
>> >
>> > public Iterable call(Iterator input)
>> > throws Exception {
>> > List output = new ArrayList();
>> > while(input.hasNext()){
>> > output.add(input.next().length());
>> > }
>> > return output;
>> > }
>> >
>> > });
>> >
>> >
>> > Here does input is present in memory and can contain complete partition
>> of
>> > gbs ?
>> > Will this function call(Iterator input) is called only for no of
>> > partitions(say if I have 10 in this example) times. Not no of lines
>> > times(say 1000) .
>> >
>> >
>> > And whats the use of mapPartitionsWithIndex ?
>> >
>> > Thanks
>> >
>>
>
>


Re: Different Sorting RDD methods in Apache Spark

2015-06-09 Thread Daniel Darabos
It would be even faster to load the data on the driver and sort it there
without using Spark :). Using reduce() is cheating, because it only works
as long as the data fits on one machine. That is not the targeted use case
of a distributed computation system. You can repeat your test with more
data (that doesn't fit on one machine) to see what I mean.

On Tue, Jun 9, 2015 at 8:30 AM, raggy  wrote:

> For a research project, I tried sorting the elements in an RDD. I did this
> in
> two different approaches.
>
> In the first method, I applied a mapPartitions() function on the RDD, so
> that it would sort the contents of the RDD, and provide a result RDD that
> contains the sorted list as the only record in the RDD. Then, I applied a
> reduce function which basically merges sorted lists.
>
> I ran these experiments on an EC2 cluster containing 30 nodes. I set it up
> using the spark ec2 script. The data file was stored in HDFS.
>
> In the second approach I used the sortBy method in Spark.
>
> I performed these operation on the US census data(100MB) found here
>
> A single lines looks like this
>
> 9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not
> in universe or children, Not in universe, White, All other, Female, Not in
> universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not
> in universe, Not in universe, Child <18 never marr not in subfamily, Child
> under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in
> universe, 0, Both parents present, United-States, United-States,
> United-States, Native- Born in the United States, 0, Not in universe, 0, 0,
> 94, - 5.
> I sorted based on the 25th value in the CSV. In this line that is 1758.14.
>
> I noticed that sortBy performs worse than the other method. Is this the
> expected scenario? If it is, why wouldn't the mapPartitions() and reduce()
> be the default sorting approach?
>
> Here is my implementation
>
> public static void sortBy(JavaSparkContext sc){
> JavaRDD<String> rdd = sc.textFile("/data.txt",32);
> long start = System.currentTimeMillis();
> rdd.sortBy(new Function<String, Double>(){
>
> @Override
> public Double call(String v1) throws Exception {
>   // TODO Auto-generated method stub
>   String [] arr = v1.split(",");
>   return Double.parseDouble(arr[24]);
> }
> }, true, 9).collect();
> long end = System.currentTimeMillis();
> System.out.println("SortBy: " + (end - start));
>   }
>
> public static void sortList(JavaSparkContext sc){
> JavaRDD<String> rdd = sc.textFile("/data.txt",32); //parallelize(l,
> 8);
> long start = System.currentTimeMillis();
> JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 =
> rdd.mapPartitions(new FlatMapFunction<Iterator<String>,
> LinkedList<Tuple2<Double, String>>>(){
>
> @Override
> public Iterable<LinkedList<Tuple2<Double, String>>>
> call(Iterator<String> t)
> throws Exception {
>   // TODO Auto-generated method stub
>   LinkedList<Tuple2<Double, String>> lines = new
> LinkedList<Tuple2<Double, String>>();
>   while(t.hasNext()){
> String s = t.next();
> String arr1[] = s.split(",");
> Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]),s);
> lines.add(t1);
>   }
>   Collections.sort(lines, new IncomeComparator());
>   LinkedList<LinkedList<Tuple2<Double, String>>> list = new
> LinkedList<LinkedList<Tuple2<Double, String>>>();
>   list.add(lines);
>   return list;
> }
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Different-Sorting-RDD-methods-in-Apache-Spark-tp23214.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: coGroup on RDD

2015-06-08 Thread Daniel Darabos
I suggest you include your code and the error message! It's not even
immediately clear what programming language you mean to ask about.

On Mon, Jun 8, 2015 at 2:50 PM, elbehery  wrote:

> Hi,
>
> I have two datasets of customer types, and I would like to apply coGrouping
> on them.
>
> I could not find example for that on the website, and I have tried to apply
> this on my code, but the compiler complained ..
>
> Any suggestions ?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/coGroup-on-RDD-Pojos-tp23206.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark error in execution

2015-01-06 Thread Daniel Darabos
Hello! I just had a very similar stack trace. It was caused by an Akka
version mismatch. (From trying to use Play 2.3 with Spark 1.1 by accident
instead of 1.2.)

On Mon, Nov 24, 2014 at 7:15 PM, Blackeye 
wrote:

> I created an application in spark. When I run it with spark, everything
> works
> fine. But when I export my application with the libraries (via sbt), and
> trying to run it as an executable jar, I get the following error:
>
> 14/11/24 20:06:11 ERROR OneForOneStrategy: exception during creation
> akka.actor.ActorInitializationException: exception during creation
> at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
> at akka.actor.ActorCell.create(ActorCell.scala:596)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> at
> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
>
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at akka.util.Reflect$.instantiate(Reflect.scala:66)
> at akka.actor.ArgsReflectConstructor.produce(Props.scala:349)
> at akka.actor.Props.newActor(Props.scala:249)
> at akka.actor.ActorCell.newActor(ActorCell.scala:552)
> at akka.actor.ActorCell.create(ActorCell.scala:578)
> ... 9 more
> Caused by: java.lang.AbstractMethodError:
>
> akka.remote.RemoteActorRefProvider$RemotingTerminator.akka$actor$FSM$_setter_$Event_$eq(Lakka/actor/FSM$Event$;)V
> at akka.actor.FSM$class.$init$(FSM.scala:272)
> at
>
> akka.remote.RemoteActorRefProvider$RemotingTerminator.(RemoteActorRefProvider.scala:36)
> ... 18 more
> 14/11/24 20:06:11 ERROR ActorSystemImpl: Uncaught fatal error from thread
> [sparkDriver-akka.actor.default-dispatcher-2] shutting down ActorSystem
> [sparkDriver]
> java.lang.AbstractMethodError
> at akka.actor.ActorCell.create(ActorCell.scala:580)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> at
> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [ERROR] [11/24/2014 20:06:11.478]
> [sparkDriver-akka.actor.default-dispatcher-4] [ActorSystem(sparkDriver)]
> Uncaught fatal error from thread
> [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem
> [sparkDriver]
> java.lang.AbstractMethodError
> at
>
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> at
> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> at
> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> [ERROR] [11/24/2014 20:06:11.481]
> [sparkDriver-akka.actor.default-

Re: when will the spark 1.3.0 be released?

2014-12-17 Thread Daniel Darabos
Spark 1.2.0 is coming in the next 48 hours according to
http://apache-spark-developers-list.1001551.n3.nabble.com/RESULT-VOTE-Release-Apache-Spark-1-2-0-RC2-tc9815.html

On Wed, Dec 17, 2014 at 10:11 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:
>
> Hi All,
>
> When will the Spark 1.2.0 be released? and What are the features in Spark
> 1.2.0
>
> Regards,
> Rajesh
>
> On Wed, Dec 17, 2014 at 11:14 AM, Andrew Ash  wrote:
>>
>> Releases are roughly every 3mo so you should expect around March if the
>> pace stays steady.
>>
>> 2014-12-16 22:56 GMT-05:00 Marco Shaw :
>>
>>> When it is ready.
>>>
>>>
>>>
>>> > On Dec 16, 2014, at 11:43 PM, 张建轶  wrote:
>>> >
>>> > Hi!
>>> >
>>> > when will the spark 1.3.0 be released?
>>> > I want to use new LDA feature.
>>> > Thank you!
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>


Re: Can spark job have sideeffects (write files to FileSystem)

2014-12-11 Thread Daniel Darabos
Yes, this is perfectly "legal". This is what RDD.foreach() is for! You may
be encountering an IO exception while writing, and maybe using() suppresses
it. (?) I'd try writing the files with java.nio.file.Files.write() -- I'd
expect there is less that can go wrong with that simple call.
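
For example, something along these lines (a sketch; the directory and naming
scheme are made up, and the files land on the worker machines' local disks,
since foreach runs on the executors):

import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }

lines.foreach { line =>
  // Hypothetical path scheme; pick something that cannot collide between records.
  val path = Paths.get(s"/tmp/lines/${line.hashCode}.txt")
  Files.createDirectories(path.getParent)
  // Files.write opens, writes and closes in one call, and lets any IOException surface.
  Files.write(path, line.getBytes(StandardCharsets.UTF_8))
}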

On Thu, Dec 11, 2014 at 12:50 PM, Paweł Szulc  wrote:

> Imagine simple Spark job, that will store each line of the RDD to a
> separate file
>
>
> val lines = sc.parallelize(1 to 100).map(n => s"this is line $n")
> lines.foreach(line => writeToFile(line))
>
> def writeToFile(line: String) = {
> def filePath = "file://..."
> val file = new File(new URI(path).getPath)
> // using function simply closes the output stream
> using(new FileOutputStream(file)) { output =>
>   output.write(value)
> }
> }
>
>
> Now, example above works 99,9% of a time. Files are generated for each
> line, each file contains that particular line.
>
> However, when dealing with large number of data, we encounter situations
> where some of the files are empty! Files are generated, but there is no
> content inside of them (0 bytes).
>
> Now the question is: can Spark job have side effects. Is it even legal to
> write such code?
> If no, then what other choice do we have when we want to save data from
> our RDD?
> If yes, then do you guys see what could be the reason of this job acting
> in this strange manner 0.1% of the time?
>
>
> disclaimer: we are fully aware of .saveAsTextFile method in the API,
> however the example above is a simplification of our code - normally we
> produce PDF files.
>
>
> Best regards,
> Paweł Szulc
>
>
>
>
>
>
>


Re: How can I create an RDD with millions of entries created programmatically

2014-12-09 Thread Daniel Darabos
Ah... I think you're right about the flatMap then :). Or you could use
mapPartitions. (I'm not sure if it makes a difference.)
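
Sketched in Scala for brevity (the Java API offers the same operation through
FlatMapFunction); generateRandomObject stands in for your own factory:

val seeds = sc.parallelize(1 to 1000, numSlices = 1000)
// Each seed expands into 1000 objects. Returning an Iterator keeps flatMap lazy
// within a partition, so the full million objects never sit in memory together.
val big = seeds.flatMap { seed =>
  Iterator.range(0, 1000).map(i => generateRandomObject(seed * 1000 + i))
}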

On Mon, Dec 8, 2014 at 10:09 PM, Steve Lewis  wrote:

> looks good but how do I say that in Java
> as far as I can see sc.parallelize (in Java)  has only one implementation
> which takes a List - requiring an in memory representation
>
> On Mon, Dec 8, 2014 at 12:06 PM, Daniel Darabos <
> daniel.dara...@lynxanalytics.com> wrote:
>
>> Hi,
>> I think you have the right idea. I would not even worry about flatMap.
>>
>> val rdd = sc.parallelize(1 to 100, numSlices = 1000).map(x =>
>> generateRandomObject(x))
>>
>> Then when you try to evaluate something on this RDD, it will happen
>> partition-by-partition. So 1000 random objects will be generated at a time
>> per executor thread.
>>
>> On Mon, Dec 8, 2014 at 8:05 PM, Steve Lewis 
>> wrote:
>>
>>>  I have a function which generates a Java object and I want to explore
>>> failures which only happen when processing large numbers of these object.
>>> the real code is reading a many gigabyte file but in the test code I can
>>> generate similar objects programmatically. I could create a small list,
>>> parallelize it and then use flatmap to inflate it several times by a factor
>>> of 1000 (remember I can hold a list of 1000 items in memory but not a
>>> million)
>>> Are there better ideas - remember I want to create more objects than can
>>> be held in memory at once.
>>>
>>>
>>
>
>
> --
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
>
>


Re: How can I create an RDD with millions of entries created programmatically

2014-12-08 Thread Daniel Darabos
Hi,
I think you have the right idea. I would not even worry about flatMap.

val rdd = sc.parallelize(1 to 100, numSlices = 1000).map(x =>
generateRandomObject(x))

Then when you try to evaluate something on this RDD, it will happen
partition-by-partition. So 1000 random objects will be generated at a time
per executor thread.

On Mon, Dec 8, 2014 at 8:05 PM, Steve Lewis  wrote:

>  I have a function which generates a Java object and I want to explore
> failures which only happen when processing large numbers of these object.
> the real code is reading a many gigabyte file but in the test code I can
> generate similar objects programmatically. I could create a small list,
> parallelize it and then use flatmap to inflate it several times by a factor
> of 1000 (remember I can hold a list of 1000 items in memory but not a
> million)
> Are there better ideas - remember I want to create more objects than can
> be held in memory at once.
>
>


Re: Efficient self-joins

2014-12-08 Thread Daniel Darabos
On Mon, Dec 8, 2014 at 5:26 PM, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> @Daniel
> It's true that the first map in your code is needed, i.e. mapping so that
> dstID is the new RDD key.
>

You wrote groupByKey is "highly inefficient due to the need to shuffle all
the data", but you seem to agree that the shuffle cannot be avoided. Both
approaches cause 1 shuffle.

I still don't see why you expect a speedup from doing this with a join. But
don't let me discourage you or anything. I'm not an expert, just trying to
learn.

> The self-join on the dstKey will then create all the pairs of incoming
> edges (plus self-referential and duplicates that need to be filtered out).
>
> @Koert
> Are there any guidelines about setting the number of partitions in
> HashParitioner then?
>
> What I know about my data is that the distribution of indegree value
> (number of incoming edges for a vertex) will be similar to a power law
> <https://en.wikipedia.org/wiki/Power_law>, i.e.
> there will be a small number of keys with a high number of incoming edges,
> while most of the keys will
> have incoming few edges.
>
> What is a good partitioning strategy for a self-join on an RDD with
> unbalanced key distributions?
>
>
>
> On Mon, Dec 8, 2014 at 3:59 PM, Daniel Darabos <
> daniel.dara...@lynxanalytics.com> wrote:
>
>> I do not see how you hope to generate all incoming edge pairs without
>> repartitioning the data by dstID. You need to perform this shuffle for
>> joining too. Otherwise two incoming edges could be in separate partitions
>> and never meet. Am I missing something?
>>
>> On Mon, Dec 8, 2014 at 3:53 PM, Theodore Vasiloudis <
>> theodoros.vasilou...@gmail.com> wrote:
>>
>>> Using groupByKey was our first approach, and as noted in the docs is
>>> highly inefficient due to the need to shuffle all the data. See
>>> http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
>>>
>>> On Mon, Dec 8, 2014 at 3:47 PM, Daniel Darabos <
>>> daniel.dara...@lynxanalytics.com> wrote:
>>>
>>>> Could you not use a groupByKey instead of the join? I mean something
>>>> like this:
>>>>
>>>> val byDst = rdd.map { case (src, dst, w) => dst -> (src, w) }
>>>> byDst.groupByKey.map { case (dst, edges) =>
>>>>   for {
>>>> (src1, w1) <- edges
>>>> (src2, w2) <- edges
>>>>   } {
>>>> ??? // Do something.
>>>>   }
>>>>   ??? // Return something.
>>>> }
>>>>
>>>> On Mon, Dec 8, 2014 at 3:28 PM, Koert Kuipers 
>>>> wrote:
>>>>
>>>>> spark can do efficient joins if both RDDs have the same partitioner.
>>>>> so in case of self join I would recommend to create an rdd that has
>>>>> explicit partitioner and has been cached.
>>>>> On Dec 8, 2014 8:52 AM, "Theodore Vasiloudis" <
>>>>> theodoros.vasilou...@gmail.com> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> I am working on a graph problem using vanilla Spark (not GraphX) and
>>>>>> at some
>>>>>> point I would like to do a
>>>>>> self join on an edges RDD[(srcID, dstID, w)] on the dst key, in order
>>>>>> to get
>>>>>> all pairs of incoming edges.
>>>>>>
>>>>>> Since this is the performance bottleneck for my code, I was wondering
>>>>>> if
>>>>>> there any steps to take before
>>>>>> performing the self-join in order to make it as efficient as possible.
>>>>>>
>>>>>> In the  Learning Spark book
>>>>>> <
>>>>>> https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
>>>>>> >
>>>>>> for example, in the "Data partitioning" section they recommend
>>>>>> performing .partitionBy(new HashPartitioner(100)) on an RDD before
>>>>>> joining
>>>>>> it with another.
>>>>>>
>>>>>> Are there any guidelines for optimizing self-join performance?
>>>>>>
>>>>>> Regards,
>>>>>> Theodore
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-self-joins-tp20576.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> -
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>
>>>>>>
>>>>
>>>
>>
>


Re: Efficient self-joins

2014-12-08 Thread Daniel Darabos
I do not see how you hope to generate all incoming edge pairs without
repartitioning the data by dstID. You need to perform this shuffle for
joining too. Otherwise two incoming edges could be in separate partitions
and never meet. Am I missing something?

On Mon, Dec 8, 2014 at 3:53 PM, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> Using groupByKey was our first approach, and as noted in the docs is
> highly inefficient due to the need to shuffle all the data. See
> http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
>
> On Mon, Dec 8, 2014 at 3:47 PM, Daniel Darabos <
> daniel.dara...@lynxanalytics.com> wrote:
>
>> Could you not use a groupByKey instead of the join? I mean something like
>> this:
>>
>> val byDst = rdd.map { case (src, dst, w) => dst -> (src, w) }
>> byDst.groupByKey.map { case (dst, edges) =>
>>   for {
>> (src1, w1) <- edges
>> (src2, w2) <- edges
>>   } {
>> ??? // Do something.
>>   }
>>   ??? // Return something.
>> }
>>
>> On Mon, Dec 8, 2014 at 3:28 PM, Koert Kuipers  wrote:
>>
>>> spark can do efficient joins if both RDDs have the same partitioner. so
>>> in case of self join I would recommend to create an rdd that has explicit
>>> partitioner and has been cached.
>>> On Dec 8, 2014 8:52 AM, "Theodore Vasiloudis" <
>>> theodoros.vasilou...@gmail.com> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I am working on a graph problem using vanilla Spark (not GraphX) and at
>>>> some
>>>> point I would like to do a
>>>> self join on an edges RDD[(srcID, dstID, w)] on the dst key, in order
>>>> to get
>>>> all pairs of incoming edges.
>>>>
>>>> Since this is the performance bottleneck for my code, I was wondering if
>>>> there any steps to take before
>>>> performing the self-join in order to make it as efficient as possible.
>>>>
>>>> In the  Learning Spark book
>>>> <
>>>> https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
>>>> >
>>>> for example, in the "Data partitioning" section they recommend
>>>> performing .partitionBy(new HashPartitioner(100)) on an RDD before
>>>> joining
>>>> it with another.
>>>>
>>>> Are there any guidelines for optimizing self-join performance?
>>>>
>>>> Regards,
>>>> Theodore
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-self-joins-tp20576.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>
>


Re: Efficient self-joins

2014-12-08 Thread Daniel Darabos
Could you not use a groupByKey instead of the join? I mean something like
this:

val byDst = rdd.map { case (src, dst, w) => dst -> (src, w) }
byDst.groupByKey.map { case (dst, edges) =>
  for {
(src1, w1) <- edges
(src2, w2) <- edges
  } {
??? // Do something.
  }
  ??? // Return something.
}
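
For concreteness, the placeholders could be filled in like this if the goal is
just to emit every pair of incoming edges per destination (a sketch; I assume
rdd: RDD[(Long, Long, Double)] of (src, dst, w) edges):

val byDst = rdd.map { case (src, dst, w) => dst -> (src, w) }
val incomingPairs = byDst.groupByKey.flatMap { case (dst, edges) =>
  for {
    (src1, w1) <- edges
    (src2, w2) <- edges
    if src1 < src2  // skip self-pairs and keep each unordered pair once
  } yield (dst, (src1, w1), (src2, w2))
}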

On Mon, Dec 8, 2014 at 3:28 PM, Koert Kuipers  wrote:

> spark can do efficient joins if both RDDs have the same partitioner. so in
> case of self join I would recommend to create an rdd that has explicit
> partitioner and has been cached.
> On Dec 8, 2014 8:52 AM, "Theodore Vasiloudis" <
> theodoros.vasilou...@gmail.com> wrote:
>
>> Hello all,
>>
>> I am working on a graph problem using vanilla Spark (not GraphX) and at
>> some
>> point I would like to do a
>> self join on an edges RDD[(srcID, dstID, w)] on the dst key, in order to
>> get
>> all pairs of incoming edges.
>>
>> Since this is the performance bottleneck for my code, I was wondering if
>> there any steps to take before
>> performing the self-join in order to make it as efficient as possible.
>>
>> In the  Learning Spark book
>> <
>> https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
>> >
>> for example, in the "Data partitioning" section they recommend
>> performing .partitionBy(new HashPartitioner(100)) on an RDD before joining
>> it with another.
>>
>> Are there any guidelines for optimizing self-join performance?
>>
>> Regards,
>> Theodore
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-self-joins-tp20576.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: SPARK LIMITATION - more than one case class is not allowed !!

2014-12-05 Thread Daniel Darabos
On Fri, Dec 5, 2014 at 7:12 AM, Tobias Pfeiffer  wrote:

> Rahul,
>
> On Fri, Dec 5, 2014 at 2:50 PM, Rahul Bindlish <
> rahul.bindl...@nectechnologies.in> wrote:
>>
>> I have done so thats why spark is able to load objectfile [e.g.
>> person_obj]
>> and spark has maintained serialVersionUID [person_obj].
>>
>> Next time when I am trying to load another objectfile [e.g. office_obj]
>> and
>> I think spark is matching serialVersionUID [person_obj] with previous
>> serialVersionUID [person_obj] and giving mismatch error.
>>
>> In my first post, I have give statements which can be executed easily to
>> replicate this issue.
>>
>
> Can you post the Scala source for your case classes? I have tried the
> following in spark-shell:
>
> case class Dog(name: String)
> case class Cat(age: Int)
> val dogs = sc.parallelize(Dog("foo") :: Dog("bar") :: Nil)
> val cats = sc.parallelize(Cat(1) :: Cat(2) :: Nil)
> dogs.saveAsObjectFile("test_dogs")
> cats.saveAsObjectFile("test_cats")
>
> This gives two directories "test_dogs/" and "test_cats/". Then I restarted
> spark-shell and entered:
>
> case class Dog(name: String)
> case class Cat(age: Int)
> val dogs = sc.objectFile("test_dogs")
> val cats = sc.objectFile("test_cats")
>
> I don't get an exception, but:
>
> dogs: org.apache.spark.rdd.RDD[Nothing] = FlatMappedRDD[1] at objectFile
> at <console>:12
>

You need to specify the type of the RDD. The compiler does not know what is
in "test_dogs".

val dogs = sc.objectFile[Dog]("test_dogs")
val cats = sc.objectFile[Cat]("test_cats")

It's an easy mistake to make... I wonder if an assertion could be
implemented that makes sure the type parameter is present.


Re: Increasing the number of retry in case of job failure

2014-12-05 Thread Daniel Darabos
It is controlled by "spark.task.maxFailures". See
http://spark.apache.org/docs/latest/configuration.html#scheduling.
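
A sketch of bumping it (8 is just an illustrative value; the default is 4):

import org.apache.spark.{ SparkConf, SparkContext }

val conf = new SparkConf()
  .setAppName("my-cassandra-job")       // hypothetical app name
  .set("spark.task.maxFailures", "8")   // allow up to 8 attempts per task
val sc = new SparkContext(conf)
// The same property can be passed to spark-submit too: --conf spark.task.maxFailures=8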

On Fri, Dec 5, 2014 at 11:02 AM, shahab  wrote:

> Hello,
>
> By some (unknown) reasons some of my tasks, that fetch data from
> Cassandra, are failing so often, and apparently the master removes a tasks
> which fails more than 4 times (in my case).
>
> Is there any way to increase the number of re-tries ?
>
> best,
> /Shahab
>


Re: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down

2014-12-05 Thread Daniel Darabos
Hi, Alexey,
I'm getting the same error on startup with Spark 1.1.0. Everything works
fine fortunately.

The error is mentioned in the logs in
https://issues.apache.org/jira/browse/SPARK-4498, so maybe it will also be
fixed in Spark 1.2.0 and 1.1.2. I have no insight into it unfortunately.

On Tue, Dec 2, 2014 at 1:38 PM, Alexey Romanchuk  wrote:

> Any ideas? Anyone got the same error?
>
> On Mon, Dec 1, 2014 at 2:37 PM, Alexey Romanchuk <
> alexey.romanc...@gmail.com> wrote:
>
>> Hello spark users!
>>
>> I found lots of strange messages in driver log. Here it is:
>>
>> 2014-12-01 11:54:23,849 [sparkDriver-akka.actor.default-dispatcher-25]
>> ERROR
>> akka.remote.EndpointWriter[akka://sparkDriver/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkExecutor%40data1.hadoop%3A17372-5/endpointWriter]
>> - AssociationError [akka.tcp://sparkDriver@10.54.87.173:55034] <-
>> [akka.tcp://sparkExecutor@data1.hadoop:17372]: Error [Shut down address:
>> akka.tcp://sparkExecutor@data1.hadoop:17372] [
>> akka.remote.ShutDownAssociation: Shut down address:
>> akka.tcp://sparkExecutor@data1.hadoop:17372
>> Caused by: akka.remote.transport.Transport$InvalidAssociationException:
>> The remote system terminated the association because it is shutting down.
>> ]
>>
>> I got this message for every worker twice. First - for driverPropsFetcher
>> and next for sparkExecutor. Looks like spark shutdown remote akka system
>> incorrectly or there is some race condition in this process and driver sent
>> some data to worker, but worker's actor system already in shutdown state.
>>
>> Except for this message everything works fine. But this is ERROR level
>> message and I found it in my "ERROR only" log.
>>
>> Do you have any idea is it configuration issue, bug in spark or akka or
>> something else?
>>
>> Thanks!
>>
>>
>


Re: How to enforce RDD to be cached?

2014-12-03 Thread Daniel Darabos
On Wed, Dec 3, 2014 at 10:52 AM, shahab  wrote:

> Hi,
>
> I noticed that rdd.cache() is not happening immediately rather due to lazy
> feature of Spark, it is happening just at the moment  you perform some
> map/reduce actions. Is this true?
>

Yes, this is correct.

If this is the case, how can I enforce Spark to cache immediately at its
> cache() statement? I need this to perform some benchmarking and I need to
> separate rdd caching and rdd transformation/action processing time.
>

The typical solution I think is to run rdd.foreach(_ => ()) to trigger a
calculation.
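
For the benchmarking itself, the pattern could look like this (a sketch; the
timing helper is ad hoc and expensiveStep is a placeholder for your transformation):

def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime
  val result = body
  println(f"$label: ${(System.nanoTime - start) / 1e9}%.2f s")
  result
}

rdd.cache()
time("cache fill (forced by a no-op action)") { rdd.foreach(_ => ()) }
time("transformation + action on cached data") { rdd.map(expensiveStep).count() }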


Re: Is sorting persisted after pair rdd transformations?

2014-11-19 Thread Daniel Darabos
Ah, so I misunderstood you too :).

My reading of org/apache/spark/Aggregator.scala is that your function will
always see the items in the order that they are in the input RDD. An RDD
partition is always accessed as an iterator, so it will not be read out of
order.
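
If you want to double-check a concrete case, you can dump each partition in the
order its iterator yields the elements (a quick spark-shell sketch):

sc.parallelize(1 to 8).map(i => (1, i)).sortBy(_._2)
  .mapPartitions(it => Iterator(it.toList), preservesPartitioning = true)
  .collect()
  .foreach(println)  // each line is one partition's contents, in iteration order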

On Wed, Nov 19, 2014 at 2:28 PM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Thanks Daniel. I can understand that the keys will not be in sorted order
> but what I am trying to understanding is whether the functions are passed
> values in sorted order in a given partition.
>
> For example:
>
> sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t => t._2).foldByKey(0)((a,
> b) => b).collect
> res0: Array[(Int, Int)] = Array((1,8))
>
> The fold always given me last value as 8 which suggests values preserve
> sorting earlier defined in stage in DAG?
>
> On Wed Nov 19 2014 at 18:10:11 Daniel Darabos <
> daniel.dara...@lynxanalytics.com> wrote:
>
>> Akhil, I think Aniket uses the word "persisted" in a different way than
>> what you mean. I.e. not in the RDD.persist() way. Aniket asks if running
>> combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting
>> is preserved.)
>>
>> I think the answer is no. combineByKey uses AppendOnlyMap, which is a
>> hashmap. That will shuffle your keys. You can quickly verify it in
>> spark-shell:
>>
>> scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
>> res0: Array[(Int, Int)] = Array((8,1), (7,1))
>>
>> (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
>> number that demonstrates this.)
>>
>> On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das 
>> wrote:
>>
>>> If something is persisted you can easily see them under the Storage tab
>>> in the web ui.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar <
>>> aniket.bhatna...@gmail.com> wrote:
>>>
>>>> I am trying to figure out if sorting is persisted after applying Pair
>>>> RDD transformations and I am not able to decisively tell after reading the
>>>> documentation.
>>>>
>>>> For example:
>>>> val numbers = .. // RDD of numbers
>>>> val pairedNumbers = numbers.map(number => (number % 100, number))
>>>> val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =>
>>>> pairedNumber._2) // Sort by values in the pair
>>>> val aggregates = sortedPairedNumbers.combineByKey(..)
>>>>
>>>> In this example, will the combine functions see values in sorted order?
>>>> What if I had done groupByKey and then combineByKey? What transformations
>>>> can unsort an already sorted data?
>>>>
>>>
>>>
>>


Re: Is sorting persisted after pair rdd transformations?

2014-11-19 Thread Daniel Darabos
Akhil, I think Aniket uses the word "persisted" in a different way than
what you mean. I.e. not in the RDD.persist() way. Aniket asks if running
combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting
is preserved.)

I think the answer is no. combineByKey uses AppendOnlyMap, which is a
hashmap. That will shuffle your keys. You can quickly verify it in
spark-shell:

scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
res0: Array[(Int, Int)] = Array((8,1), (7,1))

(The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
number that demonstrates this.)

On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das 
wrote:

> If something is persisted you can easily see them under the Storage tab in
> the web ui.
>
> Thanks
> Best Regards
>
> On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> I am trying to figure out if sorting is persisted after applying Pair RDD
>> transformations and I am not able to decisively tell after reading the
>> documentation.
>>
>> For example:
>> val numbers = .. // RDD of numbers
>> val pairedNumbers = numbers.map(number => (number % 100, number))
>> val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =>
>> pairedNumber._2) // Sort by values in the pair
>> val aggregates = sortedPairedNumbers.combineByKey(..)
>>
>> In this example, will the combine functions see values in sorted order?
>> What if I had done groupByKey and then combineByKey? What transformations
>> can unsort an already sorted data?
>>
>
>


Task duration graph on Spark stage UI

2014-11-06 Thread Daniel Darabos
Even though the stage UI has min, 25th%, median, 75th%, and max durations,
I am often still left clueless about the distribution. For example, 100 out
of 200 tasks (started at the same time) have completed in 1 hour. How much
longer do I have to wait? I cannot guess well based on the five numbers.

A graph of the durations will not answer the question either, but I think
it gives a better idea. I can hopefully see if the distribution is linearly
sloped or bimodal or exponentially slowing down, etc.

It's easy to draw this graph, so I set it up as a Chrome extension:

https://chrome.google.com/webstore/detail/spark-distributions/hhgnppbenlghmcimkmiccfiemdohdgoo

And here's the complete source code that you can throw in the JavaScript
console for the same results:

// Grab the per-task durations (8th column of the task table) and sort them.
var x = $('table:eq(2)').find('td:nth-child(8)').map(function (i, e) {
  return parseInt($(e).attr('sorttable_customkey')); });
x.sort(function(a, b) { return a - b; });
var w = x.length;    // number of tasks
var h = x[w - 1];    // longest duration, used to scale the y axis
var W = 180;         // plot width in pixels
var H = 80;          // plot height in pixels
var canvas = $('<canvas width="' + W + '" height="' + H + '"></canvas>');
canvas.css({ position: 'absolute', top: '100px', left: '500px' });
$('body').append(canvas);
var ctx = canvas[0].getContext('2d');
ctx.fillStyle = 'orange';
// Draw the sorted durations as a filled curve, scaled to fit the canvas.
ctx.beginPath();
ctx.moveTo(0, H);
for (var i = 0; i < w; ++i) {
  ctx.lineTo(i * W / (w - 1), H - x[i] * H / h);
}
ctx.lineTo(W, H);
ctx.fill();

It should not be much work to add this to the stage status page itself
either, if there is interest.


Re: registering Array of CompactBuffer to Kryo

2014-10-02 Thread Daniel Darabos
How about this?

Class.forName("[Lorg.apache.spark.util.collection.CompactBuffer;")

On Tue, Sep 30, 2014 at 5:33 PM, Andras Barjak
 wrote:
> Hi,
>
> what is the correct scala code to register an Array of this private spark
> class to Kryo?
>
> "java.lang.IllegalArgumentException: Class is not registered:
> org.apache.spark.util.collection.CompactBuffer[]
> Note: To register this class use:
> kryo.register(org.apache.spark.util.collection.CompactBuffer[].class);"
>
> thanks,
>
> András Barják

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark RDD member of class loses it's value when the class being used as graph attribute

2014-06-30 Thread Daniel Darabos
Can you share some example code of what you are doing?

BTW Gmail puts down your mail as spam, saying it cannot verify it came from
yahoo.com. Might want to check your mail client settings. (It could be a
Gmail or Yahoo bug too of course.)


On Fri, Jun 27, 2014 at 4:29 PM, harsh2005_7  wrote:

> Hi,
>
> I have a scenario where I am having a class X with constructor parameter as
> (RDD,Double).When I am initializing the the class object with corresponding
> RDD and double value (of name say x1) and *putting it as a vertex attribute
> in graph* , I am losing my RDD value . The Double value remains intact . I
> tried accessing simultaneously the RDD from instance variable (x1) and i
> see
> it intact there but for some reason it's not available when i take graph
> vertex attribute and access the RDD. Please help me to understand which
> concept I am missing here ? And whats the correct way to do it.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-RDD-member-of-class-loses-it-s-value-when-the-class-being-used-as-graph-attribute-tp8420.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Getting different answers running same line of code

2014-06-19 Thread Daniel Darabos
The easiest explanation would be if some other process is continuously
modifying the files. You could make a copy in a new directory and run on
that to eliminate this possibility.

What do you see if you print "rd1.count()" multiple times?
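
E.g. something like this (a sketch), both without and with persisting, to see
whether the raw count itself is stable:

val rd1 = sc.textFile("s3n://blabla")
// Not persisted: each count() re-reads S3, so any wobble in the numbers points
// at the input listing/reading rather than at your filter and map functions.
for (i <- 1 to 5) println(s"unpersisted count, run $i: ${rd1.count()}")
// Persisted: the answer should be pinned after the first run.
rd1.persist()
for (i <- 1 to 5) println(s"persisted count, run $i: ${rd1.count()}")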

Have you tried the experiment on a smaller set of files? I don't know why a
file would cause this problem, but maybe you can find it that way.

Are you using a wildcard ("s3n://blabla/*.txt") or direct filenames? Maybe
S3 forgets about the existence of some files some of the time. There could
be a limit on the number of files returned by a directory listing, and maybe
the order is not fixed, so different files get cut off at times.

(Sorry about the wild, uneducated guesses.)


On Thu, Jun 19, 2014 at 5:54 PM, mrm  wrote:

> Hi,
>
> I have had this issue for some time already, where I get different answers
> when I run the same line of code twice. I have run some experiments to see
> what is happening, please help me! Here is the code and the answers that I
> get. I suspect I have a problem when reading large datasets from S3.
>
> rd1 = sc.textFile('s3n://blabla')
> *rd1.persist()*
> rd2 = rd_imp.filter(lambda x: filter1(x)).map(lambda x: map1(x))
>
> Note: both filter1() and map1() are deterministic
>
> rd2.count()  ==> 294928559
> rd2.count()  ==> 294928559
>
> So far so good, I get the same counts. Now when I unpersist rd1, that's
> when
> I start getting problems!
>
> *rd1.unpersist()*
> rd2 = rd_imp.filter(lambda x: filter1(x)).map(lambda x: map1(x))
> rd2.count()  ==> 294928559
> rd2.count()  ==> 294509501
> rd2.count()  ==> 294679795
> ...
>
> I would appreciate it if you could help me!
>
> Thanks,
> Maria
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-different-answers-running-same-line-of-code-tp7920.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: join operation is taking too much time

2014-06-17 Thread Daniel Darabos
I've been wondering about this. Is there a difference in performance
between these two?

val rdd1 = sc.textFile(files.mkString(","))
val rdd2 = sc.union(files.map(sc.textFile(_)))

I don't know about your use-case, Meethu, but it may be worth trying to see
if reading all the files into one RDD (like rdd1) would perform better in
the join. (If this is possible in your situation.)



On Tue, Jun 17, 2014 at 6:45 PM, Andrew Or  wrote:

> How long does it get stuck for? This is a common sign for the OS thrashing
> due to out of memory exceptions. If you keep it running longer, does it
> throw an error?
>
> Depending on how large your other RDD is (and your join operation), memory
> pressure may or may not be the problem at all. It could be that spilling
> your shuffles
> to disk is slowing you down (but probably shouldn't hang your
> application). For the 5 RDDs case, what happens if you set
> spark.shuffle.spill to false?
>
>
> 2014-06-17 5:59 GMT-07:00 MEETHU MATHEW :
>
>
>>  Hi all,
>>
>> I want  to do a recursive leftOuterJoin between an RDD (created from
>>  file) with 9 million rows(size of the file is 100MB) and 30 other
>> RDDs(created from 30 diff files in each iteration of a loop) varying from 1
>> to 6 million rows.
>> When I run it for 5 RDDs,its running successfully  in 5 minutes.But when
>> I increase it to 10 or 30 RDDs its gradually slowing down and finally
>> getting stuck without showing any warning or error.
>>
>> I am running in standalone mode with 2 workers of 4GB each and a total
>> of 16 cores .
>>
>> Any of you facing similar problems with JOIN  or is it a problem with my
>> configuration.
>>
>> Thanks & Regards,
>> Meethu M
>>
>
>


Re: What is the best way to handle transformations or actions that takes forever?

2014-06-17 Thread Daniel Darabos
I think you need to implement a timeout in your code. As far as I know,
Spark will not interrupt the execution of your code as long as the driver
is connected. (That might be a good feature idea, though.)
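
A sketch of one way to do it with a Future on the driver. Note this only stops
the driver from waiting; the job itself keeps running on the cluster unless you
also cancel it, e.g. with sc.cancelAllJobs() if your Spark version has it:

import java.util.concurrent.TimeoutException
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val counting = Future { rdd.count() }   // `rdd` stands for whatever you are evaluating
try {
  println("count = " + Await.result(counting, 10.minutes))
} catch {
  case _: TimeoutException => println("gave up after 10 minutes")
}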


On Tue, Jun 17, 2014 at 7:54 PM, Peng Cheng  wrote:

> I've tried enabling the speculative jobs, this seems partially solved the
> problem, however I'm not sure if it can handle large-scale situations as it
> only start when 75% of the job is finished.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-best-way-to-handle-transformations-or-actions-that-takes-forever-tp7664p7752.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Is shuffle "stable"?

2014-06-14 Thread Daniel Darabos
Thanks Matei!

In the example all three items have the same key, so they go to the same
partition:

scala> sc.parallelize(Seq(0->3, 0->2, 0->1), 3).partitionBy(new HashPartitioner(3)).glom.collect

Array(Array((0,3), (0,2), (0,1)), Array(), Array())


I guess the apparent stability is just due to the single-threaded "local"
master processing partitions in order, so (0,3) is produced first, etc.



On Sun, Jun 15, 2014 at 12:55 AM, Matei Zaharia 
wrote:

> The order is not guaranteed actually, only which keys end up in each
> partition. Reducers may fetch data from map tasks in an arbitrary order,
> depending on which ones are available first. If you’d like a specific
> order, you should sort each partition. Here you might be getting it because
> each partition only ends up having one element, and collect() does return
> the partitions in order.
>
> Matei
>
> On Jun 14, 2014, at 12:14 PM, Daniel Darabos <
> daniel.dara...@lynxanalytics.com> wrote:
>
> What I mean is, let's say I run this:
>
> sc.parallelize(Seq(0->3, 0->2, 0->1), 
> 3).partitionBy(HashPartitioner(3)).collect
>
>
> Will the result always be Array((0,3), (0,2), (0,1))? Or could I possibly get 
> a different order?
>
>
> I'm pretty sure the shuffle files are taken in the order of the source 
> partitions... But after much search, and the discussion on 
> http://stackoverflow.com/questions/24206660/does-groupbykey-in-spark-preserve-the-original-order
>  I still can't find the code that does this.
>
>
> Thanks!
>
>
>


Is shuffle "stable"?

2014-06-14 Thread Daniel Darabos
What I mean is, let's say I run this:

sc.parallelize(Seq(0->3, 0->2, 0->1), 3).partitionBy(new HashPartitioner(3)).collect


Will the result always be Array((0,3), (0,2), (0,1))? Or could I
possibly get a different order?


I'm pretty sure the shuffle files are taken in the order of the source
partitions... But after much searching, and the discussion on
http://stackoverflow.com/questions/24206660/does-groupbykey-in-spark-preserve-the-original-order
I still can't find the code that does this.


Thanks!


Re: list of persisted rdds

2014-06-13 Thread Daniel Darabos
Check out SparkContext.getPersistentRDDs!
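
It returns a Map from RDD id to the RDD, so both parts of the question fall out
of it (a sketch):

// List what is currently persisted.
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"id=$id name=${rdd.name} storage=${rdd.getStorageLevel}")
}

// Unpersist everything at once.
sc.getPersistentRDDs.values.foreach(_.unpersist())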


On Fri, Jun 13, 2014 at 1:06 PM, mrm  wrote:

> Hi,
>
> How do I check the rdds that I have persisted? I have some code that looks
> like:
> "rd1.cache()
> 
> rd2.cache()
> ...
> 
> rdN.cache()"
>
> How can I unpersist all rdd's at once? And is it possible to get the names
> of the rdd's that are currently persisted (list = rd1, rd2, ..., rdN)?
>
> Thank you!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/list-of-persisted-rdds-tp7565.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Information on Spark UI

2014-06-11 Thread Daniel Darabos
About more succeeded tasks than total tasks:
 - This can happen if you have enabled speculative execution. Some
partitions can get processed multiple times.
 - More commonly, the result of the stage may be used in a later
calculation, and has to be recalculated. This happens if some of the
results were evicted from cache.


On Wed, Jun 11, 2014 at 2:23 AM, Shuo Xiang  wrote:

> Hi,
>   Came up with some confusion regarding the information on SparkUI. The
> following info is gathered while factorizing a large matrix using ALS:
>   1. some stages have more succeeded tasks than total tasks, which are
> displayed in the 5th column.
>   2. duplicate stages with exactly same stageID (stage 1/3/7)
>   3. Clicking into some stages, some executors cannot be addressed. Does
> that mean lost of executor or this does not matter?
>
>   Any explanation are appreciated!
>
>
>


Re: Hanging Spark jobs

2014-06-11 Thread Daniel Darabos
These stack traces come from the stuck node? Looks like it's waiting on
data in BlockFetcherIterator. Waiting for data from another node. But you
say all other nodes were done? Very curious.

Maybe you could try turning on debug logging, and try to figure out what
happens in BlockFetcherIterator (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala).
I do not think it is supposed to get stuck indefinitely.
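
For the debug logging, a line like this in the executors' log4j.properties
should be enough (assuming the default log4j setup; the logger name follows the
class's package):

# DEBUG for just the block fetcher, so the rest of the logs stay readable.
log4j.logger.org.apache.spark.storage.BlockFetcherIterator=DEBUG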

On Tue, Jun 10, 2014 at 8:22 PM, Hurwitz, Daniel  wrote:

>  Hi,
>
>
>
> We are observing a recurring issue where our Spark jobs are hanging for
> several hours, even days, until we kill them.
>
>
>
> We are running Spark v0.9.1 over YARN.
>
>
>
> Our input is a list of edges of a graph on which we use Bagel to compute
> connected components using the following method:
>
>
>
> *class* CCMessage(*var* targetId: Long, *var* myComponentId: Long)
> *extends* Message[Long] *with* Serializable
>
> *def* compute(self: CC, msgs: Option[Array[CCMessage]], superstep: Int):
> (CC, Array[CCMessage]) = {
>
>   *val* smallestComponentId = msgs.map(sq => *sq.map(_.*
> *myComponentId**)*.min).getOrElse(Long.MaxValue)
>
>   *val* newComponentId = math.min(self.clusterID, smallestComponentId)
>
>   *val* halt = (newComponentId == self.clusterID) || (superstep >=
> maxIters)
>
>   self.active = *if* (superstep == 0) *true* *else* !halt
>
>   *val* outGoingMessages = *if* (halt && superstep > 0)
> Array[CCMessage]()
>
>   *else* self.edges.map(targetId => *new* CCMessage(targetId,
> newComponentId)).toArray
>
>   self.clusterID = newComponentId
>
>
>
>   (self, outGoingMessages)
>
> }
>
>
>
> Our output is a text file in which each line is a list of the node IDs in
> each component. The size of the output may be up to 6 GB.
>
>
>
> We see in the job tracker that most of the time jobs usually get stuck on
> the “saveAsTextFile” command, the final line in our code. In some cases,
> the job will hang during one of the iterations of Bagel during the
> computation of the connected components.
>
>
>
> Oftentimes, when we kill the job and re-execute it, it will finish
> successfully within an hour which is the expected duration. We notice that
> if our Spark jobs don’t finish after a few hours, they will never finish
> until they are killed, regardless of the load on our cluster.
>
>
>
> After consulting with our Hadoop support team, they noticed that after a
> particular hanging Spark job was running for 38 hours, all Spark processes
> on all nodes were completed except for one node which was running more than
> 9 hours consuming very little CPU, then suddenly consuming 14s of CPU, then
> back to calm. Also, the other nodes were not relinquishing their resources
> until our Hadoop admin killed the process on that problematic node and
> suddenly the job finished and “success” was reported in the job tracker.
> The output seemed to be fine too. If it helps you understand the issue, the
> Hadoop admin suggested this was a Spark issue and sent us two stack dumps
> which I attached to this email: before killing the node’s Spark process
> (dump1.txt) and after (dump2.txt).
>
>
>
> Any advice on how to resolve this issue? How can we debug this?
>
>  Thanks,
>
> ~Daniel
>
>
>


Re: Better line number hints for logging?

2014-06-05 Thread Daniel Darabos
On Wed, Jun 4, 2014 at 10:39 PM, Matei Zaharia 
wrote:

> That’s a good idea too, maybe we can change CallSiteInfo to do that.
>

I've filed an issue: https://issues.apache.org/jira/browse/SPARK-2035

Matei
>
> On Jun 4, 2014, at 8:44 AM, Daniel Darabos <
> daniel.dara...@lynxanalytics.com> wrote:
>
> Oh, this would be super useful for us too!
>
> Actually wouldn't it be best if you could see the whole call stack on the
> UI, rather than just one line? (Of course you would have to click to expand
> it.)
>
>
> On Wed, Jun 4, 2014 at 2:38 AM, John Salvatier 
> wrote:
>
>> Ok, I will probably open a Jira.
>>
>>
>> On Tue, Jun 3, 2014 at 5:29 PM, Matei Zaharia 
>> wrote:
>>
>>> You can use RDD.setName to give it a name. There’s also a creationSite
>>> field that is private[spark] — we may want to add a public setter for that
>>> later. If the name isn’t enough and you’d like this, please open a JIRA
>>> issue for it.
>>>
>>> Matei
>>>
>>> On Jun 3, 2014, at 5:22 PM, John Salvatier  wrote:
>>>
>>> I have created some extension methods for RDDs in RichRecordRDD and
>>> these are working exceptionally well for me.
>>>
>>> However, when looking at the logs, its impossible to tell what's going
>>> on because all the line number hints point to RichRecordRDD.scala rather
>>> than the code that uses it. For example:
>>>
>>>> INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at
>>>> map at RichRecordRDD.scala:633), which is now runnable
>>>
>>> Is there any way set up my extension methods class so that the logs will
>>> print a more useful line number?
>>>
>>>
>>>
>>
>
>


Re: Better line number hints for logging?

2014-06-04 Thread Daniel Darabos
Oh, this would be super useful for us too!

Actually wouldn't it be best if you could see the whole call stack on the
UI, rather than just one line? (Of course you would have to click to expand
it.)


On Wed, Jun 4, 2014 at 2:38 AM, John Salvatier  wrote:

> Ok, I will probably open a Jira.
>
>
> On Tue, Jun 3, 2014 at 5:29 PM, Matei Zaharia 
> wrote:
>
>> You can use RDD.setName to give it a name. There’s also a creationSite
>> field that is private[spark] — we may want to add a public setter for that
>> later. If the name isn’t enough and you’d like this, please open a JIRA
>> issue for it.
>>
>> Matei
>>
>> On Jun 3, 2014, at 5:22 PM, John Salvatier  wrote:
>>
>> I have created some extension methods for RDDs in RichRecordRDD and these
>> are working exceptionally well for me.
>>
>> However, when looking at the logs, its impossible to tell what's going on
>> because all the line number hints point to RichRecordRDD.scala rather than
>> the code that uses it. For example:
>>
>>> INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at
>>> map at RichRecordRDD.scala:633), which is now runnable
>>
>> Is there any way set up my extension methods class so that the logs will
>> print a more useful line number?
>>
>>
>>
>


Re: Strange problem with saveAsTextFile after upgrade Spark 0.9.0->1.0.0

2014-06-04 Thread Daniel Darabos
On Tue, Jun 3, 2014 at 8:46 PM, Marek Wiewiorka 
wrote:

> Hi All,
> I've been experiencing a very strange error after upgrade from Spark 0.9
> to 1.0 - it seems that saveAsTestFile function is throwing
> java.lang.UnsupportedOperationException that I have never seen before.
>

In the stack trace you quoted, saveAsTextFile is not called. Is it really
throwing an exception? Do you have the stack trace from the executor
process? I think the exception originates from there, and the scheduler is
just reporting it here.


> Any hints appreciated.
>
> scheduler.TaskSetManager: Loss was due to
> java.lang.ClassNotFoundException:
> org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1 [duplicate 45]
> 14/06/03 16:46:23 ERROR actor.OneForOneStrategy:
> java.lang.UnsupportedOperationException
> at
> org.apache.spark.scheduler.SchedulerBackend$class.killTask(SchedulerBackend.scala:32)
> at
> org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.killTask(MesosSchedulerBackend.scala:41)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:185)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:183)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:183)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3.apply(TaskSchedulerImpl.scala:183)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3.apply(TaskSchedulerImpl.scala:176)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.cancelTasks(TaskSchedulerImpl.scala:176)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply$mcVI$sp(DAGScheduler.scala:1058)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1045)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1045)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> at org.apache.spark.scheduler.DAGScheduler.org
> 
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1045)
> at
> org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:998)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply$mcVI$sp(DAGScheduler.scala:499)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:499)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:499)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> at
> org.apache.spark.scheduler.DAGScheduler.doCancelAllJobs(DAGScheduler.scala:499)
> at
> org.apache.spark.scheduler.DAGSchedulerActorSupervisor$$anonfun$2.applyOrElse(DAGScheduler.scala:1151)
> at
> org.apache.spark.scheduler.DAGSchedulerActorSupervisor$$anonfun$2.applyOrElse(DAGScheduler.scala:1147)
> at
> akka.actor.SupervisorStrategy.handleFailure(FaultHandling.scala:295)
> at
> akka.actor.dungeon.FaultHandling$class.handleFailure(FaultHandling.scala:253)
> at akka.actor.ActorCell.handleFailure(ActorCell.scala:338)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:423)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
> at
> akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
> at akka.dispatch.Mailbox.run(Mailbox.scala:218)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Thanks,
> Marek
>


Re: Persist and unpersist

2014-05-28 Thread Daniel Darabos
On Wed, May 28, 2014 at 12:08 AM, Ankur Dave  wrote:

> I think what's desired here is for input to be unpersisted automatically
> as soon as result is materialized. I don't think there's currently a way
> to do this, but the usual workaround is to force result to be
> materialized immediately and then unpersist input:
>
> input.cache()
> val count = input.count
> val result = input.filter(...)
> result.cache().foreach(x => {}) // materialize result
> input.unpersist() // safe because `result` is materialized
>                   // and is the only RDD that depends on `input`
> return result
>
Thanks! Right, this would work. But maybe we don't want to cache the
result. We often work with RDDs that are too big to cache, but as we
process them partition by partition, the whole RDD never needs to exist in
memory. You probably know what I mean, but here's a small example:

1  def blowUp(input: RDD[Int]): RDD[Int] = {
2    input.cache()
3    println(input.count)
4    val result = input.flatMap(i => 0 to i)
5    // `result` is too big to cache :(.
6    return result
7  }
8
9  println(blowUp(sc.parallelize(0 to 1, 1000)).count)


We were discussing these difficulties, and Andras had an interesting
idea: what if you could have a limited-use cache? You could say
"input.cacheFor(2)" and it would be cached for 2 actions, and dropped
after that. In the above example the first action would be "count" on
line 3, and the second would be "count" on line 9. So blowUp() would
guarantee that the return value would be good for 1 action. (In the
sense that "input" would not be recalculated.) If the caller wants to
use it more than once, they can cache it as usual.


Persist and unpersist

2014-05-27 Thread Daniel Darabos
I keep bumping into a problem with persisting RDDs. Consider this (silly)
example:

def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  val count = input.count
  if (count % 2 == 0) {
return input.filter(_ % 2 == 1)
  } else {
return input.filter(_ % 2 == 0)
  }
}


The situation is that we want to do two things with an RDD (a "count" and a
"filter" in the example). The "input" RDD may represent a very expensive
calculation. So it would make sense to add an "input.cache()" line at the
beginning. But where do we put "input.unpersist()"?

input.cache()
val count = input.count
val result = input.filter(...)
input.unpersist()
return result


"input.filter()" is lazy, so this does not work as expected. We only want
to release "input" from the cache once nothing depends on it anymore. Maybe
"result" was garbage collected. Maybe "result" itself has been cached. But
there is no way to detect such conditions.

Our current approach is to just leave the RDD cached, and it will get
dumped at some point anyway. Is there a better solution? Thanks for any
tips.


Re: My talk on "Spark: The Next Top (Compute) Model"

2014-05-01 Thread Daniel Darabos
Cool intro, thanks! One question. On slide 23 it says "Standalone ("local"
mode)". That sounds a bit confusing without hearing the talk.

Standalone mode is not local. It just does not depend on separate cluster-management
software. I think it's the best mode for EC2/GCE, because they provide a
distributed filesystem anyway (S3/GCS). Why configure Hadoop if you don't
have to.


On Thu, May 1, 2014 at 12:25 AM, Dean Wampler  wrote:

> I meant to post this last week, but this is a talk I gave at the Philly
> ETE conf. last week:
>
> http://www.slideshare.net/deanwampler/spark-the-next-top-compute-model
>
> Also here:
>
> http://polyglotprogramming.com/papers/Spark-TheNextTopComputeModel.pdf
>
> dean
>
> --
> Dean Wampler, Ph.D.
> Typesafe
> @deanwampler
> http://typesafe.com
> http://polyglotprogramming.com
>


Re: GraphX. How to remove vertex or edge?

2014-05-01 Thread Daniel Darabos
Graph.subgraph() allows you to apply a filter to edges and/or vertices.
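
For reference, a tiny sketch of how it can be used. The vertex and edge
attributes here are made up for illustration:

import org.apache.spark.graphx._

val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "obsolete")))
val edges = sc.parallelize(Seq(Edge(1L, 2L, 7), Edge(2L, 3L, 2)))
val graph = Graph(vertices, edges)

// Keep edges with weight > 5 and drop the vertex named "obsolete".
// Edges touching a dropped vertex are removed as well.
val filtered = graph.subgraph(
  epred = triplet => triplet.attr > 5,
  vpred = (id, name) => name != "obsolete")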


On Thu, May 1, 2014 at 8:52 AM, Николай Кинаш  wrote:

> Hello.
>
> How to remove vertex or edges from graph in GraphX?
>


Re: Shuffle phase is very slow, any help, thx!

2014-04-30 Thread Daniel Darabos
So the problem is that 99 tasks are fast (< 1 second), but 1 task is really
slow (5+ hours), is that right? And your operation is graph.vertices.count?
That is odd, but it could be that this job includes running previous
transformations. How did you construct the graph?

On Tue, Apr 29, 2014 at 3:41 AM, gogototo  wrote:

> I has an application using grapx, and some phase is very slow.
>

That would look great on a T-shirt!

> Stage Id  Description Submitted   Duration ▴  Tasks:
> Succeeded/Total  Shuffle
> ReadShuffle Write
> 282 reduce at VertexRDD.scala:912014/04/28 14:07:13  5.20 h
> 100/100 3.8 MB
> 419 zipPartitions at ReplicatedVertexView.scala:101 2014/04/28 22:18:37
> 5.14 h  100/100 71.3 KB 4.5 MB
>
> In it, you can see task info as below:
> 94  5758SUCCESS PROCESS_LOCAL   BP-YZH-2-5971.360buy.com
>  2014/04/28 14:07:13
> 54 ms37.7 KB
> 71  5759SUCCESS PROCESS_LOCAL   BP-YZH-2-5978.360buy.com
>  2014/04/28 14:07:13
> 15 ms38.7 KB
> 14  5760SUCCESS PROCESS_LOCAL   BP-YZH-2-5977.360buy.com
>  2014/04/28 14:07:16
> 585 ms   38.6 KB
> 91  5761SUCCESS PROCESS_LOCAL   BP-YZH-2-5977.360buy.com
>  2014/04/28 14:07:16
> 209 ms   38.3 KB
> 53  5762SUCCESS NODE_LOCAL  BP-YZH-2-5977.360buy.com
>  2014/04/28 14:07:19
> 5.20 h   40.8 s  39.6 KB
>
> And in the slow task, can see log:
> 14/04/29 09:30:10 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight:
> 50331648, minRequest: 10066329
> 14/04/29 09:30:10 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight:
> 50331648, minRequest: 10066329
> 14/04/29 09:30:10 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight:
> 50331648, minRequest: 10066329
> 14/04/29 09:30:10 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: Getting 100
> non-zero-bytes blocks out of 100 blocks
> 14/04/29 09:30:10 INFO
> storage.BlockFetcherIterator$BasicBlockFetcherIterator: Started 45 remote
> gets in  2 ms
>
>
> Why this? How to solve it? Many Thx!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-phase-is-very-slow-any-help-thx-tp5004.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: something about memory usage

2014-04-30 Thread Daniel Darabos
On Wed, Apr 30, 2014 at 1:52 PM, wxhsdp  wrote:

> Hi, guys
>
>   i want to do some optimizations of my spark codes. i use VisualVM to
> monitor the executor when run the app.
>   here's the snapshot:
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n5107/executor.png
> >
>
> from the snapshot, i can get the memory usage information about the
> executor, but the executor contains lots of tasks. is it possible to get
> the
> memory usage of one single task in JVM with GC running in the background?
>

I guess you could run 1-core slaves. That way they would only work on one
task at a time.

> by the way, you can see every time when memory is consumed up to 90%, JVM
> does GC operation.
> i'am a little confused about that. i originally thought that 60% of the
> memory is kept for Spark's memory cache(i did not cache any RDDs in my
> application), so there was only 40% left for running the app.
>

The way I understand it, Spark does not have tight control over memory.
Your code running on the executor can easily use more than 40% of the memory.
Spark only limits the memory used for RDD caches and shuffles. If its RDD
caches are full, taking up 60% of the heap, and your code takes up more
than 40% (after GC), the executor will die with OOM.

I suppose there is not much Spark could do about this. You cannot control
how much memory a function you call is allowed to use.
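
If the RDD cache is crowding out your own data structures, the one knob that
does exist is the storage fraction. A sketch (the property name and the 60%
default are as I remember them for this Spark version, so double-check the
configuration docs):

val conf = new org.apache.spark.SparkConf()
  .setAppName("my-app")  // placeholder name
  // Lower the ~60% of the heap reserved for RDD caches, leaving more room
  // for the objects your own code creates.
  .set("spark.storage.memoryFraction", "0.4")
val sc = new org.apache.spark.SparkContext(conf)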


Re: Shuffle Spill Issue

2014-04-30 Thread Daniel Darabos
Whoops, you are right. Sorry for the misinformation. Indeed reduceByKey
just calls combineByKey:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] =
{
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

(I think I confused reduceByKey with groupByKey.)
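
For anyone else who mixes these up, a quick sketch of the practical
difference, assuming `words` is an RDD[String]:

val pairs = words.map(word => (word, 1))

// reduceByKey combines on the map side, so only one record per distinct
// key per partition is written to the shuffle.
val counts = pairs.reduceByKey(_ + _)

// groupByKey does no map-side combine: every occurrence crosses the
// shuffle, and all values of a key are materialized on the reduce side.
val occurrences = pairs.groupByKey()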


On Wed, Apr 30, 2014 at 2:55 AM, Liu, Raymond  wrote:

> Hi Daniel
>
> Thanks for your reply, While I think for reduceByKey, it will also
> do map side combine, thus extra the result is the same, say, for each
> partition, one entry per distinct word. In my case with javaserializer,
>  240MB dataset yield to around 70MB shuffle data. Only that shuffle Spill (
> memory ) is abnormal, and sounds to me should not trigger at all. And, by
> the way, this behavior only occurs in map out side, on reduce / shuffle
> fetch side, this strange behavior won't happen.
>
> Best Regards,
> Raymond Liu
>
> From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
>
> I have no idea why shuffle spill is so large. But this might make it
> smaller:
>
> val addition = (a: Int, b: Int) => a + b
> val wordsCount = wordsPair.combineByKey(identity, addition, addition)
>
> This way only one entry per distinct word will end up in the shuffle for
> each partition, instead of one entry per word occurrence.
>
> On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond 
> wrote:
> Hi  Patrick
>
> I am just doing simple word count , the data is generated by
> hadoop random text writer.
>
> This seems to me not quite related to compress , If I turn off
> compress on shuffle, the metrics is something like below for the smaller
> 240MB Dataset.
>
>
> Executor ID Address Task Time   Total Tasks Failed Tasks
>  Succeeded Tasks Shuffle ReadShuffle Write   Shuffle Spill (Memory)
>  Shuffle Spill (Disk)
> 10  sr437:48527 35 s8   0   8   0.0 B   2.5 MB
>  2.2 GB  1291.2 KB
> 12  sr437:46077 34 s8   0   8   0.0 B   2.5 MB
>  1822.6 MB   1073.3 KB
> 13  sr434:37896 31 s8   0   8   0.0 B   2.4 MB
>  1099.2 MB   621.2 KB
> 15  sr438:52819 31 s8   0   8   0.0 B   2.5 MB
>  1898.8 MB   1072.6 KB
> 16  sr434:37103 32 s8   0   8   0.0 B   2.4 MB
>  1638.0 MB   1044.6 KB
>
>
> And the program pretty simple:
>
> val files = sc.textFile(args(1))
> val words = files.flatMap(_.split(" "))
> val wordsPair = words.map(x => (x, 1))
>
> val wordsCount = wordsPair.reduceByKey(_ + _)
> val count = wordsCount.count()
>
> println("Number of words = " + count)
>
>
> Best Regards,
> Raymond Liu
>
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Could you explain more what your job is doing and what data types you are
> using? These numbers alone don't necessarily indicate something is wrong.
> The relationship between the in-memory and on-disk shuffle amount is
> definitely a bit strange, the data gets compressed when written to disk,
> but unless you have a weird dataset (E.g. all zeros) I wouldn't expect it
> to compress _that_ much.
>
> On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond 
> wrote:
> Hi
>
>
> I am running a simple word count program on spark standalone
> cluster. The cluster is made up of 6 node, each run 4 worker and each
> worker own 10G memory and 16 core thus total 96 core and 240G memory. (
> well, also used to configed as 1 worker with 40G memory on each node )
>
> I run a very small data set (2.4GB on HDFS on total) to confirm
> the problem here as below:
>
> As you can read from part of the task metrics as below, I noticed
> that the shuffle spill part of metrics indicate that there are something
> wrong.
>
> Executor ID Address Task Time   Total Tasks Failed Tasks
>  Succeeded Tasks Shuffle ReadShuffle Write   Shuffle Spill (Memory)
>  Shuffle Spill (Disk)
> 0   sr437:42139 29 s4   0   4   0.0 B   4.3 MB
>  23.6 GB 4.3 MB
> 1   sr433:46935 1.1 min 4   0   4   0.0 B   4.2 MB
>  19.0 GB 3.4 MB
> 10  sr436:53277 26 s4   0   4   0.0 B   4.3 MB
>  25.6 GB 4.6 MB
> 11  sr437:58872 32 s4   0   4   0.0 B   4.3 MB
>  25.0 GB 4.4 MB
> 12  sr435:48358 27 s4   0   4   0.0 B   4.3 MB
>  25.1 GB 4.4 MB
>
>
> You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x
> of the actual shuffle data and Shuffle Spill (Disk), and also it seems to
> me that by no means that the spill should trigger, since the memory is not
> used up at all.
>
> To verify th

Re: packaging time

2014-04-29 Thread Daniel Darabos
Tips from my experience. Disable scaladoc:

sources in doc in Compile := List()

Do not package the source:

publishArtifact in packageSrc := false

And most importantly do not run "sbt assembly". It creates a fat jar. Use
"sbt package" or "sbt stage" (from sbt-native-packager) instead. They create
a directory full of jars, and on a rebuild only the jar containing your own
code needs to be updated.
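
Put together, a build.sbt sketch with these settings (sbt 0.13 syntax; the
project name and Scala version are placeholders):

name := "my-spark-app"

scalaVersion := "2.10.4"

// Skip scaladoc generation -- it is a large part of the packaging time.
sources in (Compile, doc) := Seq()

// Do not build the source jar either.
publishArtifact in packageSrc := false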



On Tue, Apr 29, 2014 at 8:50 PM, SK  wrote:

> Each time I run sbt/sbt assembly to compile my program, the packaging time
> takes about 370 sec (about 6 min). How can I reduce this time?
>
> thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/packaging-time-tp5048.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Joining not-pair RDDs in Spark

2014-04-29 Thread Daniel Darabos
Create a key and join on that.

val callPricesByHour = callPrices.map(p => ((p.year, p.month, p.day,
p.hour), p))
val callsByHour = calls.map(c => ((c.year, c.month, c.day, c.hour), c))
val bills = callPricesByHour.join(callsByHour).mapValues({ case (p, c) =>
BillRow(c.customer, c.hour, c.minutes * p.basePrice) }).values

You should be able to expand this approach to three RDDs too.
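
For example, a sketch of one way to fold offersInCourse in as well, reusing
the pair RDDs above. It assumes at most one offer per hour; use leftOuterJoin
instead if an hour may have no offer:

val offersByHour = offersInCourse.map(o => (o.hour, o))
val bills = callPricesByHour.join(callsByHour)
  .map { case (_, (p, c)) => (c.hour, (p, c)) }  // re-key by hour to match offers
  .join(offersByHour)
  .map { case (_, ((p, c), o)) =>
    BillRow(c.customer, c.hour, c.minutes * p.basePrice * o.discount) }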


On Tue, Apr 29, 2014 at 11:55 AM, jsantos  wrote:

> In the context of telecom industry, let's supose we have several existing
> RDDs populated from some tables in Cassandra:
>
> val callPrices: RDD[PriceRow]
> val calls: RDD[CallRow]
> val offersInCourse: RDD[OfferRow]
>
> where types are defined as follows,
>
> /** Represents the price per minute for a concrete hour */
> case class PriceRow(
> val year: Int,
> val month: Int,
> val day: Int,
> val hour: Int,
> val basePrice: Float)
>
> /** Call registries*/
> case class CallRow(
> val customer: String,
> val year: Int,
> val month: Int,
> val day: Int,
> val minutes: Int)
>
> /** Is there any discount that could be applicable here? */
> case class OfferRow(
> val offerName: String,
> val hour: Int,//[0..23]
> val discount: Float)//[0..1]
>
> Assuming we cannot use `flatMap` to mix these three RDDs like this way
> (since RDD is not really 'monadic'):
>
> /**
>  * The final bill at a concrete hour for a call
>  * is defined as {{{
>  *def billPerHour(minutes: Int,basePrice:Float,discount:Float)
> =
>  *  minutes * basePrice * discount
>  * }}}
>  */
> val bills: RDD[BillRow] = for{
> price <- callPrices
> call <- calls if call.hour==price.hour
> offer <- offersInCourse if offer.hour==price.hour
> } yield BillRow(
> call.customer,
> call.hour,
> billPerHour(call.minutes,price.basePrice,offer.discount))
>
> case class BillRow(
> val customer: String,
> val hour: DateTime,
> val amount: Float)
>
> which is the best practise for generating a new RDD that join all these
> three RDDs and represents the bill for a concrete customer?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Joining-not-pair-RDDs-in-Spark-tp5034.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Shuffle Spill Issue

2014-04-29 Thread Daniel Darabos
I have no idea why shuffle spill is so large. But this might make it
smaller:

val addition = (a: Int, b: Int) => a + b
val wordsCount = wordsPair.combineByKey(identity, addition, addition)

This way only one entry per distinct word will end up in the shuffle for
each partition, instead of one entry per word occurrence.


On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond  wrote:

> Hi  Patrick
>
> I am just doing simple word count , the data is generated by
> hadoop random text writer.
>
> This seems to me not quite related to compress , If I turn off
> compress on shuffle, the metrics is something like below for the smaller
> 240MB Dataset.
>
>
> Executor ID Address Task Time   Total Tasks Failed Tasks
>  Succeeded Tasks Shuffle ReadShuffle Write   Shuffle Spill (Memory)
>  Shuffle Spill (Disk)
> 10  sr437:48527 35 s8   0   8   0.0 B   2.5 MB
>  2.2 GB  1291.2 KB
> 12  sr437:46077 34 s8   0   8   0.0 B   2.5 MB
>  1822.6 MB   1073.3 KB
> 13  sr434:37896 31 s8   0   8   0.0 B   2.4 MB
>  1099.2 MB   621.2 KB
> 15  sr438:52819 31 s8   0   8   0.0 B   2.5 MB
>  1898.8 MB   1072.6 KB
> 16  sr434:37103 32 s8   0   8   0.0 B   2.4 MB
>  1638.0 MB   1044.6 KB
>
>
> And the program pretty simple:
>
> val files = sc.textFile(args(1))
> val words = files.flatMap(_.split(" "))
> val wordsPair = words.map(x => (x, 1))
>
> val wordsCount = wordsPair.reduceByKey(_ + _)
> val count = wordsCount.count()
>
> println("Number of words = " + count)
>
>
> Best Regards,
> Raymond Liu
>
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Could you explain more what your job is doing and what data types you are
> using? These numbers alone don't necessarily indicate something is wrong.
> The relationship between the in-memory and on-disk shuffle amount is
> definitely a bit strange, the data gets compressed when written to disk,
> but unless you have a weird dataset (E.g. all zeros) I wouldn't expect it
> to compress _that_ much.
>
> On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond 
> wrote:
> Hi
>
>
> I am running a simple word count program on spark standalone
> cluster. The cluster is made up of 6 node, each run 4 worker and each
> worker own 10G memory and 16 core thus total 96 core and 240G memory. (
> well, also used to configed as 1 worker with 40G memory on each node )
>
> I run a very small data set (2.4GB on HDFS on total) to confirm
> the problem here as below:
>
> As you can read from part of the task metrics as below, I noticed
> that the shuffle spill part of metrics indicate that there are something
> wrong.
>
> Executor ID Address Task Time   Total Tasks Failed Tasks
>  Succeeded Tasks Shuffle ReadShuffle Write   Shuffle Spill (Memory)
>  Shuffle Spill (Disk)
> 0   sr437:42139 29 s4   0   4   0.0 B   4.3 MB
>  23.6 GB 4.3 MB
> 1   sr433:46935 1.1 min 4   0   4   0.0 B   4.2 MB
>  19.0 GB 3.4 MB
> 10  sr436:53277 26 s4   0   4   0.0 B   4.3 MB
>  25.6 GB 4.6 MB
> 11  sr437:58872 32 s4   0   4   0.0 B   4.3 MB
>  25.0 GB 4.4 MB
> 12  sr435:48358 27 s4   0   4   0.0 B   4.3 MB
>  25.1 GB 4.4 MB
>
>
> You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x
> of the actual shuffle data and Shuffle Spill (Disk), and also it seems to
> me that by no means that the spill should trigger, since the memory is not
> used up at all.
>
> To verify that I further reduce the data size to 240MB on total
>
> And here is the result:
>
>
> Executor ID Address Task Time   Total Tasks Failed Tasks
>  Succeeded Tasks Shuffle ReadShuffle Write   Shuffle Spill (Memory)
>  Shuffle Spill (Disk)
> 0   sr437:50895 15 s4   0   4   0.0 B   703.0 KB
>  80.0 MB 43.2 KB
> 1   sr433:50207 17 s4   0   4   0.0 B   704.7 KB
>  389.5 MB90.2 KB
> 10  sr436:56352 16 s4   0   4   0.0 B   700.9 KB
>  814.9 MB181.6 KB
> 11  sr437:53099 15 s4   0   4   0.0 B   689.7 KB
>  0.0 B   0.0 B
> 12  sr435:48318 15 s4   0   4   0.0 B   702.1 KB
>  427.4 MB90.7 KB
> 13  sr433:59294 17 s4   0   4   0.0 B   704.8 KB
>  779.9 MB180.3 KB
>
> Nothing prevent spill from happening.
>
> Now, there seems to me that there must be something wrong with the spill
> trigger codes.
>
> So anyone encounter this issue?  By the way, I am using latest trunk code.
>
>
> Best Regards,
> Raymond Liu
>
>


Re: questions about debugging a spark application

2014-04-28 Thread Daniel Darabos
Good question! I am also new to the JVM and would appreciate some tips.

On Sun, Apr 27, 2014 at 5:19 AM, wxhsdp  wrote:

> Hi, all
>   i have some questions about debug in spark:
>   1) when application finished, application UI is shut down, i can not see
> the details about the app, like
>   shuffle size, duration time, stage information... there are not
> sufficient informations in the master UI.
>  do i need to hang the application on?
>

We added a flag to our application that causes it to sleep indefinitely at
the end for exactly this reason. Admittedly the logs contain everything to
reconstruct the same data. But the web UI is easier to understand at a
glance.
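
Something along these lines at the very end of the driver program (the
property name is just an example):

// Keep the application alive -- and with it the application web UI on
// port 4040 -- until the process is killed, so stages can be inspected.
if (sys.props.get("debug.keepAlive").exists(_.toBoolean)) {
  Thread.sleep(Long.MaxValue)
}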

>   2) how to get details about each task the executor run? like memory
> usage...
>

I had success using VisualVM on the executor to see details about its
memory and CPU use.

>   3) since i'am not familiar with JVM. do i need to run the program step by
> step or hang on the program
>   to use JVM utilities like jstack, jmap...
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/questions-about-debugging-a-spark-application-tp4891.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Strange lookup behavior. Possible bug?

2014-04-28 Thread Daniel Darabos
That is quite mysterious, and I do not think we have enough information to
answer. JavaPairRDD.lookup() works fine on a remote Spark
cluster:

$ MASTER=spark://localhost:7077 bin/spark-shell
scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0
until 10, 3).map(x => ((x%3).toString, (x, x%3))))
scala> rdd.lookup("1")
res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)]

You suggest maybe the driver does not receive a message from an executor. I
guess it is possible, though it has not happened to me. I would
recommend running on a single machine in the standalone setup. Start the
master and worker on the same machine, run the application there too. This
should eliminate network configuration problems.

If you still see the issue, I'd check whether the task has really
completed. What do you see on the web UI? Is the executor using CPU?

Good luck.




On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg wrote:

> Can someone please suggest how I can move forward with this?
> My spark version is 0.9.1.
> The big challenge is that this issue is not recreated when running in
> local mode. What could be the difference?
>
> I would really appreciate any pointers, as currently the the job just
> hangs.
>
>
>
>
> On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:
>
>> Some additional information - maybe this rings a bell with someone:
>>
>> I suspect this happens when the lookup returns more than one value.
>> For 0 and 1 values, the function behaves as you would expect.
>>
>> Anyone ?
>>
>>
>>
>> On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:
>>
>>> Hi All,
>>>
>>> Im running a lookup on a JavaPairRDD.
>>> When running on local machine - the lookup is successfull. However, when
>>> running a standalone cluster with the exact same dataset - one of the tasks
>>> never ends (constantly in RUNNING status).
>>> When viewing the worker log, it seems that the task has finished
>>> successfully:
>>>
>>> 14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
>>> 14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is
>>> 10896794
>>> 14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
>>> 14/04/25 13:40:38 INFO Executor: Finished task ID 2
>>>
>>> But it seems the driver is not aware of this, and hangs indefinitely.
>>>
>>> If I execute a count priot to the lookup - I get the correct number
>>> which suggests that the cluster is operating as expected.
>>>
>>> The exact same scenario works with a different type of key (Tuple2):
>>> JavaPairRDD.
>>>
>>> Any ideas on how to debug this problem ?
>>>
>>> Thanks,
>>>
>>> Yadid
>>>
>>>
>>
>


Re: Comparing RDD Items

2014-04-23 Thread Daniel Darabos
Hi! There is RDD.cartesian(), which creates the Cartesian product of two
RDDs. You could do data.cartesian(data) to get an RDD of all pairs of
lines. It will be of length data.count * data.count of course.
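
A tiny sketch in Scala (JavaRDD has the same cartesian() method):

val data = sc.parallelize(Seq("a", "b", "c"))
// All ordered pairs of elements, including the (x, x) pairs.
val pairs = data.cartesian(data)
println(pairs.count)  // 9 = 3 * 3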



On Wed, Apr 23, 2014 at 4:48 PM, Jared Rodriguez wrote:

> Hi there,
>
> I am new to Spark and new to scala, although have lots of experience on
> the Java side.  I am experimenting with Spark for a new project where it
> seems like it could be a good fit.  As I go through the examples, there is
> one case scenario that I am trying to figure out, comparing the contents of
> an RDD to itself to result in a new RDD.
>
> In an overly simply example, I have:
>
> JavaSparkContext sc = new JavaSparkContext ...
> JavaRDD data = sc.parallelize(buildData());
>
> I then want to compare each entry in data to other entries and end up with:
>
> JavaPairRDD> mapped = data.???
>
> Is this something easily handled by Spark?  My apologies if this is a
> stupid question, I have spent less than 10 hours tinkering with Spark and
> am trying to come up to speed.
>
>
> --
> Jared Rodriguez
>
>


Re: GraphX: .edges.distinct().count() is 10?

2014-04-23 Thread Daniel Darabos
This is caused by https://issues.apache.org/jira/browse/SPARK-1188. I think
the fix will be in the next release. But until then, do:

g.edges.map(_.copy()).distinct.count



On Wed, Apr 23, 2014 at 2:26 AM, Ryan Compton wrote:

> Try this: https://www.dropbox.com/s/xf34l0ta496bdsn/.txt
>
> This code:
>
> println(g.numEdges)
> println(g.numVertices)
> println(g.edges.distinct().count())
>
> gave me
>
> 1
> 9294
> 2
>
>
>
> On Tue, Apr 22, 2014 at 5:14 PM, Ankur Dave  wrote:
> > I wasn't able to reproduce this with a small test file, but I did change
> the
> > file parsing to use x(1).toLong instead of x(2).toLong. Did you mean to
> take
> > the third column rather than the second?
> >
> > If so, would you mind posting a larger sample of the file, or even the
> whole
> > file if possible?
> >
> > Here's the test that succeeded:
> >
> >   test("graph.edges.distinct.count") {
> > withSpark { sc =>
> >   val edgeFullStrRDD: RDD[String] = sc.parallelize(List(
> > "394365859\t136153151", "589404147\t1361045425"))
> >   val edgeTupRDD = edgeFullStrRDD.map(x => x.split("\t"))
> > .map(x => (x(0).toLong, x(1).toLong))
> >   val g = Graph.fromEdgeTuples(edgeTupRDD, defaultValue = 123,
> > uniqueEdges = Option(CanonicalRandomVertexCut))
> >   assert(edgeTupRDD.distinct.count() === 2)
> >   assert(g.numEdges === 2)
> >   assert(g.edges.distinct.count() === 2)
> > }
> >   }
> >
> > Ankur
>


Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-23 Thread Daniel Darabos
With the right program you can always exhaust any amount of memory :).
There is no silver bullet. You have to figure out what is happening in your
code that causes high memory use and address that. I spent all of last
week doing this for a simple program of my own. Lessons I learned that may
or may not apply to your case:

 - If you don't cache (persist) an RDD, it is not stored. This can save
memory at the cost of possibly repeating computation. (I read around a TB
of files twice, for example, rather than cache them.)
 - Use combineByKey instead of groupByKey if you can process values one by
one. This means they do not need to be all stored.
 - If you have a lot of keys per partition, set mapSideCombine=false for
combineByKey. This avoids creating a large map per partition (see the sketch
after this list).
 - If you have a key with a disproportionate number of values (like the
empty string for a missing name), discard it before the computation.
 - Read https://spark.apache.org/docs/latest/tuning.html for more (and more
accurate) information.
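
As a concrete illustration of the combineByKey points, here is a minimal
sketch that counts values per key without grouping them and without the
map-side combine map. The RDD name and value type are made up:

import org.apache.spark.HashPartitioner

// `visits` is assumed to be an RDD[(String, String)] with many distinct keys.
val counts = visits.combineByKey(
  (v: String) => 1L,                    // start a count from the first value
  (acc: Long, v: String) => acc + 1L,   // fold in further values one by one
  (a: Long, b: Long) => a + b,          // merge per-partition counts
  new HashPartitioner(visits.partitions.size),
  mapSideCombine = false)               // skip the large per-partition map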

Good luck.


On Wed, Apr 23, 2014 at 1:25 AM, jaeholee  wrote:

> Ok. I tried setting the partition number to 128 and numbers greater than
> 128,
> and now I get another error message about "Java heap space". Is it possible
> that there is something wrong with the setup of my Spark cluster to begin
> with? Or is it still an issue with partitioning my data? Or do I just need
> more worker nodes?
>
>
> ERROR TaskSetManager: Task 194.0:14 failed 4 times; aborting job
> org.apache.spark.SparkException: Job aborted: Task 194.0:14 failed 4 times
> (most recent failure: Exception failure: java.lang.OutOfMemoryError: Java
> heap space)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at scala.Option.foreach(Option.scala:236)
> at
>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskSchedulerImpl-Lost-an-executor-tp4566p4623.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-22 Thread Daniel Darabos
On Wed, Apr 23, 2014 at 12:06 AM, jaeholee  wrote:

> How do you determine the number of partitions? For example, I have 16
> workers, and the number of cores and the worker memory set in spark-env.sh
> are:
>
> CORE = 8
> MEMORY = 16g
>

So you have the capacity to work on 16 * 8 = 128 tasks at a time. If you
use fewer than 128 partitions, some CPUs will go unused, and the job will
complete more slowly than it could. But depending on what you are actually
doing with the data, you may want to use many more partitions than that. I
find it fairly difficult to estimate the memory use -- it is much easier to
just try with more partitions and see if it is enough :). The overhead for
using more partitions than necessary is pretty small.

> The .csv data I have is about 500MB, but I am eventually going to use a file
> that is about 15GB.
>
> Is the MEMORY variable in spark-env.sh different from spark.executor.memory
> that you mentioned? If they're different, how do I set
> spark.executor.memory?
>

It is probably the same thing. Sorry for confusing you. To be sure, check
the Spark master web UI while the application is connected. You will see
how much RAM is used per worker on the main page. You can set
"spark.executor.memory" through SparkConf.set("spark.executor.memory",
"10g") when creating the SparkContext.

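Spelled out, the driver-side setup looks something like this (the master URL
and application name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master-host:7077")
  .setAppName("my-app")
  .set("spark.executor.memory", "10g")  // heap requested on each worker
val sc = new SparkContext(conf)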

Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-22 Thread Daniel Darabos
Most likely the data is not "just too big". For most operations the data is
processed partition by partition. The partitions may be too big. This is
what your last question hints at too:

> val numWorkers = 10
> val data = sc.textFile("somedirectory/data.csv", numWorkers)

This will work, but it does not quite do what you intend. The second parameter to
textFile is the number of partitions you want. Given the error you are
seeing, I'd recommend asking for more partitions -- they will be smaller.
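
Concretely, something like this (400 is just an example, a few times the
total core count of the cluster):

// The second argument is the minimum number of partitions, not the number
// of workers. More, smaller partitions need less memory per task.
val data = sc.textFile("somedirectory/data.csv", 400)
println(data.partitions.size)  // at least 400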

Also make sure you set spark.executor.memory to the capacity of the worker
machines.


On Tue, Apr 22, 2014 at 11:09 PM, jaeholee  wrote:

> Spark is running fine, but I get this message. Does this mean that my data
> is
> just too big?
>
> 14/04/22 17:06:20 ERROR TaskSchedulerImpl: Lost executor 2 on WORKER#2:
> OutOfMemoryError
> 14/04/22 17:06:20 ERROR TaskSetManager: Task 550.0:2 failed 4 times;
> aborting job
> org.apache.spark.SparkException: Job aborted: Task 550.0:2 failed 4 times
> (most recent failure: unknown)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at scala.Option.foreach(Option.scala:236)
> at
>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskSchedulerImpl-Lost-an-executor-tp4566p4618.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: stdout in workers

2014-04-22 Thread Daniel Darabos
On Mon, Apr 21, 2014 at 7:59 PM, Jim Carroll  wrote:

>
> I'm experimenting with a few things trying to understand how it's working.
> I
> took the JavaSparkPi example as a starting point and added a few System.out
> lines.
>
> I added a system.out to the main body of the driver program (not inside of
> any Functions).
> I added another to the mapper.
> I added another to the reducer.
>
> I set up a simple "standalone" distribution with a 1 master (no zookeeper)
> and a 1 worker.
>
> The main println and the reducer println print out from the driver program.
>
> The mapper one doesn't print in the shell I started the worker in, nor does
> it appear in any logs (in case stdout was redirected).
>
> However, the program executes (and the mapper executes on the worker) since
> I can see the job run in the logs and I get an answer.
>
> Where should I be looking?
>

It is in the executor logs. The executor is a process started by the worker
for your application, which runs your custom code. So the logs are on the
worker machines. The default location is within the Spark directory under
the "work" subdirectory. For example
~/spark-0.9.0-incubating/work/app-20140418164503-0025/0/stdout.

It is often much easier to access this log from the UI. Click on the
application ID on the master, or on a worker, and then you see the
stdout/stderr links.

> Thanks. Apologies if this is a dumb question - I searched for answer but
> only found a reference to another list posting where the user thought his
> driver print statement should have printed on the worker.
>

I think it's a perfectly good question, and you might want to make a
suggestion for adding it to the documentation somewhere.

> Jim
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/stdout-in-workers-tp4537.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: sc.makeRDD bug with NumericRange

2014-04-18 Thread Daniel Darabos
To make up for mocking Scala, I've filed a bug (
https://issues.scala-lang.org/browse/SI-8518) and will try to patch this.


On Fri, Apr 18, 2014 at 9:24 PM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> Looks like NumericRange in Scala is just a joke.
>
> scala> val x = 0.0 to 1.0 by 0.1
> x: scala.collection.immutable.NumericRange[Double] = NumericRange(0.0,
> 0.1, 0.2, 0.30000000000000004, 0.4, 0.5, 0.6, 0.7, 0.7999999999999999,
> 0.8999999999999999, 0.9999999999999999)
>
> scala> x.take(3)
> res1: scala.collection.immutable.NumericRange[Double] = NumericRange(0.0,
> 0.1, 0.2)
>
> scala> x.drop(3)
> res2: scala.collection.immutable.NumericRange[Double] =
> NumericRange(0.30000000000000004, 0.4, 0.5, 0.6, 0.7, 0.7999999999999999,
> 0.8999999999999999, 0.9999999999999999)
>
> So far so good.
>
> scala> x.drop(3).take(3)
> res3: scala.collection.immutable.NumericRange[Double] =
> NumericRange(0.30000000000000004, 0.4)
>
> Why only two values? Where's 0.5?
>
> scala> x.drop(6)
> res4: scala.collection.immutable.NumericRange[Double] =
> NumericRange(0.6000000000000001, 0.7000000000000001, 0.8, 0.9)
>
> And where did the last value disappear now?
>
> You have to approach Scala with a healthy amount of distrust. You're on
> the right track with toArray.
>
>
> On Fri, Apr 18, 2014 at 8:01 PM, Mark Hamstra wrote:
>
>> Please file an issue: Spark Project 
>> JIRA<https://issues.apache.org/jira/browse/SPARK>
>>
>>
>>
>> On Fri, Apr 18, 2014 at 10:25 AM, Aureliano Buendia > > wrote:
>>
>>> Hi,
>>>
>>> I just notices that sc.makeRDD() does not make all values given with
>>> input type of NumericRange, try this in spark shell:
>>>
>>>
>>> $ MASTER=local[4] bin/spark-shell
>>>
>>> scala> sc.makeRDD(0.0 to 1 by 0.1).collect().length
>>>
>>> *8*
>>>
>>>
>>> The expected length is 11. This works correctly when lanching spark with
>>> only one core:
>>>
>>>
>>> $ MASTER=local[1] bin/spark-shell
>>>
>>> scala> sc.makeRDD(0.0 to 1 by 0.1).collect().length
>>>
>>> *11*
>>>
>>>
>>> This also works correctly when using toArray():
>>>
>>> $ MASTER=local[4] bin/spark-shell
>>>
>>> scala> sc.makeRDD((0.0 to 1 by 0.1).*toArray*).collect().length
>>>
>>> *8*
>>>
>>
>>
>


Re: sc.makeRDD bug with NumericRange

2014-04-18 Thread Daniel Darabos
Looks like NumericRange in Scala is just a joke.

scala> val x = 0.0 to 1.0 by 0.1
x: scala.collection.immutable.NumericRange[Double] = NumericRange(0.0, 0.1,
0.2, 0.30000000000000004, 0.4, 0.5, 0.6, 0.7, 0.7999999999999999,
0.8999999999999999, 0.9999999999999999)

scala> x.take(3)
res1: scala.collection.immutable.NumericRange[Double] = NumericRange(0.0,
0.1, 0.2)

scala> x.drop(3)
res2: scala.collection.immutable.NumericRange[Double] =
NumericRange(0.30000000000000004, 0.4, 0.5, 0.6, 0.7, 0.7999999999999999,
0.8999999999999999, 0.9999999999999999)

So far so good.

scala> x.drop(3).take(3)
res3: scala.collection.immutable.NumericRange[Double] =
NumericRange(0.30000000000000004, 0.4)

Why only two values? Where's 0.5?

scala> x.drop(6)
res4: scala.collection.immutable.NumericRange[Double] =
NumericRange(0.6000000000000001, 0.7000000000000001, 0.8, 0.9)

And where did the last value disappear now?

You have to approach Scala with a healthy amount of distrust. You're on the
right track with toArray.
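
For completeness, the workaround spelled out: force the range into a plain
collection before Spark slices it into partitions.

// Materialize the NumericRange into a Vector first, so the partitioning
// slices a well-behaved collection instead of the buggy NumericRange.
val values = (0.0 to 1.0 by 0.1).toVector
sc.makeRDD(values).count  // should be 11 regardless of the number of cores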


On Fri, Apr 18, 2014 at 8:01 PM, Mark Hamstra wrote:

> Please file an issue: Spark Project 
> JIRA
>
>
>
> On Fri, Apr 18, 2014 at 10:25 AM, Aureliano Buendia 
> wrote:
>
>> Hi,
>>
>> I just notices that sc.makeRDD() does not make all values given with
>> input type of NumericRange, try this in spark shell:
>>
>>
>> $ MASTER=local[4] bin/spark-shell
>>
>> scala> sc.makeRDD(0.0 to 1 by 0.1).collect().length
>>
>> *8*
>>
>>
>> The expected length is 11. This works correctly when lanching spark with
>> only one core:
>>
>>
>> $ MASTER=local[1] bin/spark-shell
>>
>> scala> sc.makeRDD(0.0 to 1 by 0.1).collect().length
>>
>> *11*
>>
>>
>> This also works correctly when using toArray():
>>
>> $ MASTER=local[4] bin/spark-shell
>>
>> scala> sc.makeRDD((0.0 to 1 by 0.1).*toArray*).collect().length
>>
>> *8*
>>
>
>


Re: Continuously running non-streaming jobs

2014-04-17 Thread Daniel Darabos
I'm quite new myself (just subscribed to the mailing list today :)), but
this happens to be something we've had success with. So let me know if you
hit any problems with this sort of usage.


On Thu, Apr 17, 2014 at 9:11 PM, Jim Carroll  wrote:

> Daniel,
>
> I'm new to Spark but I thought that thread hinted at the right answer.
>
> Thanks,
> Jim
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Continuously-running-non-streaming-jobs-tp4391p4397.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Continuously running non-streaming jobs

2014-04-17 Thread Daniel Darabos
The linked thread does a good job answering your question. You should
create a SparkContext at startup and re-use it for all of your queries. For
example we create a SparkContext in a web server at startup, and are then
able to use the Spark cluster for serving Ajax queries with latency of a
second or less. The executors keep running during this time, so there is
minimal overhead to starting a job.
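
A rough sketch of the pattern (the web framework is left out and the paths
and names are made up):

// Created once at server startup and shared by all requests.
val sc = new org.apache.spark.SparkContext("spark://master-host:7077", "query-server")
val events = sc.textFile("hdfs:///data/events").cache()
events.count()  // populate the cache up front

// Called for each Ajax request; only the filter and count run, on already
// warm executors, so the latency stays low.
def countMatching(keyword: String): Long =
  events.filter(_.contains(keyword)).count()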


On Thu, Apr 17, 2014 at 8:02 PM, Jim Carroll  wrote:

> Is there a way to create continuously-running, or at least
> continuously-loaded, jobs that can be 'invoked' rather than 'sent' to to
> avoid the job creation overhead of a couple seconds?
>
> I read through the following:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-td2016.html
>
> Thanks.
> Jim
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Continuously-running-non-streaming-jobs-tp4391.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: confused by reduceByKey usage

2014-04-17 Thread Daniel Darabos
Here's a way to debug something like this:

scala> d5.keyBy(_.split(" ")(0)).reduceByKey((v1,v2) => {
   println("v1: " + v1)
   println("v2: " + v2)
   (v1.split(" ")(1).toInt + v2.split(" ")(1).toInt).toString
   }).collect

You get:
v1: 1 2 3 4 5
v2: 1 2 3 4 5
v1: 4
v2: 1 2 3 4 5
java.lang.ArrayIndexOutOfBoundsException: 1

reduceByKey() works kind of like regular Scala reduce(). So it will call
the function on the first two values, then on the result of that and the
next value, then the result of that and the next value, and so on. First
you add 2+2 and get 4. Then your function is called with v1="4" and v2 is
the third line.

What you could do instead:

scala> d5.keyBy(_.split(" ")(0)).mapValues(_.split(" ")(1).toInt).reduceByKey((v1, v2) => v1 + v2).collect


On Thu, Apr 17, 2014 at 6:29 PM, 诺铁  wrote:

> HI,
>
> I am new to spark,when try to write some simple tests in spark shell, I
> met following problem.
>
> I create a very small text file,name it as 5.txt
> 1 2 3 4 5
> 1 2 3 4 5
> 1 2 3 4 5
>
> and experiment in spark shell:
>
> scala> val d5 = sc.textFile("5.txt").cache()
> d5: org.apache.spark.rdd.RDD[String] = MappedRDD[91] at textFile at
> :12
>
> scala> d5.keyBy(_.split(" ")(0)).reduceByKey((v1,v2) => (v1.split("
> ")(1).toInt + v2.split(" ")(1).toInt).toString).first
>
> then error occurs:
> 14/04/18 00:20:11 ERROR Executor: Exception in task ID 36
> java.lang.ArrayIndexOutOfBoundsException: 1
> at $line60.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:15)
>  at $line60.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(:15)
> at
> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:120)
>
> when I delete 1 line in the file, and make it 2 lines,the result is
> correct, I don't understand what's the problem, please help me,thanks.
>
>