LynxKite is now open-source

2020-06-24 Thread Daniel Darabos
Hi,
LynxKite is a graph analytics application built on Apache Spark. (From very
early on, like Spark 0.9.) We have talked about it on occasion on Spark
Summits.

So I wanted to let you know that it's now open-source!

https://github.com/lynxkite/lynxkite

You should totally check it out if you work with graphs. But even if you
only use SparkSQL you may find it a nice (graphical) environment for data
exploration.

We're going to do development completely in the open and with the community
from now on, so any bug reports and pull requests are extremely welcome.
Cheers,
Daniel


Re: Quick one on evaluation

2017-08-04 Thread Daniel Darabos
On Fri, Aug 4, 2017 at 4:36 PM, Jean Georges Perrin  wrote:

> Thanks Daniel,
>
> I like your answer for #1. It makes sense.
>
> However, I don't get why you say that there are always pending
> transformations... After you call an action, you should be "clean" from
> pending transformations, no?
>

Nope. Say you have val df = spark.read.csv("data.csv"); println(df.count +
df.count). The first "df.count" reads in the file and counts the rows. The
action was executed, but "df" still represents the same pending
transformations. The second "df.count" again reads in the file and counts
the rows. Actions do not modify DataFrames/RDDs. (The only exception is
"cache()".)


Re: Quick one on evaluation

2017-08-03 Thread Daniel Darabos
On Wed, Aug 2, 2017 at 2:16 PM, Jean Georges Perrin  wrote:

> Hi Sparkians,
>
> I understand the lazy evaluation mechanism with transformations and
> actions. My question is simpler: 1) are show() and/or printSchema()
> actions? I would assume so...
>

show() is an action (it prints data) but printSchema() is not an action.
Spark can tell you the schema of the result without computing the result.

and optional question: 2) is there a way to know if there are
> transformations "pending"?
>

There are always transformations pending :). An RDD or DataFrame is a
series of pending transformations. If you say val df =
spark.read.csv("foo.csv"), that is a pending transformation. Even
spark.emptyDataFrame is best understood as a pending transformation: it
does not do anything on the cluster, but records locally what it will have
to do on the cluster.
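
To illustrate, a small sketch of the distinction (spark-shell assumed; the
column expression is made up):

val df = spark.range(100).selectExpr("id", "id * 2 as doubled")  // pending transformations only
df.printSchema()  // no job runs: the schema is known without computing anything
df.show()         // action: executes the pending transformations and prints rows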


Re: Strongly Connected Components

2016-11-11 Thread Daniel Darabos
Hi Shreya,
GraphFrames just calls the GraphX strongly connected components code. (
https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51
)

For choosing the number of iterations: If the number of iterations is less
than the diameter of the graph, you may get an incorrect result. But
running for more iterations than that buys you nothing. The algorithm is
basically to broadcast your ID to all your neighbors in the first round,
and then broadcast the smallest ID that you have seen so far in the next
rounds. So with only 1 round you will get a wrong result unless each vertex
is connected to the vertex with the lowest ID in that component. (Unlikely
in a real graph.)

See
https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
for the actual implementation.
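
To illustrate, a rough sketch of calling it directly in GraphX with an explicit
iteration count (toy graph; spark-shell assumed):

import org.apache.spark.graphx._

// 1 <-> 2 form one strongly connected component; 3 is a component on its own.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 1L, 1), Edge(2L, 3L, 1)))
val graph = Graph.fromEdges(edges, defaultValue = 1)
// numIter should be at least the graph diameter, or the result may be wrong.
val scc = graph.stronglyConnectedComponents(numIter = 10)
scc.vertices.collect.foreach(println)  // each vertex paired with the lowest vertex ID in its SCC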

A better algorithm exists for this problem that only requires O(log(N))
iterations, where N is the largest component diameter. (It is described in "A
Model of Computation for MapReduce",
http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms
GraphX's implementation immensely. (See the last slide of
http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
The large advantage is due to the lower number of necessary iterations.

For why this is failing even with one iteration: I would first check your
partitioning. Too many or too few partitions could equally cause the issue.
If you are lucky, there is no overlap between the "too many" and "too few"
domains :).

On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal <shrey...@microsoft.com>
wrote:

> Tried GraphFrames. Still faced the same – job died after a few hours . The
> errors I see (And I see tons of them) are –
>
> (I ran with 3 times the partitions as well, which was 12 times number of
> executors , but still the same.)
>
>
>
> -
>
> ERROR NativeAzureFileSystem: Encountered Storage Exception for write on
> Blob : hdp/spark2-events/application_1478717432179_0021.inprogress
> Exception details: null Error Code : RequestBodyTooLarge
>
>
>
> -
>
>
>
> 16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests
> outstanding when connection from /10.0.0.95:43301 is closed
>
> 16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2
> outstanding blocks after 5000 ms
>
> 16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty
> blocks out of 1500 blocks
>
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.io.IOException: Connection from /10.0.0.95:43301 closed
>
>
>
> -
>
>
>
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/
> appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-
> a5e138a52360/37/shuffle_1346_21_0.index (No such file or directory)
>
>
>
> -
>
>
>
> org.apache.spark.SparkException: Exception thrown in awaitResult
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.
> applyOrElse(RpcTimeout.scala:77)
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$1.
> applyOrElse(RpcTimeout.scala:75)
>
> at scala.runtime.AbstractPartialFunction.apply(
> AbstractPartialFunction.scala:36)
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:59)
>
> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.
> applyOrElse(RpcTimeout.scala:59)
>
> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>
> at org.apache.spark.rpc.RpcTimeout.awaitResult(
> RpcTimeout.scala:83)
>
> at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(
> RpcEndpointRef.scala:102)
>
> at org.apache.spark.executor.Executor.org$apache$spark$
> executor$Executor$$reportHeartBeat(Executor.scala:518)
>
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply$mcV$sp(Executor.scala:547)
>
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply(Executor.scala:547)
>
> at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$
> 1.apply(Executor.scala:547)
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.
> scala:1857)
>
> at org.apache.spark.executor.Executor$$anon$1.run(Executor.

Re: Spark 2.0 - DataFrames vs Dataset performance

2016-10-24 Thread Daniel Darabos
Hi Antoaneta,
I believe the difference is not due to Datasets being slower (DataFrames
are just an alias to Datasets now), but rather using a user defined
function for filtering vs using Spark builtins. The builtin can use tricks
from Project Tungsten, such as only deserializing the "event_type" column.
The user-defined function on the other hand has to be called with a full
case class, so the whole object needs to be deserialized for each row.

Well, that is my understanding from reading some slides. Hopefully someone
with a more direct knowledge of the code will correct me if I'm wrong.
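
To illustrate, a sketch of the two styles side by side (eventNames, CustomEvent
and the path come from the quoted code below; the actual event names are made up):

import org.apache.spark.sql.functions.col
import spark.implicits._

val eventNames = Seq("click", "view")
val data = spark.read.parquet("events_data/")

// Builtin Column expression: Tungsten only needs to decode the "event_type" column.
data.filter(col("event_type").isin(eventNames: _*)).count()

// Typed filter: every row is deserialized into a full CustomEvent before the closure runs.
data.as[CustomEvent].filter(event => eventNames.contains(event.event_type.get)).count()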

On Mon, Oct 24, 2016 at 2:50 PM, Antoaneta Marinova <
antoaneta.vmarin...@gmail.com> wrote:

> Hello,
>
> I am using Spark 2.0 for performing filtering, grouping and counting
> operations on events data saved in parquet files. As the events schema has
> very nested structure I wanted to read them as scala beans to simplify the
> code but I noticed a severe performance degradation. Below you can find
> simple comparison of the same operation using DataFrame and Dataset.
>
> val data = session.read.parquet("events_data/")
>
> // Using Datasets with custom class
>
> //Case class matching the events schema
>
> case class CustomEvent(event_id: Option[String],
>
> event_type: Option[String]
>context : Option[Context],
>
> ….
>time: Option[BigInt]) extends Serializable {}
>
> scala> val start = System.currentTimeMillis ;
>
>   val count = data.as[CustomEvent].filter(event =>
> eventNames.contains(event.event_type.get)).count ;
>
>  val time =  System.currentTimeMillis - start
>
> count: Long = 5545
>
> time: Long = 11262
>
> // Using DataFrames
>
> scala>
>
> val start = System.currentTimeMillis ;
>
> val count = data.filter(col("event_type").isin(eventNames:_*)).count ;
>
> val time =  System.currentTimeMillis - start
>
> count: Long = 5545
>
> time: Long = 147
>
> The schema of the events is something like this:
>
> //events schma
>
> schemaroot
>
> |-- event_id: string (nullable = true)
>
> |-- event_type: string (nullable = true)
>
> |-- context: struct (nullable = true)
>
> ||-- environment_1: struct (nullable = true)
>
> |||-- filed1: integer (nullable = true)
>
> |||-- filed2: integer (nullable = true)
>
> |||-- filed3: integer (nullable = true)
>
> ||-- environment_2: struct (nullable = true)
>
> |||-- filed_1: string (nullable = true)
>
> 
>
> |||-- filed_n: string (nullable = true)
>
> |-- metadata: struct (nullable = true)
>
> ||-- elements: array (nullable = true)
>
> |||-- element: struct (containsNull = true)
>
> ||||-- config: string (nullable = true)
>
> ||||-- tree: array (nullable = true)
>
> |||||-- element: struct (containsNull = true)
>
> ||||||-- path: array (nullable = true)
>
> |||||||-- element: struct (containsNull = true)
>
> ||||||||-- key: string (nullable = true)
>
> ||||||||-- name: string (nullable = true)
>
> ||||||||-- level: integer (nullable = true)
>
> |-- time: long (nullable = true)
>
> Could you please advise me on the usage of the different abstractions and
> help me understand why using datasets with user defined class is so much
> slower.
>
> Thank you,
> Antoaneta
>


Re: how to run local[k] threads on a single core

2016-08-04 Thread Daniel Darabos
You could run the application in a Docker container constrained to one CPU
with --cpuset-cpus (
https://docs.docker.com/engine/reference/run/#/cpuset-constraint).

On Thu, Aug 4, 2016 at 8:51 AM, Sun Rui  wrote:

> I don’t think it possible as Spark does not support thread to CPU affinity.
> > On Aug 4, 2016, at 14:27, sujeet jog  wrote:
> >
> > Is there a way we can run multiple tasks concurrently on a single core
> in local mode.
> >
> > for ex :- i have 5 partition ~ 5 tasks, and only a single core , i want
> these tasks to run concurrently, and specifiy them to use /run on a single
> core.
> >
> > The machine itself is say 4 core, but i want to utilize only 1 core out
> of it,.
> >
> > Is it possible ?
> >
> > Thanks,
> > Sujeet
> >
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark 1.6.2 version displayed as 1.6.1

2016-07-25 Thread Daniel Darabos
Another possible explanation is that by accident you are still running
Spark 1.6.1. Which download are you using? This is what I see:

$ ~/spark-1.6.2-bin-hadoop2.6/bin/spark-shell
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
Using Spark's repl log4j profile:
org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.2
      /_/


On Mon, Jul 25, 2016 at 7:45 AM, Sean Owen  wrote:

> Are you certain? looks like it was correct in the release:
>
>
> https://github.com/apache/spark/blob/v1.6.2/core/src/main/scala/org/apache/spark/package.scala
>
>
>
> On Mon, Jul 25, 2016 at 12:33 AM, Ascot Moss  wrote:
> > Hi,
> >
> > I am trying to upgrade spark from 1.6.1 to 1.6.2, from 1.6.2
> spark-shell, I
> > found the version is still displayed 1.6.1
> >
> > Is this a minor typo/bug?
> >
> > Regards
> >
> >
> >
> > ###
> >
> > Welcome to
> >
> >       ____              __
> >
> >      / __/__  ___ _____/ /__
> >
> >     _\ \/ _ \/ _ `/ __/  '_/
> >
> >    /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
> >
> >       /_/
> >
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: spark.executor.cores

2016-07-15 Thread Daniel Darabos
Mich's invocation is for starting a Spark application against an already
running Spark standalone cluster. It will not start the cluster for you.

We used to not use "spark-submit", but we started using it when it solved
some problem for us. Perhaps that day has also come for you? :)

On Fri, Jul 15, 2016 at 5:14 PM, Jean Georges Perrin  wrote:

> I don't use submit: I start my standalone cluster and connect to it
> remotely. Is that a bad practice?
>
> I'd like to be able to it dynamically as the system knows whether it needs
> more or less resources based on its own  context
>
> On Jul 15, 2016, at 10:55 AM, Mich Talebzadeh 
> wrote:
>
> Hi,
>
> You can also do all this at env or submit time with spark-submit which I
> believe makes it more flexible than coding in.
>
> Example
>
> ${SPARK_HOME}/bin/spark-submit \
> --packages com.databricks:spark-csv_2.11:1.3.0 \
> --driver-memory 2G \
> --num-executors 2 \
> --executor-cores 3 \
> --executor-memory 2G \
> --master spark://50.140.197.217:7077 \
> --conf "spark.scheduler.mode=FAIR" \
> --conf
> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps" \
> --jars
> /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
> --class "${FILE_NAME}" \
> --conf "spark.ui.port=${SP}" \
>
> HTH
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 15 July 2016 at 13:48, Jean Georges Perrin  wrote:
>
>> Merci Nihed, this is one of the tests I did :( still not working
>>
>>
>>
>> On Jul 15, 2016, at 8:41 AM, nihed mbarek  wrote:
>>
>> can you try with :
>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>> "spark.executor.memory", "4g")
>> .setMaster("spark://10.0.100.120:7077");
>> if (restId == 0) {
>> conf = conf.set("spark.executor.cores", "22");
>> } else {
>> conf = conf.set("spark.executor.cores", "2");
>> }
>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>
>> On Fri, Jul 15, 2016 at 2:31 PM, Jean Georges Perrin  wrote:
>>
>>> Hi,
>>>
>>> Configuration: standalone cluster, Java, Spark 1.6.2, 24 cores
>>>
>>> My process uses all the cores of my server (good), but I am trying to
>>> limit it so I can actually submit a second job.
>>>
>>> I tried
>>>
>>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>>> "spark.executor.memory", "4g")
>>> .setMaster("spark://10.0.100.120:7077");
>>> if (restId == 0) {
>>> conf = conf.set("spark.executor.cores", "22");
>>> } else {
>>> conf = conf.set("spark.executor.cores", "2");
>>> }
>>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>>
>>> and
>>>
>>> SparkConf conf = new SparkConf().setAppName("NC Eatery app").set(
>>> "spark.executor.memory", "4g")
>>> .setMaster("spark://10.0.100.120:7077");
>>> if (restId == 0) {
>>> conf.set("spark.executor.cores", "22");
>>> } else {
>>> conf.set("spark.executor.cores", "2");
>>> }
>>> JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
>>>
>>> but it does not seem to take it. Any hint?
>>>
>>> jg
>>>
>>>
>>>
>>
>>
>> --
>>
>> M'BAREK Med Nihed,
>> Fedora Ambassador, TUNISIA, Northern Africa
>> http://www.nihed.com
>>
>> 
>>
>>
>>
>
>


Re: How to Register Permanent User-Defined-Functions (UDFs) in SparkSQL

2016-07-12 Thread Daniel Darabos
Hi Lokesh,
There is no way to do that. The SQLContext.newSession documentation says:

Returns a SQLContext as new session, with separated SQL configurations,
temporary tables, registered functions, but sharing the same SparkContext,
CacheManager, SQLListener and SQLTab.

You have two options: either use the same SQLContext instead of creating
new SQLContexts, or have a function for creating SQLContexts, and this
function can also register the UDFs in every created SQLContext.
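
To illustrate, a sketch of the second option (the UDF body here is made up):

def newSessionWithUdfs(base: org.apache.spark.sql.SQLContext): org.apache.spark.sql.SQLContext = {
  val session = base.newSession()
  session.udf.register("sample_fn", (s: String) => s.toUpperCase)  // register all your UDFs here
  session
}

val ctx1 = newSessionWithUdfs(sqlContext)
val ctx2 = newSessionWithUdfs(sqlContext)  // "sample_fn" is available in both sessions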

On Sun, Jul 10, 2016 at 6:14 PM, Lokesh Yadav 
wrote:

> Hi
> with sqlContext we can register a UDF like
> this: sqlContext.udf.register("sample_fn", sample_fn _ )
> But this UDF is limited to that particular sqlContext only. I wish to make
> the registration persistent, so that I can access the same UDF in any
> subsequent sqlcontext.
> Or is there any other way to register UDFs in sparkSQL so that they remain
> persistent?
>
> Regards
> Lokesh
>


Re: What is the interpretation of Cores in Spark doc

2016-06-12 Thread Daniel Darabos
Spark is a software product. In software a "core" is something that a
process can run on. So it's a "virtual core". (Do not call these "threads".
A "thread" is not something a process can run on.)

local[*] uses java.lang.Runtime.availableProcessors().
Since Java is software, this also returns the number of virtual cores. (You
can test this easily.)
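
To illustrate, a quick check you can run in spark-shell (assuming
spark.default.parallelism is not set):

println(Runtime.getRuntime.availableProcessors)  // virtual cores visible to the JVM
println(sc.defaultParallelism)                   // with local[*] this matches the number above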


On Sun, Jun 12, 2016 at 9:23 PM, Mich Talebzadeh 
wrote:

>
> Hi,
>
> I was writing some docs on Spark P and came across this.
>
> It is about the terminology or interpretation of that in Spark doc.
>
> This is my understanding of cores and threads.
>
>  Cores are physical cores. Threads are virtual cores. Cores with 2 threads
> is called hyper threading technology so 2 threads per core makes the core
> work on two loads at same time. In other words, every thread takes care of
> one load.
>
> Core has its own memory. So if you have a dual core with hyper threading,
> the core works with 2 loads each at same time because of the 2 threads per
> core, but this 2 threads will share memory in that core.
>
> Some vendors as I am sure most of you aware charge licensing per core.
>
> For example on the same host that I have Spark, I have a SAP product that
> checks the licensing and shuts the application down if the license does not
> agree with the cores speced.
>
> This is what it says
>
> ./cpuinfo
> License hostid:00e04c69159a 0050b60fd1e7
> Detected 12 logical processor(s), 6 core(s), in 1 chip(s)
>
> So here I have 12 logical processors  and 6 cores and 1 chip. I call
> logical processors as threads so I have 12 threads?
>
> Now if I go and start worker process ${SPARK_HOME}/sbin/start-slaves.sh, I
> see this in GUI page
>
> [image: Inline images 1]
>
> it says 12 cores but I gather it is threads?
>
>
> Spark document
>  states
> and I quote
>
>
> [image: Inline images 2]
>
>
>
> OK the line local[k] adds  ..  *set this to the number of cores on your
> machine*
>
>
> But I know that it means threads. Because if I went and set that to 6, it
> would be only 6 threads as opposed to 12 threads.
>
>
> the next line local[*] seems to indicate it correctly as it refers to
> "logical cores" that in my understanding it is threads.
>
>
> I trust that I am not nitpicking here!
>
>
> Cheers,
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: [REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-06-07 Thread Daniel Darabos
On Sun, Jun 5, 2016 at 9:51 PM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> If you fill up the cache, 1.6.0+ will suffer performance degradation from
> GC thrashing. You can set spark.memory.useLegacyMode to true, or
> spark.memory.fraction to 0.66, or spark.executor.extraJavaOptions to
> -XX:NewRatio=3 to avoid this issue.
>
> I think my colleague filed a ticket for this issue, but I can't find it
> now. So treat it like unverified rumor for now, and try it for yourself if
> you're out of better ideas :). Good luck!
>

FYI there is a ticket for this issue now, with much more details:
https://issues.apache.org/jira/browse/SPARK-15796

On Sat, Jun 4, 2016 at 11:49 AM, Cosmin Ciobanu <ciob...@adobe.com> wrote:
>
>> Microbatch is 20 seconds. We’re not using window operations.
>>
>>
>>
>> The graphs are for a test cluster, and the entire load is artificially
>> generated by load tests (100k / 200k generated sessions).
>>
>>
>>
>> We’ve performed a few more performance tests. On the same 5 node cluster,
>> with the same application:
>>
>> · Spark 1.5.1 handled 170k+ generated sessions for 24hours with
>> no scheduling delay – the limit seems to be around 180k, above which
>> scheduling delay starts to increase;
>>
>> · Spark 1.6.1 had constant upward-trending scheduling delay from
>> the beginning for 100k+ generated sessions (this is also mentioned in the
>> initial post) – the load test was stopped after 25 minutes as scheduling
>> delay reached 3,5 minutes.
>>
>>
>>
>> P.S. Florin and I will be in SF next week, attending the Spark Summit on
>> Tuesday and Wednesday. We can meet and go into more details there - is
>> anyone working on Spark Streaming available?
>>
>>
>>
>> Cosmin
>>
>>
>>
>>
>>
>> *From: *Mich Talebzadeh <mich.talebza...@gmail.com>
>> *Date: *Saturday 4 June 2016 at 12:33
>> *To: *Florin Broască <florin.broa...@gmail.com>
>> *Cc: *David Newberger <david.newber...@wandcorp.com>, Adrian Tanase <
>> atan...@adobe.com>, "user@spark.apache.org" <user@spark.apache.org>,
>> ciobanu <ciob...@adobe.com>
>> *Subject: *Re: [REPOST] Severe Spark Streaming performance degradation
>> after upgrading to 1.6.1
>>
>>
>>
>> batch interval I meant
>>
>>
>>
>> thx
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn  
>> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>>
>> On 4 June 2016 at 10:32, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>> I may have missed these but:
>>
>>
>>
>> What is the windows interval, windowsLength and SlidingWindow
>>
>>
>>
>> Has the volume of ingest data (Kafka streaming) changed recently that you
>> may not be aware of?
>>
>>
>>
>> HTH
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn  
>> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>>
>> On 4 June 2016 at 09:50, Florin Broască <florin.broa...@gmail.com> wrote:
>>
>> Hi David,
>>
>>
>>
>> Thanks for looking into this. This is how the processing time looks like:
>>
>>
>>
>> [image: nline image 1]
>>
>>
>>
>> Appreciate any input,
>>
>> Florin
>>
>>
>>
>>
>>
>> On Fri, Jun 3, 2016 at 3:22 PM, David Newberger <
>> david.newber...@wandcorp.com> wrote:
>>
>> What does your processing time look like. Is it consistently within that
>> 20sec micro batch window?
>>
>>
>>
>> *David Newberger*
>>
>>
>>
>> *From:* Adrian Tanase [mailto:atan...@adobe.com]
>> *Sent:* Friday, June 3, 2016 8:14 AM
>> *To:* user@spark.apache.org
>> *Cc:* Cosmin Ciobanu
>> *Subject:* [REPOST] Severe Spark Streaming performance degradation after
>> upgrading to 1.6.1
>>
>>
>>
>> Hi all,
>>
>>
>>
>> Trying to repost this question from a colleague on my team, somehow his
>> subscription is not active:
>>
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-td27056.html
>>
>>
>>
>> Appreciate any thoughts,
>>
>> -adrian
>>
>>
>>
>>
>>
>>
>>
>
>


Re: [REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-06-05 Thread Daniel Darabos
If you fill up the cache, 1.6.0+ will suffer performance degradation from
GC thrashing. You can set spark.memory.useLegacyMode to true, or
spark.memory.fraction to 0.66, or spark.executor.extraJavaOptions to
-XX:NewRatio=3 to avoid this issue.

I think my colleague filed a ticket for this issue, but I can't find it
now. So treat it like unverified rumor for now, and try it for yourself if
you're out of better ideas :). Good luck!
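
To illustrate, a sketch of applying one of those settings through SparkConf
(pick one; the values are just the ones suggested above):

import org.apache.spark.SparkConf

// Option 1: fall back to the pre-1.6 memory manager.
val conf = new SparkConf().set("spark.memory.useLegacyMode", "true")
// Option 2: shrink the unified memory region.
// new SparkConf().set("spark.memory.fraction", "0.66")
// Option 3: give the JVM a larger old generation.
// new SparkConf().set("spark.executor.extraJavaOptions", "-XX:NewRatio=3")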

On Sat, Jun 4, 2016 at 11:49 AM, Cosmin Ciobanu  wrote:

> Microbatch is 20 seconds. We’re not using window operations.
>
>
>
> The graphs are for a test cluster, and the entire load is artificially
> generated by load tests (100k / 200k generated sessions).
>
>
>
> We’ve performed a few more performance tests. On the same 5 node cluster,
> with the same application:
>
> · Spark 1.5.1 handled 170k+ generated sessions for 24hours with
> no scheduling delay – the limit seems to be around 180k, above which
> scheduling delay starts to increase;
>
> · Spark 1.6.1 had constant upward-trending scheduling delay from
> the beginning for 100k+ generated sessions (this is also mentioned in the
> initial post) – the load test was stopped after 25 minutes as scheduling
> delay reached 3,5 minutes.
>
>
>
> P.S. Florin and I will be in SF next week, attending the Spark Summit on
> Tuesday and Wednesday. We can meet and go into more details there - is
> anyone working on Spark Streaming available?
>
>
>
> Cosmin
>
>
>
>
>
> *From: *Mich Talebzadeh 
> *Date: *Saturday 4 June 2016 at 12:33
> *To: *Florin Broască 
> *Cc: *David Newberger , Adrian Tanase <
> atan...@adobe.com>, "user@spark.apache.org" ,
> ciobanu 
> *Subject: *Re: [REPOST] Severe Spark Streaming performance degradation
> after upgrading to 1.6.1
>
>
>
> batch interval I meant
>
>
>
> thx
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>
> On 4 June 2016 at 10:32, Mich Talebzadeh 
> wrote:
>
> I may have missed these but:
>
>
>
> What is the windows interval, windowsLength and SlidingWindow
>
>
>
> Has the volume of ingest data (Kafka streaming) changed recently that you
> may not be aware of?
>
>
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>
> On 4 June 2016 at 09:50, Florin Broască  wrote:
>
> Hi David,
>
>
>
> Thanks for looking into this. This is how the processing time looks like:
>
>
>
> [image: nline image 1]
>
>
>
> Appreciate any input,
>
> Florin
>
>
>
>
>
> On Fri, Jun 3, 2016 at 3:22 PM, David Newberger <
> david.newber...@wandcorp.com> wrote:
>
> What does your processing time look like. Is it consistently within that
> 20sec micro batch window?
>
>
>
> *David Newberger*
>
>
>
> *From:* Adrian Tanase [mailto:atan...@adobe.com]
> *Sent:* Friday, June 3, 2016 8:14 AM
> *To:* user@spark.apache.org
> *Cc:* Cosmin Ciobanu
> *Subject:* [REPOST] Severe Spark Streaming performance degradation after
> upgrading to 1.6.1
>
>
>
> Hi all,
>
>
>
> Trying to repost this question from a colleague on my team, somehow his
> subscription is not active:
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-td27056.html
>
>
>
> Appreciate any thoughts,
>
> -adrian
>
>
>
>
>
>
>


Re: best way to do deep learning on spark ?

2016-03-19 Thread Daniel Darabos
On Thu, Mar 17, 2016 at 3:51 AM, charles li  wrote:

> Hi, Alexander,
>
> that's awesome, and when will that feature be released ? Since I want to
> know the opportunity cost between waiting for that release and use caffe or
> tensorFlow ?
>

I don't expect MLlib will be able to compete with major players like Caffe
or TensorFlow, at least not in the short term.

Check out https://github.com/amplab/SparkNet. It's from AMPLab (like
Spark), and it runs Caffe or TensorFlow on Spark. I think it's the state of
the art for deep learning on Spark.

great thanks again
>
> On Thu, Mar 17, 2016 at 10:32 AM, Ulanov, Alexander <
> alexander.ula...@hpe.com> wrote:
>
>> Hi Charles,
>>
>>
>>
>> There is an implementation of multilayer perceptron in Spark (since 1.5):
>>
>>
>> https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
>>
>>
>>
>> Other features such as autoencoder, convolutional layers, etc. are
>> currently under development. Please refer to
>> https://issues.apache.org/jira/browse/SPARK-5575
>>
>>
>>
>> Best regards, Alexander
>>
>>
>>
>> *From:* charles li [mailto:charles.up...@gmail.com]
>> *Sent:* Wednesday, March 16, 2016 7:01 PM
>> *To:* user 
>> *Subject:* best way to do deep learning on spark ?
>>
>>
>>
>>
>>
>> Hi, guys, I'm new to MLlib on spark, after reading the document, it seems
>> that MLlib does not support deep learning, I want to know is there any way
>> to implement deep learning on spark ?
>>
>>
>>
>> *Do I must use 3-party package like caffe or tensorflow ?*
>>
>>
>>
>> or
>>
>>
>>
>> *Does deep learning module list in the MLlib development plan?*
>>
>>
>>
>>
>> great thanks
>>
>>
>>
>> --
>>
>> *--*
>>
>> a spark lover, a quant, a developer and a good man.
>>
>>
>>
>> http://github.com/litaotao
>>
>
>
>
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>


Re: coalesce and executor memory

2016-02-13 Thread Daniel Darabos
On Fri, Feb 12, 2016 at 11:10 PM, Koert Kuipers  wrote:

> in spark, every partition needs to fit in the memory available to the core
> processing it.
>

That does not agree with my understanding of how it works. I think you
could do sc.textFile("input").coalesce(1).map(_.replace("A",
"B")).saveAsTextFile("output") on a 1 TB local file and it would work fine.
(I just tested this with a 3 GB file with a 1 GB executor.)

RDDs are mostly implemented using iterators. For example map() operates
line-by-line, never pulling in the whole partition into memory. coalesce()
also just concatenates the iterators of the parent partitions into a new
iterator.
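
To illustrate, the example from above spelled out with comments (paths are
placeholders):

sc.textFile("input")
  .coalesce(1)               // just concatenates the parent partitions' iterators
  .map(_.replace("A", "B"))  // processed line by line, never a whole partition in memory
  .saveAsTextFile("output")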

Some operations, like reduceByKey(), need to have the whole contents of the
partition to work. But they typically use ExternalAppendOnlyMap, so they
spill to disk instead of filling up the memory.

I know I'm not helping to answer Christopher's issue. Christopher, can you
perhaps give us an example that we can easily try in spark-shell to see the
same problem?


Re: Dataframe, Spark SQL - Drops First 8 Characters of String on Amazon EMR

2016-01-29 Thread Daniel Darabos
Hi Andrew,

If you still see this with Spark 1.6.0, it would be very helpful if you
could file a bug about it at https://issues.apache.org/jira/browse/SPARK with
as much detail as you can. This issue could be a nasty source of silent
data corruption in a case where some intermediate data loses 8 characters
but it is not obvious in the final output. Thanks!

On Fri, Jan 29, 2016 at 7:53 AM, Jonathan Kelly <jonathaka...@gmail.com>
wrote:

> Just FYI, Spark 1.6 was released on emr-4.3.0 a couple days ago:
> https://aws.amazon.com/blogs/aws/emr-4-3-0-new-updated-applications-command-line-export/
>
> On Thu, Jan 28, 2016 at 7:30 PM Andrew Zurn <awz...@gmail.com> wrote:
>
>> Hey Daniel,
>>
>> Thanks for the response.
>>
>> After playing around for a bit, it looks like it's probably the something
>> similar to the first situation you mentioned, with the Parquet format
>> causing issues. Both programmatically created dataset and a dataset pulled
>> off the internet (rather than out of S3 and put into HDFS/Hive) acted with
>> DataFrames as one would expect (printed out everything, grouped properly,
>> etc.)
>>
>> It looks like there is more than likely an outstanding bug that causes
>> issues with data coming from S3 and is converted in the parquet format
>> (found an article here highlighting it was around in 1.4, and I guess it
>> wouldn't be out of the realm of things for it still to exist. Link to
>> article:
>> https://www.appsflyer.com/blog/the-bleeding-edge-spark-parquet-and-s3/
>>
>> Hopefully a little more stability will come out with the upcoming Spark
>> 1.6 release on EMR (I think that is happening sometime soon).
>>
>> Thanks again for the advice on where to dig further into. Much
>> appreciated.
>>
>> Andrew
>>
>> On Tue, Jan 26, 2016 at 9:18 AM, Daniel Darabos <
>> daniel.dara...@lynxanalytics.com> wrote:
>>
>>> Have you tried setting spark.emr.dropCharacters to a lower value? (It
>>> defaults to 8.)
>>>
>>> :) Just joking, sorry! Fantastic bug.
>>>
>>> What data source do you have for this DataFrame? I could imagine for
> >>> example that it's a Parquet file and on EMR you are running with the wrong
> >>> version of the Parquet library and it messes up strings. It should be easy
>>> enough to try a different data format. You could also try what happens if
>>> you just create the DataFrame programmatically, e.g.
>>> sc.parallelize(Seq("asdfasdfasdf")).toDF.
>>>
>>> To understand better at which point the characters are lost you could
>>> try grouping by a string attribute. I see "education" ends up either as ""
>>> (empty string) or "y" in the printed output. But are the characters already
>>> lost when you try grouping by the attribute? Will there be a single ""
>>> category, or will you have separate categories for "primary" and "tertiary"?
>>>
>>> I think the correct output through the RDD suggests that the issue
>>> happens at the very end. So it will probably happen also with different
>>> data sources, and grouping will create separate groups for "primary" and
>>> "tertiary" even though they are printed as the same string at the end. You
>>> should also check the data from "take(10)" to rule out any issues with
>>> printing. You could try the same "groupBy" trick after "take(10)". Or you
>>> could print the lengths of the strings.
>>>
>>> Good luck!
>>>
>>> On Tue, Jan 26, 2016 at 3:53 AM, awzurn <awz...@gmail.com> wrote:
>>>
>>>> Sorry for the bump, but wondering if anyone else has seen this before.
>>>> We're
>>>> hoping to either resolve this soon, or move on with further steps to
>>>> move
>>>> this into an issue.
>>>>
>>>> Thanks in advance,
>>>>
>>>> Andrew Zurn
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Dataframe-Spark-SQL-Drops-First-8-Characters-of-String-on-Amazon-EMR-tp26022p26065.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>


Re: Dataframe, Spark SQL - Drops First 8 Characters of String on Amazon EMR

2016-01-26 Thread Daniel Darabos
Have you tried setting spark.emr.dropCharacters to a lower value? (It
defaults to 8.)

:) Just joking, sorry! Fantastic bug.

What data source do you have for this DataFrame? I could imagine for
example that it's a Parquet file and on EMR you are running with the wrong
version of the Parquet library and it messes up strings. It should be easy
enough to try a different data format. You could also try what happens if
you just create the DataFrame programmatically, e.g.
sc.parallelize(Seq("asdfasdfasdf")).toDF.

To understand better at which point the characters are lost you could try
grouping by a string attribute. I see "education" ends up either as ""
(empty string) or "y" in the printed output. But are the characters already
lost when you try grouping by the attribute? Will there be a single ""
category, or will you have separate categories for "primary" and "tertiary"?

I think the correct output through the RDD suggests that the issue happens
at the very end. So it will probably happen also with different data
sources, and grouping will create separate groups for "primary" and
"tertiary" even though they are printed as the same string at the end. You
should also check the data from "take(10)" to rule out any issues with
printing. You could try the same "groupBy" trick after "take(10)". Or you
could print the lengths of the strings.

Good luck!
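
To illustrate, a sketch of those checks (df and the "education" column are
from the original report):

import org.apache.spark.sql.functions.{col, length}
import sqlContext.implicits._

df.groupBy("education").count().show()          // separate "primary"/"tertiary" groups, or already merged?
df.select(length(col("education"))).show()      // are the string lengths already wrong before printing?
sc.parallelize(Seq("asdfasdfasdf")).toDF.show() // a programmatically created DataFrame as a control case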

On Tue, Jan 26, 2016 at 3:53 AM, awzurn  wrote:

> Sorry for the bump, but wondering if anyone else has seen this before.
> We're
> hoping to either resolve this soon, or move on with further steps to move
> this into an issue.
>
> Thanks in advance,
>
> Andrew Zurn
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Dataframe-Spark-SQL-Drops-First-8-Characters-of-String-on-Amazon-EMR-tp26022p26065.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark 1.6.0 on ec2 doesn't work

2016-01-18 Thread Daniel Darabos
Hi,

How do you know it doesn't work? The log looks roughly normal to me. Is
Spark not running at the printed address? Can you not start jobs?

On Mon, Jan 18, 2016 at 11:51 AM, Oleg Ruchovets 
wrote:

> Hi ,
>I try to follow the spartk 1.6.0 to install spark on EC2.
>
> It doesn't work properly -  got exceptions and at the end standalone spark
> cluster installed.
>

The purpose of the script is to install a standalone Spark cluster. So
that's not an error :).


> here is log information:
>
> Any suggestions?
>
> Thanks
> Oleg.
>
> oleg@robinhood:~/install/spark-1.6.0-bin-hadoop2.6/ec2$ ./spark-ec2
> --key-pair=CC-ES-Demo
>  
> --identity-file=/home/oleg/work/entity_extraction_framework/ec2_pem_key/CC-ES-Demo.pem
> --region=us-east-1 --zone=us-east-1a --spot-price=0.05   -s 5
> --spark-version=1.6.0launch entity-extraction-spark-cluster
> Setting up security groups...
> Searching for existing cluster entity-extraction-spark-cluster in region
> us-east-1...
> Spark AMI: ami-5bb18832
> Launching instances...
> Requesting 5 slaves as spot instances with price $0.050
> Waiting for spot instances to be granted...
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> 0 of 5 slaves granted, waiting longer
> All 5 slaves granted
> Launched master in us-east-1a, regid = r-9384033f
> Waiting for AWS to propagate instance metadata...
> Waiting for cluster to enter 'ssh-ready' state..
>
> Warning: SSH connection error. (This could be temporary.)
> Host: ec2-52-90-186-83.compute-1.amazonaws.com
> SSH return code: 255
> SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com
> port 22: Connection refused
>
> .
>
> Warning: SSH connection error. (This could be temporary.)
> Host: ec2-52-90-186-83.compute-1.amazonaws.com
> SSH return code: 255
> SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com
> port 22: Connection refused
>
> .
>
> Warning: SSH connection error. (This could be temporary.)
> Host: ec2-52-90-186-83.compute-1.amazonaws.com
> SSH return code: 255
> SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com
> port 22: Connection refused
>
> .
> Cluster is now in 'ssh-ready' state. Waited 442 seconds.
> Generating cluster's SSH key on master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed.
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Transferring cluster's SSH key to slaves...
> ec2-54-165-243-74.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-165-243-74.compute-1.amazonaws.com,54.165.243.74'
> (ECDSA) to the list of known hosts.
> ec2-54-88-245-107.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-88-245-107.compute-1.amazonaws.com,54.88.245.107'
> (ECDSA) to the list of known hosts.
> ec2-54-172-29-47.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-172-29-47.compute-1.amazonaws.com,54.172.29.47'
> (ECDSA) to the list of known hosts.
> ec2-54-165-131-210.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-165-131-210.compute-1.amazonaws.com,54.165.131.210'
> (ECDSA) to the list of known hosts.
> ec2-54-172-46-184.compute-1.amazonaws.com
> Warning: Permanently added 
> 'ec2-54-172-46-184.compute-1.amazonaws.com,54.172.46.184'
> (ECDSA) to the list of known hosts.
> Cloning spark-ec2 scripts from
> https://github.com/amplab/spark-ec2/tree/branch-1.5 on master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Cloning into 'spark-ec2'...
> remote: Counting objects: 2068, done.
> remote: Total 2068 (delta 0), reused 0 (delta 0), pack-reused 2068
> Receiving objects: 100% (2068/2068), 349.76 KiB, done.
> Resolving deltas: 100% (796/796), done.
> Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed.
> Deploying files to master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> sending incremental file list
> root/spark-ec2/ec2-variables.sh
>
> sent 1,835 bytes  received 40 bytes  416.67 bytes/sec
> total size is 1,684  speedup is 0.90
> Running setup on master...
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed.
> Warning: Permanently added 
> 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83'
> (ECDSA) to the list of known hosts.
> Setting up Spark on 

Re: spark 1.6.0 on ec2 doesn't work

2016-01-18 Thread Daniel Darabos
On Mon, Jan 18, 2016 at 5:24 PM, Oleg Ruchovets 
wrote:

> I thought script tries to install hadoop / hdfs also. And it looks like it
> failed. Installation is only standalone spark without hadoop. Is it correct
> behaviour?
>

Yes, it also sets up two HDFS clusters. Are they not working? Try to see if
Spark is working by running some simple jobs on it. (See
http://spark.apache.org/docs/latest/ec2-scripts.html.)

There is no program called Hadoop. If you mean YARN, then indeed the script
does not set up YARN. It sets up standalone Spark.


> Also errors in the log:
>ERROR: Unknown Tachyon version
>Error: Could not find or load main class crayondata.com.log
>

As long as Spark is working fine, you can ignore all output from the EC2
script :).


Re: Using JDBC clients with "Spark on Hive"

2016-01-15 Thread Daniel Darabos
Does Hive JDBC work if you are not using Spark as a backend? I just had
very bad experience with Hive JDBC in general. E.g. half the JDBC protocol
is not implemented (https://issues.apache.org/jira/browse/HIVE-3175, filed
in 2012).

On Fri, Jan 15, 2016 at 2:15 AM, sdevashis  wrote:

> Hello Experts,
>
> I am getting started with Hive with Spark as the query engine. I built the
> package from sources. I am able to invoke Hive CLI and run queries and see
> in Ambari that Spark application are being created confirming hive is using
> Spark as the engine.
>
> However other than Hive CLI, I am not able to run queries from any other
> clients that use the JDBC to connect to hive through thrift. I tried
> Squirrel, Aginity Netezza workbench, and even Hue.
>
> No yarn applications are getting created, the query times out after
> sometime. Nothing gets into /tmp/user/hive.log Am I missing something?
>
> Again I am using Hive on Spark and not spark SQL.
>
> Version Info:
> Spark 1.4.1 built for Hadoop 2.4
>
>
> Thank you in advance for any pointers.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-JDBC-clients-with-Spark-on-Hive-tp25976.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: stopping a process usgin an RDD

2016-01-04 Thread Daniel Darabos
You can cause a failure by throwing an exception in the code running on the
executors. The task will be retried (if spark.task.maxFailures > 1), and
then the stage is failed. No further tasks are processed after that, and an
exception is thrown on the driver. You could catch the exception and see if
it was caused by your own special exception.
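
To illustrate, a rough sketch of the idea (the stop condition is made up; note
the task is retried spark.task.maxFailures times before the stage fails):

class StopProcessing(msg: String) extends Exception(msg)

val rdd = sc.parallelize(1 to 1000000)
try {
  rdd.foreach { n =>
    if (n == 12345) throw new StopProcessing(s"stopping at $n")
    // ...normal per-record work would go here...
  }
} catch {
  case e: org.apache.spark.SparkException if e.getMessage.contains("StopProcessing") =>
    println("Stopped early by our own exception.")
}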

On Mon, Jan 4, 2016 at 1:05 PM, domibd  wrote:

> Hello,
>
> Is there a way to stop under a condition a process (like map-reduce) using
> an RDD ?
>
> (this could be use if the process does not always need to
>  explore all the RDD)
>
> thanks
>
> Dominique
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/stopping-a-process-usgin-an-RDD-tp25870.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Is coalesce smart while merging partitions?

2015-10-08 Thread Daniel Darabos
> For example does spark try to merge the small partitions first or the
election of partitions to merge is random?

It is quite smart as Iulian has pointed out. But it does not try to merge
small partitions first. Spark doesn't know the size of partitions. (The
partitions are represented as Iterators. You cannot know an Iterator's size
without consuming it.)
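
To illustrate, a small sketch (partition sizes are unknown to Spark here, as in
general):

val rdd = sc.parallelize(1 to 100, 10)  // 10 partitions
val merged = rdd.coalesce(2)            // no shuffle; parents are grouped by locality, not by size
println(merged.partitions.length)       // 2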


Re: Meaning of local[2]

2015-08-17 Thread Daniel Darabos
Hi Praveen,

On Mon, Aug 17, 2015 at 12:34 PM, praveen S mylogi...@gmail.com wrote:

 What does this mean in .setMaster(local[2])

Local mode (executor in the same JVM) with 2 executor threads.

 Is this applicable only for standalone Mode?

It is not applicable for standalone mode, only for local.

 Can I do this in a cluster setup, eg:
 . setMaster(hostname:port[2])..

No. It's faster to try than to ask a mailing list, actually. Also it's
documented at
http://spark.apache.org/docs/latest/submitting-applications.html#master-urls
.

 Is it number of threads per worker node?

You can control the number of total threads with
spark-submit's --total-executor-cores parameter, if that's what you're
looking for.
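
To illustrate, a minimal sketch of the two settings mentioned above:

import org.apache.spark.{SparkConf, SparkContext}

// Local mode: one JVM with 2 executor threads.
val sc = new SparkContext(new SparkConf().setAppName("local-demo").setMaster("local[2]"))

// On a cluster you would instead cap the thread count at submit time, e.g.:
//   spark-submit --master spark://host:7077 --total-executor-cores 2 ...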


Re: Research ideas using spark

2015-07-14 Thread Daniel Darabos
Hi Shahid,
To be honest I think this question is better suited for Stack Overflow than
for a PhD thesis.

On Tue, Jul 14, 2015 at 7:42 AM, shahid ashraf sha...@trialx.com wrote:

 hi

 I have a 10 node cluster  i loaded the data onto hdfs, so the no. of
 partitions i get is 9. I am running a spark application , it gets stuck on
 one of tasks, looking at the UI it seems application is not using all nodes
 to do calculations. attached is the screen shot of tasks, it seems tasks
 are put on each node more then once. looking at tasks 8 tasks get completed
 under 7-8 minutes and one task takes around 30 minutes so causing the delay
 in results.


 On Tue, Jul 14, 2015 at 10:48 AM, Shashidhar Rao 
 raoshashidhar...@gmail.com wrote:

 Hi,

 I am doing my PHD thesis on large scale machine learning e.g  Online
 learning, batch and mini batch learning.

 Could somebody help me with ideas especially in the context of Spark and
 to the above learning methods.

 Some ideas like improvement to existing algorithms, implementing new
 features especially the above learning methods and algorithms that have not
 been implemented etc.

 If somebody could help me with some ideas it would really accelerate my
 work.

 Plus few ideas on research papers regarding Spark or Mahout.

 Thanks in advance.

 Regards




 --
 with Regards
 Shahid Ashraf


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: S3 vs HDFS

2015-07-09 Thread Daniel Darabos
I recommend testing it for yourself. Even if you have no application, you
can just run the spark-ec2 script, log in, run spark-shell and try reading
files from an S3 bucket and from hdfs://master IP:9000/. (This is the
ephemeral HDFS cluster, which uses SSD.)

I just tested our application this way yesterday and found the SSD-based
HDFS to outperform S3 by a factor of 2. I don't know the cause. It may be
locality like Akhil suggests, or SSD vs HDD (assuming S3 is HDD-backed). Or
the HDFS client library and protocol are just better than the S3 versions
(which is HTTP-based and uses some 6-year-old libraries).

On Thu, Jul 9, 2015 at 9:54 AM, Sujee Maniyam su...@sujee.net wrote:

 latency is much bigger for S3 (if that matters)
 And with HDFS you'd get data-locality that will boost your app performance.

 I did some light experimenting on this.
 see my presentation here for some benchmark numbers ..etc
 http://www.slideshare.net/sujee/hadoop-to-sparkv2
 from slide# 34

 cheers
 Sujee Maniyam (http://sujee.net | http://www.linkedin.com/in/sujeemaniyam
 )
 teaching Spark
 http://elephantscale.com/training/spark-for-developers/?utm_source=mailinglistutm_medium=emailutm_campaign=signature


 On Wed, Jul 8, 2015 at 11:35 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Are there any significant performance differences between reading text
 files from S3 and hdfs?





Re: Split RDD into two in a single pass

2015-07-06 Thread Daniel Darabos
This comes up so often. I wonder if the documentation or the API could be
changed to answer this question.

The solution I found is from
http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job.
You basically write the items into two directories in a single pass through
the RDD. Then you read back the two directories as two RDDs.

It avoids traversing the RDD twice, but writing to and reading from the file
system is also costly, so it may not always be worth it.
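
To illustrate, a sketch along the lines of that Stack Overflow answer (assuming
rdd: RDD[String] and a predicate f: String => Boolean; paths are placeholders):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class SplitOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name  // records land under out/true/... and out/false/...
}

rdd.map(x => (f(x).toString, x))  // one pass: key each element by the predicate
  .saveAsHadoopFile("out", classOf[String], classOf[String], classOf[SplitOutputFormat])

val rdd1 = sc.textFile("out/true")
val rdd2 = sc.textFile("out/false")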


On Mon, Jul 6, 2015 at 9:32 AM, Anand Nalya anand.na...@gmail.com wrote:

 Hi,

 I've a RDD which I want to split into two disjoint RDDs on with a boolean
 function. I can do this with the following

 val rdd1 = rdd.filter(f)
 val rdd2 = rdd.filter(fnot)

 I'm assuming that each of the above statement will traverse the RDD once
 thus resulting in 2 passes.

 Is there a way of doing this in a single pass over the RDD so that when f
 returns true, the element goes to rdd1 and to rdd2 otherwise.

 Regards,
 Anand



Re: .NET on Apache Spark?

2015-07-02 Thread Daniel Darabos
Indeed Spark does not have .NET bindings.

On Thu, Jul 2, 2015 at 10:33 AM, Zwits daniel.van...@ortec-finance.com
wrote:

 I'm currently looking into a way to run a program/code (DAG) written in
 .NET
 on a cluster using Spark. However I ran into problems concerning the coding
 language, Spark has no .NET API.
 I tried looking into IronPython because Spark does have a Python API, but i
 couldn't find a way to use this.

 Is there a way to implement a DAG of jobs on a cluster using .NET
 programming language?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/NET-on-Apache-Spark-tp23578.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: map vs mapPartitions

2015-06-25 Thread Daniel Darabos
Spark creates a RecordReader and uses next() on it when you call
input.next(). (See
https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala#L215)
How
the RecordReader works is an HDFS question, but it's safe to say there is
no difference between using map and mapPartitions.
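
To illustrate, the two styles side by side (the path is a placeholder):

val lines = sc.textFile("hdfs:///some/path")
val a = lines.map(_.length)                   // one record at a time
val b = lines.mapPartitions(_.map(_.length))  // same behavior; the Iterator is still consumed lazily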

On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 say source is HDFS,And file is divided in 10 partitions. so what will be
  input contains.

 public Iterable<Integer> call(Iterator<String> input)

 say I have 10 executors in job each having single partition.

 will it have some part of partition or complete. And if some when I call
 input.next() - it will fetch rest or how is it handled ?





 On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen so...@cloudera.com wrote:

 No, or at least, it depends on how the source of the partitions was
 implemented.

 On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora
 shushantaror...@gmail.com wrote:
  Does mapPartitions keep complete partitions in memory of executor as
  iterable.
 
  JavaRDD<String> rdd = jsc.textFile(path);
  JavaRDD<Integer> output = rdd.mapPartitions(new
  FlatMapFunction<Iterator<String>, Integer>() {

  public Iterable<Integer> call(Iterator<String> input)
  throws Exception {
  List<Integer> output = new ArrayList<Integer>();
  while(input.hasNext()){
  output.add(input.next().length());
  }
  return output;
  }

  });
 
 
  Here does input is present in memory and can contain complete partition
 of
  gbs ?
  Will this function call(IteratorString input) is called only for no of
  partitions(say if I have 10 in this example) times. Not no of lines
  times(say 1000) .
 
 
  And whats the use of mapPartitionsWithIndex ?
 
  Thanks
 





Re: Different Sorting RDD methods in Apache Spark

2015-06-09 Thread Daniel Darabos
It would be even faster to load the data on the driver and sort it there
without using Spark :). Using reduce() is cheating, because it only works
as long as the data fits on one machine. That is not the targeted use case
of a distributed computation system. You can repeat your test with more
data (that doesn't fit on one machine) to see what I mean.

On Tue, Jun 9, 2015 at 8:30 AM, raggy raghav0110...@gmail.com wrote:

 For a research project, I tried sorting the elements in an RDD. I did this
 in
 two different approaches.

 In the first method, I applied a mapPartitions() function on the RDD, so
 that it would sort the contents of the RDD, and provide a result RDD that
 contains the sorted list as the only record in the RDD. Then, I applied a
 reduce function which basically merges sorted lists.

 I ran these experiments on an EC2 cluster containing 30 nodes. I set it up
 using the spark ec2 script. The data file was stored in HDFS.

 In the second approach I used the sortBy method in Spark.

 I performed these operation on the US census data(100MB) found here

 A single lines looks like this

 9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not
 in universe or children, Not in universe, White, All other, Female, Not in
 universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not
 in universe, Not in universe, Child 18 never marr not in subfamily, Child
 under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in
 universe, 0, Both parents present, United-States, United-States,
 United-States, Native- Born in the United States, 0, Not in universe, 0, 0,
 94, - 5.
 I sorted based on the 25th value in the CSV. In this line that is 1758.14.

 I noticed that sortBy performs worse than the other method. Is this the
 expected scenario? If it is, why wouldn't the mapPartitions() and reduce()
 be the default sorting approach?

 Here is my implementation

 public static void sortBy(JavaSparkContext sc){
 JavaRDD<String> rdd = sc.textFile("/data.txt", 32);
 long start = System.currentTimeMillis();
 rdd.sortBy(new Function<String, Double>(){

 @Override
 public Double call(String v1) throws Exception {
   // TODO Auto-generated method stub
   String [] arr = v1.split(",");
   return Double.parseDouble(arr[24]);
 }
 }, true, 9).collect();
 long end = System.currentTimeMillis();
 System.out.println("SortBy: " + (end - start));
   }

 public static void sortList(JavaSparkContext sc){
 JavaRDD<String> rdd = sc.textFile("/data.txt", 32); //parallelize(l,
 8);
 long start = System.currentTimeMillis();
 JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 =
 rdd.mapPartitions(new FlatMapFunction<Iterator<String>,
 LinkedList<Tuple2<Double, String>>>(){

 @Override
 public Iterable<LinkedList<Tuple2<Double, String>>>
 call(Iterator<String> t)
 throws Exception {
   // TODO Auto-generated method stub
   LinkedList<Tuple2<Double, String>> lines = new
 LinkedList<Tuple2<Double, String>>();
   while(t.hasNext()){
 String s = t.next();
 String arr1[] = s.split(",");
 Tuple2<Double, String> t1 = new Tuple2<Double,
 String>(Double.parseDouble(arr1[24]), s);
 lines.add(t1);
   }
   Collections.sort(lines, new IncomeComparator());
   LinkedList<LinkedList<Tuple2<Double, String>>> list = new
 LinkedList<LinkedList<Tuple2<Double, String>>>();
   list.add(lines);
   return list;
 }



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Different-Sorting-RDD-methods-in-Apache-Spark-tp23214.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: coGroup on RDDPojos

2015-06-08 Thread Daniel Darabos
I suggest you include your code and the error message! It's not even
immediately clear what programming language you mean to ask about.

On Mon, Jun 8, 2015 at 2:50 PM, elbehery elbeherymust...@gmail.com wrote:

 Hi,

 I have two datasets of customer types, and I would like to apply coGrouping
 on them.

 I could not find example for that on the website, and I have tried to apply
 this on my code, but the compiler complained ..

 Any suggestions ?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/coGroup-on-RDD-Pojos-tp23206.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark error in execution

2015-01-06 Thread Daniel Darabos
Hello! I just had a very similar stack trace. It was caused by an Akka
version mismatch. (From trying to use Play 2.3 with Spark 1.1 by accident
instead of 1.2.)

On Mon, Nov 24, 2014 at 7:15 PM, Blackeye black...@iit.demokritos.gr
wrote:

 I created an application in spark. When I run it with spark, everything
 works
 fine. But when I export my application with the libraries (via sbt), and
 trying to run it as an executable jar, I get the following error:

 14/11/24 20:06:11 ERROR OneForOneStrategy: exception during creation
 akka.actor.ActorInitializationException: exception during creation
 at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
 at akka.actor.ActorCell.create(ActorCell.scala:596)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
 at
 akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at

 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at

 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at akka.util.Reflect$.instantiate(Reflect.scala:66)
 at akka.actor.ArgsReflectConstructor.produce(Props.scala:349)
 at akka.actor.Props.newActor(Props.scala:249)
 at akka.actor.ActorCell.newActor(ActorCell.scala:552)
 at akka.actor.ActorCell.create(ActorCell.scala:578)
 ... 9 more
 Caused by: java.lang.AbstractMethodError:

 akka.remote.RemoteActorRefProvider$RemotingTerminator.akka$actor$FSM$_setter_$Event_$eq(Lakka/actor/FSM$Event$;)V
 at akka.actor.FSM$class.$init$(FSM.scala:272)
 at

 akka.remote.RemoteActorRefProvider$RemotingTerminator.init(RemoteActorRefProvider.scala:36)
 ... 18 more
 14/11/24 20:06:11 ERROR ActorSystemImpl: Uncaught fatal error from thread
 [sparkDriver-akka.actor.default-dispatcher-2] shutting down ActorSystem
 [sparkDriver]
 java.lang.AbstractMethodError
 at akka.actor.ActorCell.create(ActorCell.scala:580)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
 at
 akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [ERROR] [11/24/2014 20:06:11.478]
 [sparkDriver-akka.actor.default-dispatcher-4] [ActorSystem(sparkDriver)]
 Uncaught fatal error from thread
 [sparkDriver-akka.actor.default-dispatcher-4] shutting down ActorSystem
 [sparkDriver]
 java.lang.AbstractMethodError
 at

 akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
 at
 akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
 at akka.actor.ActorCell.terminate(ActorCell.scala:369)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
 at
 akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 [ERROR] [11/24/2014 20:06:11.481]
 [sparkDriver-akka.actor.default-dispatcher-3] [ActorSystem(sparkDriver)]
 Uncaught fatal error from thread
 

Re: when will the spark 1.3.0 be released?

2014-12-17 Thread Daniel Darabos
Spark 1.2.0 is coming in the next 48 hours according to
http://apache-spark-developers-list.1001551.n3.nabble.com/RESULT-VOTE-Release-Apache-Spark-1-2-0-RC2-tc9815.html

On Wed, Dec 17, 2014 at 10:11 AM, Madabhattula Rajesh Kumar 
mrajaf...@gmail.com wrote:

 Hi All,

 When will the Spark 1.2.0 be released? and What are the features in Spark
 1.2.0

 Regards,
 Rajesh

 On Wed, Dec 17, 2014 at 11:14 AM, Andrew Ash and...@andrewash.com wrote:

 Releases are roughly every 3mo so you should expect around March if the
 pace stays steady.

 2014-12-16 22:56 GMT-05:00 Marco Shaw marco.s...@gmail.com:

 When it is ready.



  On Dec 16, 2014, at 11:43 PM, 张建轶 zhangjia...@youku.com wrote:
 
  Hi!
 
  when will the spark 1.3.0 be released?
  I want to use new LDA feature.
  Thank you!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Can spark job have sideeffects (write files to FileSystem)

2014-12-11 Thread Daniel Darabos
Yes, this is perfectly legal. This is what RDD.foreach() is for! You may
be encountering an IO exception while writing, and maybe using() suppresses
it. (?) I'd try writing the files with java.nio.file.Files.write() -- I'd
expect there is less that can go wrong with that simple call.
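
For example, a minimal sketch of what I mean, reusing the "lines" RDD from
your example (the target directory and naming scheme are made up, and the
directory must exist on every executor):

import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets

lines.foreach { line =>
  // One file per line. Files.write throws on failure instead of silently
  // leaving an empty file behind, which should make the 0.1% case visible.
  val path = Paths.get(s"/tmp/lines/${line.hashCode}.txt")
  Files.write(path, line.getBytes(StandardCharsets.UTF_8))
}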

On Thu, Dec 11, 2014 at 12:50 PM, Paweł Szulc paul.sz...@gmail.com wrote:

 Imagine simple Spark job, that will store each line of the RDD to a
 separate file


 val lines = sc.parallelize(1 to 100).map(n => s"this is line $n")
 lines.foreach(line => writeToFile(line))

 def writeToFile(line: String) = {
 def filePath = "file://..."
 val file = new File(new URI(path).getPath)
 // using function simply closes the output stream
 using(new FileOutputStream(file)) { output =>
 output.write(value)
 }
 }


 Now, example above works 99,9% of a time. Files are generated for each
 line, each file contains that particular line.

 However, when dealing with large number of data, we encounter situations
 where some of the files are empty! Files are generated, but there is no
 content inside of them (0 bytes).

 Now the question is: can Spark job have side effects. Is it even legal to
 write such code?
 If no, then what other choice do we have when we want to save data from
 our RDD?
 If yes, then do you guys see what could be the reason of this job acting
 in this strange manner 0.1% of the time?


 disclaimer: we are fully aware of .saveAsTextFile method in the API,
 however the example above is a simplification of our code - normally we
 produce PDF files.


 Best regards,
 Paweł Szulc









Re: How can I create an RDD with millions of entries created programmatically

2014-12-09 Thread Daniel Darabos
Ah... I think you're right about the flatMap then :). Or you could use
mapPartitions. (I'm not sure if it makes a difference.)
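
For example, a quick sketch of the mapPartitions variant
(generateRandomObject is your function from this thread, so just an
assumption here):

val rdd = sc.parallelize(1 to 1000000, numSlices = 1000)
  .mapPartitions(_.map(generateRandomObject))

The iterator stays lazy, so again only about one partition's worth of
objects has to exist at a time per executor thread.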

On Mon, Dec 8, 2014 at 10:09 PM, Steve Lewis lordjoe2...@gmail.com wrote:

 looks good but how do I say that in Java
 as far as I can see sc.parallelize (in Java)  has only one implementation
 which takes a List - requiring an in memory representation

 On Mon, Dec 8, 2014 at 12:06 PM, Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 Hi,
 I think you have the right idea. I would not even worry about flatMap.

  val rdd = sc.parallelize(1 to 1000000, numSlices = 1000).map(x =>
  generateRandomObject(x))

 Then when you try to evaluate something on this RDD, it will happen
 partition-by-partition. So 1000 random objects will be generated at a time
 per executor thread.

 On Mon, Dec 8, 2014 at 8:05 PM, Steve Lewis lordjoe2...@gmail.com
 wrote:

  I have a function which generates a Java object and I want to explore
 failures which only happen when processing large numbers of these object.
 the real code is reading a many gigabyte file but in the test code I can
 generate similar objects programmatically. I could create a small list,
 parallelize it and then use flatmap to inflate it several times by a factor
 of 1000 (remember I can hold a list of 1000 items in memory but not a
 million)
 Are there better ideas - remember I want to create more objects than can
 be held in memory at once.





 --
 Steven M. Lewis PhD
 4221 105th Ave NE
 Kirkland, WA 98033
 206-384-1340 (cell)
 Skype lordjoe_com




Re: Efficient self-joins

2014-12-08 Thread Daniel Darabos
I do not see how you hope to generate all incoming edge pairs without
repartitioning the data by dstID. You need to perform this shuffle for
joining too. Otherwise two incoming edges could be in separate partitions
and never meet. Am I missing something?

On Mon, Dec 8, 2014 at 3:53 PM, Theodore Vasiloudis 
theodoros.vasilou...@gmail.com wrote:

 Using groupByKey was our first approach, and as noted in the docs is
 highly inefficient due to the need to shuffle all the data. See
 http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

 On Mon, Dec 8, 2014 at 3:47 PM, Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 Could you not use a groupByKey instead of the join? I mean something like
 this:

 val byDst = rdd.map { case (src, dst, w) => dst -> (src, w) }
 byDst.groupByKey.map { case (dst, edges) =>
   for {
 (src1, w1) <- edges
 (src2, w2) <- edges
   } {
 ??? // Do something.
   }
   ??? // Return something.
 }

 On Mon, Dec 8, 2014 at 3:28 PM, Koert Kuipers ko...@tresata.com wrote:

 spark can do efficient joins if both RDDs have the same partitioner. so
 in case of self join I would recommend to create an rdd that has explicit
 partitioner and has been cached.
 On Dec 8, 2014 8:52 AM, Theodore Vasiloudis 
 theodoros.vasilou...@gmail.com wrote:

 Hello all,

 I am working on a graph problem using vanilla Spark (not GraphX) and at
 some
 point I would like to do a
 self join on an edges RDD[(srcID, dstID, w)] on the dst key, in order
 to get
 all pairs of incoming edges.

 Since this is the performance bottleneck for my code, I was wondering if
 there any steps to take before
 performing the self-join in order to make it as efficient as possible.

 In the  Learning Spark book
 
 https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
 
 for example, in the Data partitioning section they recommend
 performing .partitionBy(new HashPartitioner(100)) on an RDD before
 joining
 it with another.

 Are there any guidelines for optimizing self-join performance?

 Regards,
 Theodore




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-self-joins-tp20576.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Efficient self-joins

2014-12-08 Thread Daniel Darabos
On Mon, Dec 8, 2014 at 5:26 PM, Theodore Vasiloudis 
theodoros.vasilou...@gmail.com wrote:

 @Daniel
 It's true that the first map in your code is needed, i.e. mapping so that
 dstID is the new RDD key.


You wrote "groupByKey is highly inefficient due to the need to shuffle all
the data", but you seem to agree that the shuffle cannot be avoided. Both
approaches cause 1 shuffle.

I still don't see why you expect a speedup from doing this with a join. But
don't let me discourage you or anything. I'm not an expert, just trying to
learn.

The self-join on the dstKey will then create all the pairs of incoming
 edges (plus self-referential and duplicates that need to be filtered out).

 @Koert
 Are there any guidelines about setting the number of partitions in
 HashParitioner then?

 What I know about my data is that the distribution of indegree value
 (number of incoming edges for a vertex) will be similar to a power law
 https://en.wikipedia.org/wiki/Power_law, i.e.
 there will be a small number of keys with a high number of incoming edges,
 while most of the keys will
 have incoming few edges.

 What is a good partitioning strategy for a self-join on an RDD with
 unbalanced key distributions?



 On Mon, Dec 8, 2014 at 3:59 PM, Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 I do not see how you hope to generate all incoming edge pairs without
 repartitioning the data by dstID. You need to perform this shuffle for
 joining too. Otherwise two incoming edges could be in separate partitions
 and never meet. Am I missing something?

 On Mon, Dec 8, 2014 at 3:53 PM, Theodore Vasiloudis 
 theodoros.vasilou...@gmail.com wrote:

 Using groupByKey was our first approach, and as noted in the docs is
 highly inefficient due to the need to shuffle all the data. See
 http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

 On Mon, Dec 8, 2014 at 3:47 PM, Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 Could you not use a groupByKey instead of the join? I mean something
 like this:

  val byDst = rdd.map { case (src, dst, w) => dst -> (src, w) }
  byDst.groupByKey.map { case (dst, edges) =>
    for {
  (src1, w1) <- edges
  (src2, w2) <- edges
   } {
 ??? // Do something.
   }
   ??? // Return something.
 }

 On Mon, Dec 8, 2014 at 3:28 PM, Koert Kuipers ko...@tresata.com
 wrote:

 spark can do efficient joins if both RDDs have the same partitioner.
 so in case of self join I would recommend to create an rdd that has
 explicit partitioner and has been cached.
 On Dec 8, 2014 8:52 AM, Theodore Vasiloudis 
 theodoros.vasilou...@gmail.com wrote:

 Hello all,

 I am working on a graph problem using vanilla Spark (not GraphX) and
 at some
 point I would like to do a
 self join on an edges RDD[(srcID, dstID, w)] on the dst key, in order
 to get
 all pairs of incoming edges.

 Since this is the performance bottleneck for my code, I was wondering
 if
 there any steps to take before
 performing the self-join in order to make it as efficient as possible.

 In the  Learning Spark book
 
 https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
 
 for example, in the Data partitioning section they recommend
 performing .partitionBy(new HashPartitioner(100)) on an RDD before
 joining
 it with another.

 Are there any guidelines for optimizing self-join performance?

 Regards,
 Theodore




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-self-joins-tp20576.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: How can I create an RDD with millions of entries created programmatically

2014-12-08 Thread Daniel Darabos
Hi,
I think you have the right idea. I would not even worry about flatMap.

val rdd = sc.parallelize(1 to 1000000, numSlices = 1000).map(x =>
generateRandomObject(x))

Then when you try to evaluate something on this RDD, it will happen
partition-by-partition. So 1000 random objects will be generated at a time
per executor thread.

On Mon, Dec 8, 2014 at 8:05 PM, Steve Lewis lordjoe2...@gmail.com wrote:

  I have a function which generates a Java object and I want to explore
 failures which only happen when processing large numbers of these object.
 the real code is reading a many gigabyte file but in the test code I can
 generate similar objects programmatically. I could create a small list,
 parallelize it and then use flatmap to inflate it several times by a factor
 of 1000 (remember I can hold a list of 1000 items in memory but not a
 million)
 Are there better ideas - remember I want to create more objects than can
 be held in memory at once.




Re: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down

2014-12-05 Thread Daniel Darabos
Hi, Alexey,
I'm getting the same error on startup with Spark 1.1.0. Everything works
fine fortunately.

The error is mentioned in the logs in
https://issues.apache.org/jira/browse/SPARK-4498, so maybe it will also be
fixed in Spark 1.2.0 and 1.1.2. I have no insight into it unfortunately.

On Tue, Dec 2, 2014 at 1:38 PM, Alexey Romanchuk alexey.romanc...@gmail.com
 wrote:

 Any ideas? Anyone got the same error?

 On Mon, Dec 1, 2014 at 2:37 PM, Alexey Romanchuk 
 alexey.romanc...@gmail.com wrote:

 Hello spark users!

 I found lots of strange messages in driver log. Here it is:

 2014-12-01 11:54:23,849 [sparkDriver-akka.actor.default-dispatcher-25]
 ERROR
 akka.remote.EndpointWriter[akka://sparkDriver/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkExecutor%40data1.hadoop%3A17372-5/endpointWriter]
 - AssociationError [akka.tcp://sparkDriver@10.54.87.173:55034] -
 [akka.tcp://sparkExecutor@data1.hadoop:17372]: Error [Shut down address:
 akka.tcp://sparkExecutor@data1.hadoop:17372] [
 akka.remote.ShutDownAssociation: Shut down address:
 akka.tcp://sparkExecutor@data1.hadoop:17372
 Caused by: akka.remote.transport.Transport$InvalidAssociationException:
 The remote system terminated the association because it is shutting down.
 ]

 I got this message for every worker twice. First - for driverPropsFetcher
 and next for sparkExecutor. Looks like spark shutdown remote akka system
 incorrectly or there is some race condition in this process and driver sent
 some data to worker, but worker's actor system already in shutdown state.

 Except for this message everything works fine. But this is ERROR level
 message and I found it in my ERROR only log.

 Do you have any idea is it configuration issue, bug in spark or akka or
 something else?

 Thanks!





Re: Increasing the number of retry in case of job failure

2014-12-05 Thread Daniel Darabos
It is controlled by spark.task.maxFailures. See
http://spark.apache.org/docs/latest/configuration.html#scheduling.
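
For example (8 is an arbitrary value, pick what makes sense for you):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().set("spark.task.maxFailures", "8")
val sc = new SparkContext(conf)

You can also put the same property in conf/spark-defaults.conf.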

On Fri, Dec 5, 2014 at 11:02 AM, shahab shahab.mok...@gmail.com wrote:

 Hello,

 By some (unknown) reasons some of my tasks, that fetch data from
 Cassandra, are failing so often, and apparently the master removes a tasks
 which fails more than 4 times (in my case).

 Is there any way to increase the number of re-tries ?

 best,
 /Shahab



Re: SPARK LIMITATION - more than one case class is not allowed !!

2014-12-05 Thread Daniel Darabos
On Fri, Dec 5, 2014 at 7:12 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Rahul,

 On Fri, Dec 5, 2014 at 2:50 PM, Rahul Bindlish 
 rahul.bindl...@nectechnologies.in wrote:

 I have done so thats why spark is able to load objectfile [e.g.
 person_obj]
 and spark has maintained serialVersionUID [person_obj].

 Next time when I am trying to load another objectfile [e.g. office_obj]
 and
 I think spark is matching serialVersionUID [person_obj] with previous
 serialVersionUID [person_obj] and giving mismatch error.

 In my first post, I have give statements which can be executed easily to
 replicate this issue.


 Can you post the Scala source for your case classes? I have tried the
 following in spark-shell:

 case class Dog(name: String)
 case class Cat(age: Int)
 val dogs = sc.parallelize(Dog("foo") :: Dog("bar") :: Nil)
 val cats = sc.parallelize(Cat(1) :: Cat(2) :: Nil)
 dogs.saveAsObjectFile("test_dogs")
 cats.saveAsObjectFile("test_cats")

 This gives two directories test_dogs/ and test_cats/. Then I restarted
 spark-shell and entered:

 case class Dog(name: String)
 case class Cat(age: Int)
 val dogs = sc.objectFile("test_dogs")
 val cats = sc.objectFile("test_cats")

 I don't get an exception, but:

 dogs: org.apache.spark.rdd.RDD[Nothing] = FlatMappedRDD[1] at objectFile
 at <console>:12


You need to specify the type of the RDD. The compiler does not know what is
in test_dogs.

val dogs = sc.objectFile[Dog]("test_dogs")
val cats = sc.objectFile[Cat]("test_cats")

It's an easy mistake to make... I wonder if an assertion could be
implemented that makes sure the type parameter is present.


Re: How to enforce RDD to be cached?

2014-12-03 Thread Daniel Darabos
On Wed, Dec 3, 2014 at 10:52 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 I noticed that rdd.cache() is not happening immediately rather due to lazy
 feature of Spark, it is happening just at the moment  you perform some
 map/reduce actions. Is this true?


Yes, this is correct.

If this is the case, how can I enforce Spark to cache immediately at its
 cache() statement? I need this to perform some benchmarking and I need to
 separate rdd caching and rdd transformation/action processing time.


The typical solution I think is to run rdd.foreach(_ => ()) to trigger a
calculation.
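
A minimal sketch of how I would separate the two timings (spark-shell,
made-up file name):

val rdd = sc.textFile("data.txt").cache()

var t0 = System.currentTimeMillis()
rdd.foreach(_ => ())  // forces the whole RDD to be computed and cached
println("caching took " + (System.currentTimeMillis() - t0) + " ms")

t0 = System.currentTimeMillis()
println(rdd.map(_.length).reduce(_ + _))  // this now runs against the cache
println("processing took " + (System.currentTimeMillis() - t0) + " ms")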


Re: Is sorting persisted after pair rdd transformations?

2014-11-19 Thread Daniel Darabos
Akhil, I think Aniket uses the word "persisted" in a different way than
what you mean. I.e. not in the RDD.persist() way. Aniket asks if running
combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting
is preserved.)

I think the answer is no. combineByKey uses AppendOnlyMap, which is a
hashmap. That will shuffle your keys. You can quickly verify it in
spark-shell:

scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
res0: Array[(Int, Int)] = Array((8,1), (7,1))

(The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
number that demonstrates this.)

On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 If something is persisted you can easily see them under the Storage tab in
 the web ui.

 Thanks
 Best Regards

 On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am trying to figure out if sorting is persisted after applying Pair RDD
 transformations and I am not able to decisively tell after reading the
 documentation.

 For example:
 val numbers = .. // RDD of numbers
  val pairedNumbers = numbers.map(number => (number % 100, number))
  val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =>
 pairedNumber._2) // Sort by values in the pair
 val aggregates = sortedPairedNumbers.combineByKey(..)

 In this example, will the combine functions see values in sorted order?
 What if I had done groupByKey and then combineByKey? What transformations
 can unsort an already sorted data?





Re: Is sorting persisted after pair rdd transformations?

2014-11-19 Thread Daniel Darabos
Ah, so I misunderstood you too :).

My reading of org/ apache/spark/Aggregator.scala is that your function will
always see the items in the order that they are in the input RDD. An RDD
partition is always accessed as an iterator, so it will not be read out of
order.

On Wed, Nov 19, 2014 at 2:28 PM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

 Thanks Daniel. I can understand that the keys will not be in sorted order
 but what I am trying to understanding is whether the functions are passed
 values in sorted order in a given partition.

 For example:

  sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t => t._2).foldByKey(0)((a,
  b) => b).collect
 res0: Array[(Int, Int)] = Array((1,8))

 The fold always given me last value as 8 which suggests values preserve
 sorting earlier defined in stage in DAG?

 On Wed Nov 19 2014 at 18:10:11 Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

  Akhil, I think Aniket uses the word "persisted" in a different way than
 what you mean. I.e. not in the RDD.persist() way. Aniket asks if running
 combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting
 is preserved.)

 I think the answer is no. combineByKey uses AppendOnlyMap, which is a
 hashmap. That will shuffle your keys. You can quickly verify it in
 spark-shell:

  scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
 res0: Array[(Int, Int)] = Array((8,1), (7,1))

 (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
 number that demonstrates this.)

 On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 If something is persisted you can easily see them under the Storage tab
 in the web ui.

 Thanks
 Best Regards

 On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am trying to figure out if sorting is persisted after applying Pair
 RDD transformations and I am not able to decisively tell after reading the
 documentation.

 For example:
 val numbers = .. // RDD of numbers
  val pairedNumbers = numbers.map(number => (number % 100, number))
  val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber =>
 pairedNumber._2) // Sort by values in the pair
 val aggregates = sortedPairedNumbers.combineByKey(..)

 In this example, will the combine functions see values in sorted order?
 What if I had done groupByKey and then combineByKey? What transformations
 can unsort an already sorted data?






Task duration graph on Spark stage UI

2014-11-06 Thread Daniel Darabos
Even though the stage UI has min, 25th%, median, 75th%, and max durations,
I am often still left clueless about the distribution. For example, 100 out
of 200 tasks (started at the same time) have completed in 1 hour. How much
longer do I have to wait? I cannot guess well based on the five numbers.

A graph of the durations will not answer the question either, but I think
it gives a better idea. I can hopefully see if the distribution is linearly
sloped or bimodal or exponentially slowing down, etc.

It's easy to draw this graph, so I set it up as a Chrome extension:

https://chrome.google.com/webstore/detail/spark-distributions/hhgnppbenlghmcimkmiccfiemdohdgoo

And here's the complete source code that you can throw in the JavaScript
console for the same results:

var x = $('table:eq(2)').find('td:nth-child(8)').map(function (i, e) {
return parseInt($(e).attr('sorttable_customkey')); });
x.sort(function(a, b) { return a - b; });
var w = x.length;
var h = x[w - 1];
var W = 180;
var H = 80;
var canvas = $('<canvas width=' + W + ' height=' + H + '></canvas>');
canvas.css({ position: 'absolute', top: '100px', left: '500px' });
$('body').append(canvas);
var ctx = canvas[0].getContext('2d');
ctx.fillStyle = 'orange';
ctx.beginPath();
ctx.moveTo(0, H);
for (var i = 0; i < w; ++i) {
  ctx.lineTo(i * W / (w - 1), H - x[i] * H / h);
}
ctx.lineTo(W, H);
ctx.fill();

It should not be much work to add this to the stage status page itself
either, if there is interest.


Re: registering Array of CompactBuffer to Kryo

2014-10-02 Thread Daniel Darabos
How about this?

Class.forName("[Lorg.apache.spark.util.collection.CompactBuffer;")
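
In context, a sketch of how I would use it in a KryoRegistrator (the
registrator class name is made up):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // The array class has no Scala-friendly name, so look it up by its JVM name.
    kryo.register(Class.forName("[Lorg.apache.spark.util.collection.CompactBuffer;"))
  }
}

Then point spark.kryo.registrator at the fully qualified name of MyRegistrator.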

On Tue, Sep 30, 2014 at 5:33 PM, Andras Barjak
andras.bar...@lynxanalytics.com wrote:
 Hi,

 what is the correct scala code to register an Array of this private spark
 class to Kryo?

 java.lang.IllegalArgumentException: Class is not registered:
 org.apache.spark.util.collection.CompactBuffer[]
 Note: To register this class use:
 kryo.register(org.apache.spark.util.collection.CompactBuffer[].class);

 thanks,

 András Barják

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark RDD member of class loses it's value when the class being used as graph attribute

2014-06-30 Thread Daniel Darabos
Can you share some example code of what you are doing?

BTW Gmail puts down your mail as spam, saying it cannot verify it came from
yahoo.com. Might want to check your mail client settings. (It could be a
Gmail or Yahoo bug too of course.)


On Fri, Jun 27, 2014 at 4:29 PM, harsh2005_7 harsh200...@yahoo.com wrote:

 Hi,

 I have a scenario where I am having a class X with constructor parameter as
 (RDD,Double).When I am initializing the the class object with corresponding
 RDD and double value (of name say x1) and *putting it as a vertex attribute
 in graph* , I am losing my RDD value . The Double value remains intact . I
 tried accessing simultaneously the RDD from instance variable (x1) and i
 see
 it intact there but for some reason it's not available when i take graph
 vertex attribute and access the RDD. Please help me to understand which
 concept I am missing here ? And whats the correct way to do it.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-RDD-member-of-class-loses-it-s-value-when-the-class-being-used-as-graph-attribute-tp8420.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: What is the best way to handle transformations or actions that takes forever?

2014-06-17 Thread Daniel Darabos
I think you need to implement a timeout in your code. As far as I know,
Spark will not interrupt the execution of your code as long as the driver
is connected. Might be an idea though.
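
One possible sketch of a driver-side timeout (rdd and
expensiveTransformation stand in for your own job, and the timeout value is
arbitrary). Note that this only stops the waiting; sc.cancelAllJobs() is
what asks Spark to actually kill the running work:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val result = Future { rdd.map(expensiveTransformation).count() }
try {
  println(Await.result(result, 30.minutes))
} catch {
  case _: java.util.concurrent.TimeoutException =>
    sc.cancelAllJobs()  // give up on everything still running
}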


On Tue, Jun 17, 2014 at 7:54 PM, Peng Cheng pc...@uow.edu.au wrote:

 I've tried enabling the speculative jobs, this seems partially solved the
 problem, however I'm not sure if it can handle large-scale situations as it
 only start when 75% of the job is finished.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-best-way-to-handle-transformations-or-actions-that-takes-forever-tp7664p7752.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: join operation is taking too much time

2014-06-17 Thread Daniel Darabos
I've been wondering about this. Is there a difference in performance
between these two?

val rdd1 = sc.textFile(files.mkString(","))
val rdd2 = sc.union(files.map(sc.textFile(_)))

I don't know about your use-case, Meethu, but it may be worth trying to see
if reading all the files into one RDD (like rdd1) would perform better in
the join. (If this is possible in your situation.)



On Tue, Jun 17, 2014 at 6:45 PM, Andrew Or and...@databricks.com wrote:

 How long does it get stuck for? This is a common sign for the OS thrashing
 due to out of memory exceptions. If you keep it running longer, does it
 throw an error?

 Depending on how large your other RDD is (and your join operation), memory
 pressure may or may not be the problem at all. It could be that spilling
 your shuffles
 to disk is slowing you down (but probably shouldn't hang your
 application). For the 5 RDDs case, what happens if you set
 spark.shuffle.spill to false?


 2014-06-17 5:59 GMT-07:00 MEETHU MATHEW meethu2...@yahoo.co.in:


  Hi all,

 I want  to do a recursive leftOuterJoin between an RDD (created from
  file) with 9 million rows(size of the file is 100MB) and 30 other
 RDDs(created from 30 diff files in each iteration of a loop) varying from 1
 to 6 million rows.
 When I run it for 5 RDDs,its running successfully  in 5 minutes.But when
 I increase it to 10 or 30 RDDs its gradually slowing down and finally
 getting stuck without showing any warning or error.

 I am running in standalone mode with 2 workers of 4GB each and a total
 of 16 cores .

 Any of you facing similar problems with JOIN  or is it a problem with my
 configuration.

 Thanks  Regards,
 Meethu M





Is shuffle stable?

2014-06-14 Thread Daniel Darabos
What I mean is, let's say I run this:

sc.parallelize(Seq(0 -> 3, 0 -> 2, 0 -> 1), 3).partitionBy(new HashPartitioner(3)).collect


Will the result always be Array((0,3), (0,2), (0,1))? Or could I
possibly get a different order?


I'm pretty sure the shuffle files are taken in the order of the source
partitions... But after much search, and the discussion on
http://stackoverflow.com/questions/24206660/does-groupbykey-in-spark-preserve-the-original-order
I still can't find the code that does this.


Thanks!


Re: Is shuffle stable?

2014-06-14 Thread Daniel Darabos
Thanks Matei!

In the example all three items have the same key, so they go to the same
partition:

scala> sc.parallelize(Seq(0 -> 3, 0 -> 2, 0 -> 1), 3).partitionBy(new
HashPartitioner(3)).glom.collect


Array(Array((0,3), (0,2), (0,1)), Array(), Array())


I guess the apparent stability is just due to the single-threaded local
master processing partitions in order, so (0,3) is produced first, etc.



On Sun, Jun 15, 2014 at 12:55 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 The order is not guaranteed actually, only which keys end up in each
 partition. Reducers may fetch data from map tasks in an arbitrary order,
 depending on which ones are available first. If you’d like a specific
 order, you should sort each partition. Here you might be getting it because
 each partition only ends up having one element, and collect() does return
 the partitions in order.

 Matei

 On Jun 14, 2014, at 12:14 PM, Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 What I mean is, let's say I run this:

 sc.parallelize(Seq(0 -> 3, 0 -> 2, 0 -> 1),
 3).partitionBy(new HashPartitioner(3)).collect


 Will the result always be Array((0,3), (0,2), (0,1))? Or could I possibly get 
 a different order?


 I'm pretty sure the shuffle files are taken in the order of the source 
 partitions... But after much search, and the discussion on 
 http://stackoverflow.com/questions/24206660/does-groupbykey-in-spark-preserve-the-original-order
  I still can't find the code that does this.


 Thanks!





Re: list of persisted rdds

2014-06-13 Thread Daniel Darabos
Check out SparkContext.getPersistentRDDs!
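
For example, in spark-shell:

// List what is currently cached...
sc.getPersistentRDDs.foreach { case (id, rdd) => println(id + ": " + rdd) }
// ...and drop all of it at once.
sc.getPersistentRDDs.values.foreach(_.unpersist())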


On Fri, Jun 13, 2014 at 1:06 PM, mrm ma...@skimlinks.com wrote:

 Hi,

 How do I check the rdds that I have persisted? I have some code that looks
 like:
 rd1.cache()
 
 rd2.cache()
 ...
 
 rdN.cache()

 How can I unpersist all rdd's at once? And is it possible to get the names
 of the rdd's that are currently persisted (list = rd1, rd2, ..., rdN)?

 Thank you!




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/list-of-persisted-rdds-tp7565.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Hanging Spark jobs

2014-06-11 Thread Daniel Darabos
These stack traces come from the stuck node? Looks like it's waiting on
data in BlockFetcherIterator. Waiting for data from another node. But you
say all other nodes were done? Very curious.

Maybe you could try turning on debug logging, and try to figure out what
happens in BlockFetcherIterator (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala).
I do not think it is supposed to get stuck indefinitely.
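
For example, something like this in conf/log4j.properties on the affected
node should enable it for just that class:

log4j.logger.org.apache.spark.storage.BlockFetcherIterator=DEBUG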

On Tue, Jun 10, 2014 at 8:22 PM, Hurwitz, Daniel dhurw...@ebay.com wrote:

  Hi,



 We are observing a recurring issue where our Spark jobs are hanging for
 several hours, even days, until we kill them.



 We are running Spark v0.9.1 over YARN.



 Our input is a list of edges of a graph on which we use Bagel to compute
 connected components using the following method:



 class CCMessage(var targetId: Long, var myComponentId: Long)
 extends Message[Long] with Serializable

 def compute(self: CC, msgs: Option[Array[CCMessage]], superstep: Int):
 (CC, Array[CCMessage]) = {

   val smallestComponentId = msgs.map(sq =>
 sq.map(_.myComponentId).min).getOrElse(Long.MaxValue)

   val newComponentId = math.min(self.clusterID, smallestComponentId)

   val halt = (newComponentId == self.clusterID) || (superstep >=
 maxIters)

   self.active = if (superstep == 0) true else !halt

   val outGoingMessages = if (halt && superstep > 0)
 Array[CCMessage]()

   else self.edges.map(targetId => new CCMessage(targetId,
 newComponentId)).toArray

   self.clusterID = newComponentId

   (self, outGoingMessages)

 }



 Our output is a text file in which each line is a list of the node IDs in
 each component. The size of the output may be up to 6 GB.



 We see in the job tracker that most of the time jobs usually get stuck on
 the “saveAsTextFile” command, the final line in our code. In some cases,
 the job will hang during one of the iterations of Bagel during the
 computation of the connected components.



 Oftentimes, when we kill the job and re-execute it, it will finish
 successfully within an hour which is the expected duration. We notice that
 if our Spark jobs don’t finish after a few hours, they will never finish
 until they are killed, regardless of the load on our cluster.



 After consulting with our Hadoop support team, they noticed that after a
 particular hanging Spark job was running for 38 hours, all Spark processes
 on all nodes were completed except for one node which was running more than
 9 hours consuming very little CPU, then suddenly consuming 14s of CPU, then
 back to calm. Also, the other nodes were not relinquishing their resources
 until our Hadoop admin killed the process on that problematic node and
 suddenly the job finished and “success” was reported in the job tracker.
 The output seemed to be fine too. If it helps you understand the issue, the
 Hadoop admin suggested this was a Spark issue and sent us two stack dumps
 which I attached to this email: before killing the node’s Spark process
 (dump1.txt) and after (dump2.txt).



 Any advice on how to resolve this issue? How can we debug this?

  Thanks,

 ~Daniel





Re: Information on Spark UI

2014-06-11 Thread Daniel Darabos
About more succeeded tasks than total tasks:
 - This can happen if you have enabled speculative execution. Some
partitions can get processed multiple times.
 - More commonly, the result of the stage may be used in a later
calculation, and has to be recalculated. This happens if some of the
results were evicted from cache.


On Wed, Jun 11, 2014 at 2:23 AM, Shuo Xiang shuoxiang...@gmail.com wrote:

 Hi,
   Came up with some confusion regarding the information on SparkUI. The
 following info is gathered while factorizing a large matrix using ALS:
   1. some stages have more succeeded tasks than total tasks, which are
 displayed in the 5th column.
   2. duplicate stages with exactly same stageID (stage 1/3/7)
   3. Clicking into some stages, some executors cannot be addressed. Does
 that mean lost of executor or this does not matter?

   Any explanation are appreciated!





Re: Better line number hints for logging?

2014-06-05 Thread Daniel Darabos
On Wed, Jun 4, 2014 at 10:39 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 That’s a good idea too, maybe we can change CallSiteInfo to do that.


I've filed an issue: https://issues.apache.org/jira/browse/SPARK-2035

Matei

 On Jun 4, 2014, at 8:44 AM, Daniel Darabos 
 daniel.dara...@lynxanalytics.com wrote:

 Oh, this would be super useful for us too!

 Actually wouldn't it be best if you could see the whole call stack on the
 UI, rather than just one line? (Of course you would have to click to expand
 it.)


 On Wed, Jun 4, 2014 at 2:38 AM, John Salvatier jsalvat...@gmail.com
 wrote:

 Ok, I will probably open a Jira.


 On Tue, Jun 3, 2014 at 5:29 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 You can use RDD.setName to give it a name. There’s also a creationSite
 field that is private[spark] — we may want to add a public setter for that
 later. If the name isn’t enough and you’d like this, please open a JIRA
 issue for it.

 Matei

 On Jun 3, 2014, at 5:22 PM, John Salvatier jsalvat...@gmail.com wrote:

 I have created some extension methods for RDDs in RichRecordRDD and
 these are working exceptionally well for me.

 However, when looking at the logs, its impossible to tell what's going
 on because all the line number hints point to RichRecordRDD.scala rather
 than the code that uses it. For example:

 INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at
 map at RichRecordRDD.scala:633), which is now runnable

 Is there any way set up my extension methods class so that the logs will
 print a more useful line number?








Re: Strange problem with saveAsTextFile after upgrade Spark 0.9.0-1.0.0

2014-06-04 Thread Daniel Darabos
On Tue, Jun 3, 2014 at 8:46 PM, Marek Wiewiorka marek.wiewio...@gmail.com
wrote:

 Hi All,
 I've been experiencing a very strange error after upgrade from Spark 0.9
 to 1.0 - it seems that saveAsTestFile function is throwing
 java.lang.UnsupportedOperationException that I have never seen before.


In the stack trace you quoted, saveAsTextFile is not called. Is it really
throwing an exception? Do you have the stack trace from the executor
process? I think the exception originates from there, and the scheduler is
just reporting it here.


 Any hints appreciated.

 scheduler.TaskSetManager: Loss was due to
 java.lang.ClassNotFoundException:
 org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1 [duplicate 45]
 14/06/03 16:46:23 ERROR actor.OneForOneStrategy:
 java.lang.UnsupportedOperationException
 at
 org.apache.spark.scheduler.SchedulerBackend$class.killTask(SchedulerBackend.scala:32)
 at
 org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.killTask(MesosSchedulerBackend.scala:41)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:185)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:183)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:183)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3.apply(TaskSchedulerImpl.scala:183)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3.apply(TaskSchedulerImpl.scala:176)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.cancelTasks(TaskSchedulerImpl.scala:176)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply$mcVI$sp(DAGScheduler.scala:1058)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1045)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1045)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
 at org.apache.spark.scheduler.DAGScheduler.org
 http://org.apache.spark.scheduler.dagscheduler.org/
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1045)
 at
 org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:998)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply$mcVI$sp(DAGScheduler.scala:499)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:499)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$doCancelAllJobs$1.apply(DAGScheduler.scala:499)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
 at
 org.apache.spark.scheduler.DAGScheduler.doCancelAllJobs(DAGScheduler.scala:499)
 at
 org.apache.spark.scheduler.DAGSchedulerActorSupervisor$$anonfun$2.applyOrElse(DAGScheduler.scala:1151)
 at
 org.apache.spark.scheduler.DAGSchedulerActorSupervisor$$anonfun$2.applyOrElse(DAGScheduler.scala:1147)
 at
 akka.actor.SupervisorStrategy.handleFailure(FaultHandling.scala:295)
 at
 akka.actor.dungeon.FaultHandling$class.handleFailure(FaultHandling.scala:253)
 at akka.actor.ActorCell.handleFailure(ActorCell.scala:338)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:423)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
 at
 akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
 at akka.dispatch.Mailbox.run(Mailbox.scala:218)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 Thanks,
 Marek



Re: Better line number hints for logging?

2014-06-04 Thread Daniel Darabos
Oh, this would be super useful for us too!

Actually wouldn't it be best if you could see the whole call stack on the
UI, rather than just one line? (Of course you would have to click to expand
it.)


On Wed, Jun 4, 2014 at 2:38 AM, John Salvatier jsalvat...@gmail.com wrote:

 Ok, I will probably open a Jira.


 On Tue, Jun 3, 2014 at 5:29 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 You can use RDD.setName to give it a name. There’s also a creationSite
 field that is private[spark] — we may want to add a public setter for that
 later. If the name isn’t enough and you’d like this, please open a JIRA
 issue for it.

 Matei

 On Jun 3, 2014, at 5:22 PM, John Salvatier jsalvat...@gmail.com wrote:

 I have created some extension methods for RDDs in RichRecordRDD and these
 are working exceptionally well for me.

 However, when looking at the logs, its impossible to tell what's going on
 because all the line number hints point to RichRecordRDD.scala rather than
 the code that uses it. For example:

 INFO scheduler.DAGScheduler: Submitting Stage 122 (MappedRDD[1223] at
 map at RichRecordRDD.scala:633), which is now runnable

 Is there any way set up my extension methods class so that the logs will
 print a more useful line number?






Persist and unpersist

2014-05-27 Thread Daniel Darabos
I keep bumping into a problem with persisting RDDs. Consider this (silly)
example:

def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  val count = input.count
  if (count % 2 == 0) {
return input.filter(_ % 2 == 1)
  } else {
return input.filter(_ % 2 == 0)
  }
}


The situation is that we want to do two things with an RDD (a count and a
filter in the example). The input RDD may represent a very expensive
calculation. So it would make sense to add an input.cache() line at the
beginning. But where do we put input.unpersist()?

input.cache()
val count = input.count
val result = input.filter(...)
input.unpersist()
return result


input.filter() is lazy, so this does not work as expected. We only want
to release input from the cache once nothing depends on it anymore. Maybe
result was garbage collected. Maybe result itself has been cached. But
there is no way to detect such conditions.

Our current approach is to just leave the RDD cached, and it will get
dumped at some point anyway. Is there a better solution? Thanks for any
tips.


Re: GraphX. How to remove vertex or edge?

2014-05-01 Thread Daniel Darabos
Graph.subgraph() allows you to apply a filter to edges and/or vertices.
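
A minimal sketch (assuming a graph: Graph[VD, Double] where the edge
attribute is a weight; the predicates are made up):

import org.apache.spark.graphx._

val filtered = graph.subgraph(
  epred = triplet => triplet.attr > 0,  // keep only positive-weight edges
  vpred = (id, attr) => id != 42L)      // drop vertex 42 (and its edges)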


On Thu, May 1, 2014 at 8:52 AM, Николай Кинаш peroksi...@gmail.com wrote:

 Hello.

 How to remove vertex or edges from graph in GraphX?



Re: My talk on Spark: The Next Top (Compute) Model

2014-05-01 Thread Daniel Darabos
Cool intro, thanks! One question. On slide 23 it says "Standalone (local
mode)". That sounds a bit confusing without hearing the talk.

Standalone mode is not local. It just does not depend on a cluster
software. I think it's the best mode for EC2/GCE, because they provide a
distributed filesystem anyway (S3/GCS). Why configure Hadoop if you don't
have to.


On Thu, May 1, 2014 at 12:25 AM, Dean Wampler deanwamp...@gmail.com wrote:

 I meant to post this last week, but this is a talk I gave at the Philly
 ETE conf. last week:

 http://www.slideshare.net/deanwampler/spark-the-next-top-compute-model

 Also here:

 http://polyglotprogramming.com/papers/Spark-TheNextTopComputeModel.pdf

 dean

 --
 Dean Wampler, Ph.D.
 Typesafe
 @deanwampler
 http://typesafe.com
 http://polyglotprogramming.com



Re: Shuffle Spill Issue

2014-04-30 Thread Daniel Darabos
Whoops, you are right. Sorry for the misinformation. Indeed reduceByKey
just calls combineByKey:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] =
{
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

(I think I confused reduceByKey with groupByKey.)


On Wed, Apr 30, 2014 at 2:55 AM, Liu, Raymond raymond@intel.com wrote:

 Hi Daniel

 Thanks for your reply, While I think for reduceByKey, it will also
 do map side combine, thus extra the result is the same, say, for each
 partition, one entry per distinct word. In my case with javaserializer,
  240MB dataset yield to around 70MB shuffle data. Only that shuffle Spill (
 memory ) is abnormal, and sounds to me should not trigger at all. And, by
 the way, this behavior only occurs in map out side, on reduce / shuffle
 fetch side, this strange behavior won't happen.

 Best Regards,
 Raymond Liu

 From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]

 I have no idea why shuffle spill is so large. But this might make it
 smaller:

 val addition = (a: Int, b: Int) => a + b
 val wordsCount = wordsPair.combineByKey(identity, addition, addition)

 This way only one entry per distinct word will end up in the shuffle for
 each partition, instead of one entry per word occurrence.

 On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond raymond@intel.com
 wrote:
 Hi  Patrick

 I am just doing simple word count , the data is generated by
 hadoop random text writer.

 This seems to me not quite related to compress , If I turn off
 compress on shuffle, the metrics is something like below for the smaller
 240MB Dataset.


 Executor ID Address Task Time   Total Tasks Failed Tasks
  Succeeded Tasks Shuffle ReadShuffle Write   Shuffle Spill (Memory)
  Shuffle Spill (Disk)
 10  sr437:48527 35 s8   0   8   0.0 B   2.5 MB
  2.2 GB  1291.2 KB
 12  sr437:46077 34 s8   0   8   0.0 B   2.5 MB
  1822.6 MB   1073.3 KB
 13  sr434:37896 31 s8   0   8   0.0 B   2.4 MB
  1099.2 MB   621.2 KB
 15  sr438:52819 31 s8   0   8   0.0 B   2.5 MB
  1898.8 MB   1072.6 KB
 16  sr434:37103 32 s8   0   8   0.0 B   2.4 MB
  1638.0 MB   1044.6 KB


 And the program pretty simple:

 val files = sc.textFile(args(1))
 val words = files.flatMap(_.split(" "))
 val wordsPair = words.map(x => (x, 1))

 val wordsCount = wordsPair.reduceByKey(_ + _)
 val count = wordsCount.count()

 println("Number of words = " + count)


 Best Regards,
 Raymond Liu

 From: Patrick Wendell [mailto:pwend...@gmail.com]

 Could you explain more what your job is doing and what data types you are
 using? These numbers alone don't necessarily indicate something is wrong.
 The relationship between the in-memory and on-disk shuffle amount is
 definitely a bit strange, the data gets compressed when written to disk,
 but unless you have a weird dataset (E.g. all zeros) I wouldn't expect it
 to compress _that_ much.

 On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond raymond@intel.com
 wrote:
 Hi


 I am running a simple word count program on spark standalone
 cluster. The cluster is made up of 6 node, each run 4 worker and each
 worker own 10G memory and 16 core thus total 96 core and 240G memory. (
 well, also used to configed as 1 worker with 40G memory on each node )

 I run a very small data set (2.4GB on HDFS on total) to confirm
 the problem here as below:

 As you can read from part of the task metrics as below, I noticed
 that the shuffle spill part of metrics indicate that there are something
 wrong.

 Executor ID Address Task Time   Total Tasks Failed Tasks
  Succeeded Tasks Shuffle ReadShuffle Write   Shuffle Spill (Memory)
  Shuffle Spill (Disk)
 0   sr437:42139 29 s4   0   4   0.0 B   4.3 MB
  23.6 GB 4.3 MB
 1   sr433:46935 1.1 min 4   0   4   0.0 B   4.2 MB
  19.0 GB 3.4 MB
 10  sr436:53277 26 s4   0   4   0.0 B   4.3 MB
  25.6 GB 4.6 MB
 11  sr437:58872 32 s4   0   4   0.0 B   4.3 MB
  25.0 GB 4.4 MB
 12  sr435:48358 27 s4   0   4   0.0 B   4.3 MB
  25.1 GB 4.4 MB


 You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x
 of the actual shuffle data and Shuffle Spill (Disk), and also it seems to
 me that by no means that the spill should trigger, since the memory is not
 used up at all.

 To verify that I further reduce the data size to 240MB on total

 And here is the result:


 Executor ID Address Task Time   Total Tasks Failed Tasks
  Succeeded Tasks Shuffle ReadShuffle Write   Shuffle Spill (Memory)
  Shuffle Spill (Disk)
 0   sr437:50895 15 s4   0   4   0.0 B   703.0 KB
  80.0 MB 43.2 KB
 1   sr433:50207 17 s4   0   4   0.0 B   704.7 KB
  389.5 MB90.2 KB

Re: Joining not-pair RDDs in Spark

2014-04-29 Thread Daniel Darabos
Create a key and join on that.

val callPricesByHour = callPrices.map(p => ((p.year, p.month, p.day,
p.hour), p))
val callsByHour = calls.map(c => ((c.year, c.month, c.day, c.hour), c))
val bills = callPricesByHour.join(callsByHour).mapValues({ case (p, c) =>
BillRow(c.customer, c.hour, c.minutes * p.basePrice) }).values

You should be able to expand this approach to three RDDs too.
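
For example, one possible sketch for the third RDD, keeping the case
classes from your message (the offer is keyed by hour only, as in your
for-comprehension):

val offersByHour = offersInCourse.map(o => (o.hour, o))
val billsWithOffers = callsByHour.join(callPricesByHour)
  .map { case ((year, month, day, hour), (c, p)) => (hour, (c, p)) }
  .join(offersByHour)
  .map { case (hour, ((c, p), o)) =>
    BillRow(c.customer, c.hour, c.minutes * p.basePrice * o.discount) }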


On Tue, Apr 29, 2014 at 11:55 AM, jsantos jsan...@tecsisa.com wrote:

 In the context of the telecom industry, let's suppose we have several existing
 RDDs populated from some tables in Cassandra:

 val callPrices: RDD[PriceRow]
 val calls: RDD[CallRow]
 val offersInCourse: RDD[OfferRow]

 where types are defined as follows,

 /** Represents the price per minute for a concrete hour */
 case class PriceRow(
 val year: Int,
 val month: Int,
 val day: Int,
 val hour: Int,
 val basePrice: Float)

 /** Call registries*/
 case class CallRow(
 val customer: String,
 val year: Int,
 val month: Int,
 val day: Int,
 val minutes: Int)

 /** Is there any discount that could be applicable here? */
 case class OfferRow(
 val offerName: String,
 val hour: Int,//[0..23]
 val discount: Float)//[0..1]

 Assuming we cannot use `flatMap` to mix these three RDDs like this way
 (since RDD is not really 'monadic'):

 /**
  * The final bill at a concrete hour for a call
  * is defined as {{{
  *   def billPerHour(minutes: Int, basePrice: Float, discount: Float) =
  *     minutes * basePrice * discount
  * }}}
  */
 val bills: RDD[BillRow] = for {
   price <- callPrices
   call <- calls if call.hour == price.hour
   offer <- offersInCourse if offer.hour == price.hour
 } yield BillRow(
   call.customer,
   call.hour,
   billPerHour(call.minutes, price.basePrice, offer.discount))

 case class BillRow(
 val customer: String,
 val hour: DateTime,
 val amount: Float)

 what is the best practice for generating a new RDD that joins all these
 three RDDs and represents the bill for a concrete customer?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Joining-not-pair-RDDs-in-Spark-tp5034.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Strange lookup behavior. Possible bug?

2014-04-28 Thread Daniel Darabos
That is quite mysterious, and I do not think we have enough information to
answer. JavaPairRDD<String, Tuple2>.lookup() works fine on a remote Spark
cluster:

$ MASTER=spark://localhost:7077 bin/spark-shell
scala> val rdd = org.apache.spark.api.java.JavaPairRDD.fromRDD(
  sc.makeRDD(0 until 10, 3).map(x => ((x % 3).toString, (x, x % 3))))
scala> rdd.lookup("1")
res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)]

You suggest maybe the driver does not receive a message from an executor. I
guess that is possible, though it has not happened to me. I would
recommend running on a single machine in the standalone setup. Start the
master and worker on the same machine, run the application there too. This
should eliminate network configuration problems.

If you still see the issue, I'd check whether the task has really
completed. What do you see on the web UI? Is the executor using CPU?

Good luck.




On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg ya...@media.mit.eduwrote:

 Can someone please suggest how I can move forward with this?
 My spark version is 0.9.1.
 The big challenge is that this issue is not recreated when running in
 local mode. What could be the difference?

 I would really appreciate any pointers, as currently the job just
 hangs.




 On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:

 Some additional information - maybe this rings a bell with someone:

 I suspect this happens when the lookup returns more than one value.
 For 0 and 1 values, the function behaves as you would expect.

 Anyone ?



 On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:

 Hi All,

 I'm running a lookup on a JavaPairRDD<String, Tuple2>.
 When running on a local machine the lookup is successful. However, when
 running on a standalone cluster with the exact same dataset, one of the tasks
 never ends (it stays in RUNNING status).
 When viewing the worker log, it seems that the task has finished
 successfully:

 14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
 14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is
 10896794
 14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
 14/04/25 13:40:38 INFO Executor: Finished task ID 2

 But it seems the driver is not aware of this, and hangs indefinitely.

 If I execute a count prior to the lookup, I get the correct number,
 which suggests that the cluster is operating as expected.

 The exact same scenario works with a different type of key (Tuple2):
 JavaPairRDD<Tuple2, Tuple2>.

 Any ideas on how to debug this problem ?

 Thanks,

 Yadid






Re: questions about debugging a spark application

2014-04-28 Thread Daniel Darabos
Good question! I am also new to the JVM and would appreciate some tips.

On Sun, Apr 27, 2014 at 5:19 AM, wxhsdp wxh...@gmail.com wrote:

 Hi, all
   I have some questions about debugging in Spark:
   1) When the application finishes, the application UI is shut down and I can
 no longer see the details about the app, like shuffle size, duration, stage
 information... There is not sufficient information in the master UI.
   Do I need to keep the application hanging on?


We added a flag to our application that causes it to sleep indefinitely at
the end for exactly this reason. Admittedly the logs contain everything to
reconstruct the same data. But the web UI is easier to understand at a
glance.
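
For what it's worth, a minimal sketch of such a flag (the system property name
and application name are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("MyApp"))
try {
  // ... run the job ...
} finally {
  // Keep the application (and therefore its web UI) alive for inspection.
  if (sys.props.get("myapp.keepAlive").exists(_.toBoolean)) {
    Thread.sleep(Long.MaxValue)
  }
}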

  2) How do I get details about each task the executor runs, like memory
 usage?


I had success with using VisualVM on the executor to see details about its
memory and CPU use.

  3) Since I'm not familiar with the JVM: do I need to run the program step by
 step, or keep the program hanging on, to use JVM utilities like jstack,
 jmap...?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/questions-about-debugging-a-spark-application-tp4891.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-23 Thread Daniel Darabos
With the right program you can always exhaust any amount of memory :).
There is no silver bullet. You have to figure out what is happening in your
code that causes a high memory use and address that. I spent all of last
week doing this for a simple program of my own. Lessons I learned that may
or may not apply to your case:

 - If you don't cache (persist) an RDD, it is not stored. This can save
memory at the cost of possibly repeating computation. (I read around a TB
of files twice, for example, rather than cache them.)
 - Use combineByKey instead of groupByKey if you can process values one by
one (see the sketch after this list). This means they do not all need to be
stored at once.
 - If you have a lot of keys per partition, set mapSideCombine=false for
combineByKey. This avoids creating a large map per partition.
 - If you have a key with a disproportionate number of values (like the
empty string for a missing name), discard it before the computation.
 - Read https://spark.apache.org/docs/latest/tuning.html for more (and more
accurate) information.
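
Here is a minimal sketch of the combineByKey tip (the RDD, names and types are
illustrative, not from your program): it computes a per-key count and sum
without ever materializing all values of a key at once.

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._   // pair-RDD implicits (pre-1.3 style)
import org.apache.spark.rdd.RDD

def countAndSum(pairs: RDD[(String, Double)]): RDD[(String, (Long, Double))] =
  pairs.combineByKey(
    (v: Double) => (1L, v),                                        // createCombiner
    (acc: (Long, Double), v: Double) => (acc._1 + 1, acc._2 + v),  // mergeValue
    (a: (Long, Double), b: (Long, Double)) =>
      (a._1 + b._1, a._2 + b._2),                                  // mergeCombiners
    new HashPartitioner(128),       // partition count -- tune to your data
    mapSideCombine = false)         // skip the per-partition map if you have many keys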

Good luck.


On Wed, Apr 23, 2014 at 1:25 AM, jaeholee jho...@lbl.gov wrote:

 Ok. I tried setting the partition number to 128 and numbers greater than
 128,
 and now I get another error message about Java heap space. Is it possible
 that there is something wrong with the setup of my Spark cluster to begin
 with? Or is it still an issue with partitioning my data? Or do I just need
 more worker nodes?


 ERROR TaskSetManager: Task 194.0:14 failed 4 times; aborting job
 org.apache.spark.SparkException: Job aborted: Task 194.0:14 failed 4 times
 (most recent failure: Exception failure: java.lang.OutOfMemoryError: Java
 heap space)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskSchedulerImpl-Lost-an-executor-tp4566p4623.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: GraphX: .edges.distinct().count() is 10?

2014-04-23 Thread Daniel Darabos
This is caused by https://issues.apache.org/jira/browse/SPARK-1188. I think
the fix will be in the next release. But until then, do:

g.edges.map(_.copy()).distinct.count
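
For context, a sketch of how that looks with a graph built from a tab-separated
edge list like yours (the file name is a stand-in); the copy() appears to be
needed because GraphX reuses Edge objects while iterating, so distinct would
otherwise keep comparing the same recycled instance:

import org.apache.spark.graphx._

val edgeTupRDD = sc.textFile("edges.txt")            // your tab-separated edge list
  .map(_.split("\t"))
  .map(x => (x(0).toLong, x(1).toLong))
val g = Graph.fromEdgeTuples(edgeTupRDD, defaultValue = 123)
println(g.edges.map(_.copy()).distinct.count)        // workaround until the fix lands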



On Wed, Apr 23, 2014 at 2:26 AM, Ryan Compton compton.r...@gmail.comwrote:

 Try this: https://www.dropbox.com/s/xf34l0ta496bdsn/.txt

 This code:

 println(g.numEdges)
 println(g.numVertices)
 println(g.edges.distinct().count())

 gave me

 1
 9294
 2



 On Tue, Apr 22, 2014 at 5:14 PM, Ankur Dave ankurd...@gmail.com wrote:
  I wasn't able to reproduce this with a small test file, but I did change
 the
  file parsing to use x(1).toLong instead of x(2).toLong. Did you mean to
 take
  the third column rather than the second?
 
  If so, would you mind posting a larger sample of the file, or even the
 whole
  file if possible?
 
  Here's the test that succeeded:
 
   test("graph.edges.distinct.count") {
     withSpark { sc =>
       val edgeFullStrRDD: RDD[String] = sc.parallelize(List(
         "394365859\t136153151", "589404147\t1361045425"))
       val edgeTupRDD = edgeFullStrRDD.map(x => x.split("\t"))
         .map(x => (x(0).toLong, x(1).toLong))
       val g = Graph.fromEdgeTuples(edgeTupRDD, defaultValue = 123,
         uniqueEdges = Option(CanonicalRandomVertexCut))
       assert(edgeTupRDD.distinct.count() === 2)
       assert(g.numEdges === 2)
       assert(g.edges.distinct.count() === 2)
     }
   }
 
  Ankur



Re: stdout in workers

2014-04-22 Thread Daniel Darabos
On Mon, Apr 21, 2014 at 7:59 PM, Jim Carroll jimfcarr...@gmail.com wrote:


 I'm experimenting with a few things trying to understand how it's working.
 I
 took the JavaSparkPi example as a starting point and added a few System.out
 lines.

 I added a system.out to the main body of the driver program (not inside of
 any Functions).
 I added another to the mapper.
 I added another to the reducer.

 I set up a simple standalone distribution with 1 master (no ZooKeeper)
 and 1 worker.

 The main println and the reducer println print out from the driver program.

 The mapper one doesn't print in the shell I started the worker in, nor does
 it appear in any logs (in case stdout was redirected).

 However, the program executes (and the mapper executes on the worker) since
 I can see the job run in the logs and I get an answer.

 Where should I be looking?


It is in the executor logs. The executor is a process started by the worker
for your application, which runs your custom code. So the logs are on the
worker machines. The default location is within the Spark directory under
the work subdirectory. For example
~/spark-0.9.0-incubating/work/app-20140418164503-0025/0/stdout.
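
To make it concrete, a tiny sketch of which println ends up where (assuming a
cluster deployment like yours):

// Runs on the driver: appears in the console where you launched the driver.
println("hello from the driver")

// Runs inside tasks on the executors: appears in each executor's stdout file
// under the work directory described above, not in the driver's console.
sc.parallelize(1 to 4).foreach(i => println("hello from a task: " + i))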

It is often much easier to access this log from the UI. Click on the
application ID on the master, or on a worker, and then you see the
stdout/stderr links.

Thanks. Apologies if this is a dumb question - I searched for an answer but
 only found a reference to another list posting where the user thought his
 driver print statement should have printed on the worker.


I think it's a perfectly good question, and you might want to make a
suggestion for adding it to the documentation somewhere.

Jim




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/stdout-in-workers-tp4537.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-22 Thread Daniel Darabos
Most likely it is not simply that the data is too big. For most operations the data is
processed partition by partition. The partitions may be too big. This is
what your last question hints at too:

 val numWorkers = 10
 val data = sc.textFile("somedirectory/data.csv", numWorkers)

This will work, but it is not quite what you want to do. The second parameter
to textFile is the (minimum) number of partitions you want. Given the error you
are seeing, I'd recommend asking for more partitions -- they will be smaller.
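
For example (a sketch; 128 is just an illustrative number, pick something
comfortably larger than your total core count):

// Ask for more (and therefore smaller) partitions when reading the file:
val data = sc.textFile("somedirectory/data.csv", 128)
// Or split up an RDD you already have:
val finer = data.repartition(128)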

Also make sure you set spark.executor.memory to the capacity of the worker
machines.


On Tue, Apr 22, 2014 at 11:09 PM, jaeholee jho...@lbl.gov wrote:

 Spark is running fine, but I get this message. Does this mean that my data
 is
 just too big?

 14/04/22 17:06:20 ERROR TaskSchedulerImpl: Lost executor 2 on WORKER#2:
 OutOfMemoryError
 14/04/22 17:06:20 ERROR TaskSetManager: Task 550.0:2 failed 4 times;
 aborting job
 org.apache.spark.SparkException: Job aborted: Task 550.0:2 failed 4 times
 (most recent failure: unknown)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskSchedulerImpl-Lost-an-executor-tp4566p4618.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-22 Thread Daniel Darabos
On Wed, Apr 23, 2014 at 12:06 AM, jaeholee jho...@lbl.gov wrote:

 How do you determine the number of partitions? For example, I have 16
 workers, and the number of cores and the worker memory set in spark-env.sh
 are:

 CORE = 8
 MEMORY = 16g


So you have the capacity to work on 16 * 8 = 128 tasks at a time. If you
use fewer than 128 partitions, some CPUs will go unused, and the job will
complete more slowly than it could. But depending on what you are actually
doing with the data, you may want to use many more partitions than that. I
find it fairly difficult to estimate the memory use -- it is much easier to
just try with more partitions and see if it is enough :). The overhead of
using more partitions than necessary is pretty small.

The .csv data I have is about 500MB, but I am eventually going to use a file
 that is about 15GB.

 Is the MEMORY variable in spark-env.sh different from spark.executor.memory
 that you mentioned? If they're different, how do I set
 spark.executor.memory?


It is probably the same thing. Sorry for confusing you. To be sure, check
the Spark master web UI while the application is connected. You will see
how much RAM is used per worker on the main page. You can set
spark.executor.memory through SparkConf.set("spark.executor.memory",
"10g") when creating the SparkContext.
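
For example, a minimal sketch (the master URL, application name and memory
value are just placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("MyApp")
  .set("spark.executor.memory", "10g")  // memory requested per executor
val sc = new SparkContext(conf)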


Re: sc.makeRDD bug with NumericRange

2014-04-18 Thread Daniel Darabos
Looks like NumericRange in Scala is just a joke.

scala> val x = 0.0 to 1.0 by 0.1
x: scala.collection.immutable.NumericRange[Double] = NumericRange(0.0, 0.1,
0.2, 0.30004, 0.4, 0.5, 0.6, 0.7, 0.7999,
0.8999, 0.)

scala> x.take(3)
res1: scala.collection.immutable.NumericRange[Double] = NumericRange(0.0,
0.1, 0.2)

scala> x.drop(3)
res2: scala.collection.immutable.NumericRange[Double] =
NumericRange(0.30004, 0.4, 0.5, 0.6, 0.7, 0.7999,
0.8999, 0.)

So far so good.

scala> x.drop(3).take(3)
res3: scala.collection.immutable.NumericRange[Double] =
NumericRange(0.30004, 0.4)

Why only two values? Where's 0.5?

scala> x.drop(6)
res4: scala.collection.immutable.NumericRange[Double] =
NumericRange(0.6001, 0.7001, 0.8, 0.9)

And where did the last value disappear now?

You have to approach Scala with a healthy amount of distrust. You're on the
right track with toArray.
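
For example, two workarounds (a sketch; both should give all 11 values even
with MASTER=local[4]):

// Force the lazy NumericRange into a strict collection before parallelizing it:
sc.makeRDD((0.0 to 1.0 by 0.1).toArray).collect().length  // 11

// Or sidestep NumericRange entirely by scaling an integer range:
sc.makeRDD((0 to 10).map(_ * 0.1)).collect().length       // 11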


On Fri, Apr 18, 2014 at 8:01 PM, Mark Hamstra m...@clearstorydata.comwrote:

 Please file an issue: Spark Project JIRA:
 https://issues.apache.org/jira/browse/SPARK



 On Fri, Apr 18, 2014 at 10:25 AM, Aureliano Buendia 
 buendia...@gmail.comwrote:

 Hi,

  I just noticed that sc.makeRDD() does not keep all the values when given
  input of type NumericRange; try this in the Spark shell:


 $ MASTER=local[4] bin/spark-shell

 scala> sc.makeRDD(0.0 to 1 by 0.1).collect().length

 *8*


  The expected length is 11. This works correctly when launching Spark with
  only one core:


 $ MASTER=local[1] bin/spark-shell

 scala> sc.makeRDD(0.0 to 1 by 0.1).collect().length

 *11*


 This also works correctly when using toArray():

 $ MASTER=local[4] bin/spark-shell

 scala> sc.makeRDD((0.0 to 1 by 0.1).*toArray*).collect().length

 *8*





Re: sc.makeRDD bug with NumericRange

2014-04-18 Thread Daniel Darabos
To make up for mocking Scala, I've filed a bug (
https://issues.scala-lang.org/browse/SI-8518) and will try to patch this.


On Fri, Apr 18, 2014 at 9:24 PM, Daniel Darabos 
daniel.dara...@lynxanalytics.com wrote:

 Looks like NumericRange in Scala is just a joke.

 scala> val x = 0.0 to 1.0 by 0.1
 x: scala.collection.immutable.NumericRange[Double] = NumericRange(0.0,
 0.1, 0.2, 0.30004, 0.4, 0.5, 0.6, 0.7, 0.7999,
 0.8999, 0.)

 scala> x.take(3)
 res1: scala.collection.immutable.NumericRange[Double] = NumericRange(0.0,
 0.1, 0.2)

 scala> x.drop(3)
 res2: scala.collection.immutable.NumericRange[Double] =
 NumericRange(0.30004, 0.4, 0.5, 0.6, 0.7, 0.7999,
 0.8999, 0.)

 So far so good.

 scala> x.drop(3).take(3)
 res3: scala.collection.immutable.NumericRange[Double] =
 NumericRange(0.30004, 0.4)

 Why only two values? Where's 0.5?

 scala> x.drop(6)
 res4: scala.collection.immutable.NumericRange[Double] =
 NumericRange(0.6001, 0.7001, 0.8, 0.9)

 And where did the last value disappear now?

 You have to approach Scala with a healthy amount of distrust. You're on
 the right track with toArray.


 On Fri, Apr 18, 2014 at 8:01 PM, Mark Hamstra m...@clearstorydata.comwrote:

 Please file an issue: Spark Project JIRA:
 https://issues.apache.org/jira/browse/SPARK



 On Fri, Apr 18, 2014 at 10:25 AM, Aureliano Buendia buendia...@gmail.com
  wrote:

 Hi,

  I just noticed that sc.makeRDD() does not keep all the values when given
  input of type NumericRange; try this in the Spark shell:


 $ MASTER=local[4] bin/spark-shell

 scala> sc.makeRDD(0.0 to 1 by 0.1).collect().length

 *8*


  The expected length is 11. This works correctly when launching Spark with
  only one core:


 $ MASTER=local[1] bin/spark-shell

 scala> sc.makeRDD(0.0 to 1 by 0.1).collect().length

 *11*


 This also works correctly when using toArray():

 $ MASTER=local[4] bin/spark-shell

 scala> sc.makeRDD((0.0 to 1 by 0.1).*toArray*).collect().length

 *8*






Re: Continuously running non-streaming jobs

2014-04-17 Thread Daniel Darabos
I'm quite new myself (just subscribed to the mailing list today :)), but
this happens to be something we've had success with. So let me know if you
hit any problems with this sort of usage.


On Thu, Apr 17, 2014 at 9:11 PM, Jim Carroll jimfcarr...@gmail.com wrote:

 Daniel,

 I'm new to Spark but I thought that thread hinted at the right answer.

 Thanks,
 Jim




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Continuously-running-non-streaming-jobs-tp4391p4397.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.