Re: application failed on large dataset

2015-09-15 Thread 周千昊
Hi,
  after checking the yarn logs, all the error stacks look like the one below:

15/09/15 19:58:23 ERROR shuffle.OneForOneBlockFetcher: Failed while
starting block fetches
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

It seems that some error occurs when trying to fetch the block, and
after several retries, the executor just dies with this error.
As for your question, I did not see any executor restart during
the job.
PS: the operator I am using during that stage is
rdd.glom().mapPartitions().
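
For context, a minimal sketch of what the rdd.glom().mapPartitions() pattern looks like; the input path and the per-partition computation are hypothetical, not the code from this job:

// assumes the usual `sc: SparkContext` from spark-shell
val rdd: org.apache.spark.rdd.RDD[String] = sc.textFile("hdfs:///some/input")  // hypothetical path

val perPartitionCounts = rdd
  .glom()                                   // RDD[Array[String]]: one array per partition
  .mapPartitions { arrays =>                // iterator over (normally) a single array
    arrays.map(records => records.length)   // e.g. compute something over the whole partition
  }
perPartitionCounts.count()

Since glom() materializes each partition as a single in-memory array, the memory needed per task grows with partition size, which may be relevant when the same job only fails on large inputs.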


java8964 wrote on Tuesday, September 15, 2015 at 11:44 PM:

> When you saw this error, does any executor die due to whatever error?
>
> Do you check to see if any executor restarts during your job?
>
> It is hard to help you just with the stack trace. You need to tell us the
> whole picture when your jobs are running.
>
> Yong
>
> --
> From: qhz...@apache.org
> Date: Tue, 15 Sep 2015 15:02:28 +
> Subject: Re: application failed on large dataset
> To: user@spark.apache.org
>
>
> has anyone met the same problems?
> 周千昊 wrote on Monday, September 14, 2015 at 9:07 PM:
>
> Hi, community
>   I am facing a strange problem:
>   all executors do not respond, and then all of them fail with
> ExecutorLostFailure.
>   when I look into the yarn logs, they are full of exceptions like this:
>
> 15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 1 outstanding blocks (after 3 retries)
> java.io.IOException: Failed to connect to host/ip:port
> at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
> at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
> at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
> at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: host/ip:port
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> ... 1 more
>
>
>   The strange thing is that, if I reduce the input size, the problem
> just disappears. I have found a similar issue in the mail-archive(
> 

Re: Replacing Esper with Spark Streaming?

2015-09-15 Thread Koert Kuipers
fair enough
i was trying to say that if esper were obsolete (which i am not suggesting)
then that does NOT mean espertech is dead...

On Tue, Sep 15, 2015 at 10:20 PM, Thomas Bernhardt <
bernhardt...@yahoo.com.invalid> wrote:

> Let me say first, I'm the Esper project lead.
> Esper is alive and well and not at all obsolete. Esper provides event
> series analysis by providing an SQL92-standards event processing language
> (EPL). It lets you express situation detection logic very concisely,
> usually much more concisely than any code you'd need to write and deploy
> yourself into a streaming container. This is because EPL has concepts such
> as joins, subqueries, patterns etc..
> By the way, there are so many streaming containers like spark streaming. I
> think I could list 20 or more relevant streaming logic containers, all
> incompatible with each other; most likely many of these new streaming logic
> containers will also be obsolete in the next few years.
> Best regards,
> Tom
>
> --
> *From:* Koert Kuipers 
> *To:* Bertrand Dechoux 
> *Cc:* Todd Nist ; Otis Gospodnetić <
> otis.gospodne...@gmail.com>; "user@spark.apache.org" <
> user@spark.apache.org>
> *Sent:* Tuesday, September 15, 2015 7:53 PM
> *Subject:* Re: Replacing Esper with Spark Streaming?
>
> obsolete is not the same as dead... we have a few very large tech
> companies to prove that point
>
>
>
> On Tue, Sep 15, 2015 at 4:32 PM, Bertrand Dechoux 
> wrote:
>
> The big question would be what feature of Esper you are using. Esper is a
> CEP solution. I doubt that Spark Streaming can do everything Esper does
> without any development. Spark (Streaming) is more a general-purpose
> platform.
>
> http://www.espertech.com/products/esper.php
>
> But I would be glad to be proven wrong (which would also imply EsperTech
> is dead, which I also doubt...)
>
> Bertrand
>
> On Mon, Sep 14, 2015 at 2:31 PM, Todd Nist  wrote:
>
> Stratio offers a CEP implementation based on Spark Streaming and the
> Siddhi CEP engine.  I have not used the below, but they may be of some
> value to you:
>
> http://stratio.github.io/streaming-cep-engine/
> https://github.com/Stratio/streaming-cep-engine
> HTH.
> -Todd
>
> On Sun, Sep 13, 2015 at 7:49 PM, Otis Gospodnetić <
> otis.gospodne...@gmail.com> wrote:
>
> Hi,
>
> I'm wondering if anyone has attempted to replace Esper with Spark
> Streaming or if anyone thinks Spark Streaming is/isn't a good tool for the
> (CEP) job?
>
> We are considering Akka or Spark Streaming as possible Esper replacements
> and would appreciate any input from people who tried to do that with either
> of them.
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
>
>
>
>
>


GraphX, graph clustering, pattern matching

2015-09-15 Thread Alex Karargyris
I am new to Spark and I was wondering if anyone could help point me in the
right direction: Are there any algorithms/tutorials available on
Spark's GraphX for graph clustering and pattern matching?

More specifically I am interested in:
a) querying a small graph against a larger graph and getting the similar
subgraphs in that large graph,
b) performing clustering on a large graph and then classifying an incoming
query graph into the clusters. I know that MLlib has K-means clustering, but I
am not sure how to perform the clustering on a graph, nor how to do the
classification into these clusters.

Any tutorial/idea/direction would be much appreciated.
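
For the clustering part, a minimal sketch of GraphX's built-in Label Propagation (a community-detection routine), using a tiny hypothetical edge list; subgraph pattern matching has no direct built-in GraphX equivalent:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.graphx.lib.LabelPropagation

// hypothetical toy graph: two small components
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(4L, 5L, 1)
))
val graph = Graph.fromEdges(edges, defaultValue = 1)

// run label propagation for a fixed number of supersteps;
// each vertex ends up labelled with a community id
val communities = LabelPropagation.run(graph, maxSteps = 5)
communities.vertices.collect().foreach(println)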


Re: Spark wastes a lot of space (tmp data) for iterative jobs

2015-09-15 Thread Alexis Gillain
You can try System.gc(), considering that checkpointing is enabled by
default in GraphX:

https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html
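
A minimal sketch of that pattern applied to an iterative loop like the PageRank example (assuming the `links` RDD and `numIterations` from that example; the checkpoint directory is hypothetical):

// checkpoint the iteration state periodically and call System.gc() so that
// Spark's ContextCleaner can drop shuffle files of RDDs no longer referenced
sc.setCheckpointDir("hdfs:///tmp/checkpoints")       // hypothetical path

var ranks = links.mapValues(_ => 1.0)                // links: RDD[(String, Iterable[String])]
for (i <- 1 to numIterations) {
  val contribs = links.join(ranks).values
    .flatMap { case (urls, rank) => urls.map(url => (url, rank / urls.size)) }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

  if (i % 10 == 0) {
    ranks.checkpoint()   // truncate the lineage so earlier intermediate data can be dropped
    ranks.count()        // force the checkpoint to materialize
    System.gc()          // give the ContextCleaner a chance to reclaim old shuffle files
  }
}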

2015-09-15 22:42 GMT+08:00 Ali Hadian :

> Hi!
> We are executing the PageRank example from the Spark java examples package
> on a very large input graph. The code is available here
> .
> (Spark's github repo).
> During the execution, the framework generates a huge amount of intermediate
> data in each iteration (i.e. the *contribs* RDD). The intermediate data
> is temporary, but Spark does not clear the intermediate data of previous
> iterations. That is to say, if we are in the middle of the 20th iteration, all
> of the temporary data of all previous iterations (iterations 0 to 19) is
> still kept in the *tmp* directory. As a result, the tmp directory grows
> linearly.
> It seems rational to keep the data from only the previous iteration,
> because if the current iteration fails, the job can be continued using the
> intermediate data from the previous iteration. Anyways, why does it keep
> the intermediate data for ALL previous iterations???
> How can we enforce Spark to clear these intermediate data *during* the
> execution of job?
>
> Kind regards,
> Ali hadian
>
>



-- 
Alexis GILLAIN


Re: How to convert dataframe to a nested StructType schema

2015-09-15 Thread Terry Hole
Hao,

For spark 1.4.1, you can try this:
val rowrdd = df.rdd.map(r => Row(Row(r(3)), Row(r(0), r(1), r(2))))
val newDF = sqlContext.createDataFrame(rowrdd, yourNewSchema)
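
For completeness, a sketch of what `yourNewSchema` could look like for the nested layout shown in the question below, assuming the original column order is (city, state, country, zipcode):

import org.apache.spark.sql.types._

val yourNewSchema = StructType(Seq(
  StructField("ZipCode", StructType(Seq(
    StructField("zip", StringType, nullable = true)
  )), nullable = true),
  StructField("Address", StructType(Seq(
    StructField("city", StringType, nullable = true),
    StructField("state", StringType, nullable = true),
    StructField("country", StringType, nullable = true)
  )), nullable = true)
))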

Thanks!

- Terry

On Wed, Sep 16, 2015 at 2:10 AM, Hao Wang  wrote:

> Hi,
>
> I created a dataframe with 4 string columns (city, state, country,
> zipcode).
> I then applied the following nested schema to it by creating a custom
> StructType. When I run df.take(5), it gives the exception below as
> expected.
> The question is how I can convert the Rows in the dataframe to conform to
> this nested schema? Thanks!
>
> root
>  |-- ZipCode: struct (nullable = true)
>  ||-- zip: string (nullable = true)
>  |-- Address: struct (nullable = true)
>  ||-- city: string (nullable = true)
>  ||-- state: string (nullable = true)
>  ||-- country: string (nullable = true)
>
> [info]   org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in
> stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class
> java.lang.String)
> [info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
> [info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
> [info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
> [info] at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
> [info] at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-convert-dataframe-to-a-nested-StructType-schema-tp24703.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Avoiding SQL Injection in Spark SQL

2015-09-15 Thread V Dineshkumar
Hi,
I was looking for the support of bind variables as Ruslan pointed out.
I came up with a different workaround, as we cannot use DataFrames in
our project; we depend more on raw SQL queries.

val HC = new HiveContext(sc)
val query = HC.sql("select * from eici_view where customername='_:parameter'")
val plan = query.queryExecution.logical
val placeHolder = "_:parameter"
val replaceValue = "medium"
val newPlan = plan transformAllExpressions {
  // backticks make the pattern match the placeholder value instead of
  // binding a fresh variable that would match every Literal in the plan
  case Literal(`placeHolder`, e) => Literal(replaceValue, e)
}
val myDF = new DataFrame(HC, newPlan)

I am using a placeholder, then taking the logical plan that gets created and
replacing the Literal node containing the placeholder value with a Literal
node that wraps the user's input.

Will this workaround work in all cases??
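
For reference, a minimal sketch that wraps the same placeholder-replacement idea in a helper; it relies on Spark 1.x internals just like the snippet above, and `bindParam` is a name introduced here for illustration:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.Literal

// Replace every Literal equal to `placeholder` with a Literal holding `value`.
// The user-supplied value never becomes part of the SQL text, so it cannot
// change the query structure.
def bindParam(df: DataFrame, placeholder: String, value: String): DataFrame = {
  val bound = df.queryExecution.logical transformAllExpressions {
    case Literal(v, dt) if v != null && v.toString == placeholder => Literal.create(value, dt)
  }
  new DataFrame(df.sqlContext, bound)   // same (non-public) constructor used above
}

// usage, mirroring the example above:
val boundDF = bindParam(
  HC.sql("select * from eici_view where customername='_:parameter'"),
  "_:parameter", "medium")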

Thanks,
Dinesh
Philips India

On Fri, Sep 11, 2015 at 6:32 AM, Ruslan Dautkhanov 
wrote:

> Using dataframe API is a good workaround.
>
> Another way would be to use bind variables. I don't think Spark SQL
> supports them.
> That's what Dinesh probably meant by "was not able to find any API for
> preparing the SQL statement safely avoiding injection".
>
> E.g.
>
> val sql_handler = sqlContext.sql("SELECT name FROM people WHERE age >= :var1 AND age <= :var2").parse()
>
> toddlers = sql_handler.execute("var1"->1, "var2"->3)
>
> teenagers = sql_handler.execute(13, 19)
>
>
> It's not possible to do a SQL Injection if Spark SQL would support bind
> variables, as parameter would be always treated as variables and not part
> of SQL. Also it's arguably easier for developers as you don't have to
> escape/quote.
>
>
> ps. Another advantage is Spark could parse and create plan once - but
> execute multiple times.
> http://www.akadia.com/services/ora_bind_variables.html
> This point is more relevant for OLTP-like queries which Spark is probably
> not yet good at (e.g. returning a few rows quickly, within a few ms).
>
>
>
> --
> Ruslan Dautkhanov
>
> On Thu, Sep 10, 2015 at 12:07 PM, Michael Armbrust  > wrote:
>
>> Either that or use the DataFrame API, which directly constructs query
>> plans and thus doesn't suffer from injection attacks (and runs on the same
>> execution engine).
>>
>> On Thu, Sep 10, 2015 at 12:10 AM, Sean Owen  wrote:
>>
>>> I don't think this is Spark-specific. Mostly you need to escape /
>>> quote user-supplied values as with any SQL engine.
>>>
>>> On Thu, Sep 10, 2015 at 7:32 AM, V Dineshkumar
>>>  wrote:
>>> > Hi,
>>> >
>>> > What is the preferred way of avoiding SQL Injection while using Spark
>>> SQL?
>>> > In our use case we have to take the parameters directly from the users
>>> and
>>> > prepare the SQL Statement.I was not able to find any API for preparing
>>> the
>>> > SQL statement safely avoiding injection.
>>> >
>>> > Thanks,
>>> > Dinesh
>>> > Philips India
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Caching intermediate results in Spark ML pipeline?

2015-09-15 Thread Feynman Liang
If you're doing hyperparameter grid search, consider using
ml.tuning.CrossValidator, which does cache the dataset.
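
A minimal sketch of the CrossValidator route (spark.ml in 1.4/1.5), with hypothetical pipeline stages and a hypothetical `trainingDF`:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// hypothetical pipeline: tokenize -> hash -> logistic regression
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression()
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(1 << 10, 1 << 18))
  .addGrid(lr.regParam, Array(0.01, 0.1))
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

// trainingDF: a DataFrame with "text" and "label" columns (assumed)
val cvModel = cv.fit(trainingDF)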

Otherwise, perhaps you can elaborate more on your particular use case for
caching intermediate results and if the current API doesn't support it we
can create a JIRA for it.

On Tue, Sep 15, 2015 at 10:26 PM, Jingchu Liu  wrote:

> Yeah I understand on the low-level we should do as you said.
>
> But since ML pipeline is a high-level API, it is pretty natural to expect
> the ability to recognize overlapping parameters between successive runs.
> (Actually, this happens A LOT when we have lots of hyper-params to search
> for)
>
> I can also imagine the implementation by appending parameter information
> to the cached results. Let's say we implemented an "equal" method for
> param1. By comparing param1 with the previous run, the program would know
> data1 is reusable, and the time used for generating data1 could be saved.
>
> Best,
> Lewis
>
> 2015-09-15 23:05 GMT+08:00 Feynman Liang :
>
>> Nope, and that's intentional. There is no guarantee that rawData did not
>> change between intermediate calls to searchRun, so reusing a cached data1
>> would be incorrect.
>>
>> If you want data1 to be cached between multiple runs, you have a few
>> options:
>> * cache it first and pass it in as an argument to searchRun
>> * use a creational pattern like singleton to ensure only one instantiation
>>
>> On Tue, Sep 15, 2015 at 12:49 AM, Jingchu Liu 
>> wrote:
>>
>>> Hey Feynman,
>>>
>>> I doubt DF persistence will work in my case. Let's use the following
>>> code:
>>> ==
>>> def searchRun( params = [param1, param2] )
>>>   data1 = hashing1.transform(rawData, param1)
>>>   data1.cache()
>>>   data2 = hashing2.transform(data1, param2)
>>>   data2.someAction()
>>> ==
>>> Say if we run "searchRun()" for 2 times with the same "param1" but
>>> different "param2". Will spark recognize that the two local variables
>>> "data1" in consecutive runs has the same content?
>>>
>>>
>>> Best,
>>> Lewis
>>>
>>> 2015-09-15 13:58 GMT+08:00 Feynman Liang :
>>>
 You can persist the transformed Dataframes, for example

 val data : DF = ...
 val hashedData = hashingTF.transform(data)
 hashedData.cache() // to cache DataFrame in memory

 Future usage of hashedData read from an in-memory cache now.

 You can also persist to disk, eg:

 hashedData.write.parquet(FilePath) // to write DataFrame in Parquet
 format to disk
 ...
 val savedHashedData = sqlContext.read.parquet(FilePath)

 Future uses of hash

 Like my earlier response, this will still require you to call each
 PipelineStage's `transform` method (i.e. to NOT use the overall
 Pipeline.setStages API)

 On Mon, Sep 14, 2015 at 10:45 PM, Jingchu Liu 
 wrote:

> Hey Feynman,
>
> Thanks for your response, but I'm afraid "model save/load" is not
> exactly the feature I'm looking for.
>
> What I need to cache and reuse are the intermediate outputs of
> transformations, not the transformers themselves. Do you know of any related dev.
> activities or plans?
>
> Best,
> Lewis
>
> 2015-09-15 13:03 GMT+08:00 Feynman Liang :
>
>> Lewis,
>>
>> Many pipeline stages implement save/load methods, which can be used
>> if you instantiate and call the underlying pipeline stages `transform`
>> methods individually (instead of using the Pipeline.setStages API). See
>> associated JIRAs .
>>
>> Pipeline persistence is on the 1.6 roadmap, JIRA here
>> .
>>
>> Feynman
>>
>> On Mon, Sep 14, 2015 at 9:20 PM, Jingchu Liu 
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a question regarding the ability of ML pipeline to cache
>>> intermediate results. I've posted this question on stackoverflow
>>> 
>>> but got no answer, hope someone here can help me out.
>>>
>>> ===
>>> Lately I'm planning to migrate my standalone python ML code to
>>> spark. The ML pipeline in spark.ml turns out quite handy, with
>>> streamlined API for chaining up algorithm stages and hyper-parameter 
>>> grid
>>> search.
>>>
>>> Still, I found its support for one important feature obscure in
>>> existing documents: caching of intermediate results. The importance of this
>>> feature arises when the pipeline 

Re: Caching intermediate results in Spark ML pipeline?

2015-09-15 Thread Jingchu Liu
Yeah I understand on the low-level we should do as you said.

But since ML pipeline is a high-level API, it is pretty natural to expect
the ability to recognize overlapping parameters between successive runs.
(Actually, this happens A LOT when we have lots of hyper-params to search
for)

I can also imagine the implementation by appending parameter information to
the cached results. Let's say we implemented an "equal" method for
param1. By comparing param1 with the previous run, the program would know
data1 is reusable, and the time used for generating data1 could be saved.

Best,
Lewis

2015-09-15 23:05 GMT+08:00 Feynman Liang :

> Nope, and that's intentional. There is no guarantee that rawData did not
> change between intermediate calls to searchRun, so reusing a cached data1
> would be incorrect.
>
> If you want data1 to be cached between multiple runs, you have a few
> options:
> * cache it first and pass it in as an argument to searchRun
> * use a creational pattern like singleton to ensure only one instantiation
>
> On Tue, Sep 15, 2015 at 12:49 AM, Jingchu Liu 
> wrote:
>
>> Hey Feynman,
>>
>> I doubt DF persistence will work in my case. Let's use the following code:
>> ==
>> def searchRun( params = [param1, param2] )
>>   data1 = hashing1.transform(rawData, param1)
>>   data1.cache()
>>   data2 = hashing2.transform(data1, param2)
>>   data2.someAction()
>> ==
>> Say if we run "searchRun()" for 2 times with the same "param1" but
>> different "param2". Will spark recognize that the two local variables
>> "data1" in consecutive runs has the same content?
>>
>>
>> Best,
>> Lewis
>>
>> 2015-09-15 13:58 GMT+08:00 Feynman Liang :
>>
>>> You can persist the transformed Dataframes, for example
>>>
>>> val data : DF = ...
>>> val hashedData = hashingTF.transform(data)
>>> hashedData.cache() // to cache DataFrame in memory
>>>
>>> Future usage of hashedData read from an in-memory cache now.
>>>
>>> You can also persist to disk, eg:
>>>
>>> hashedData.write.parquet(FilePath) // to write DataFrame in Parquet
>>> format to disk
>>> ...
>>> val savedHashedData = sqlContext.read.parquet(FilePath)
>>>
>>> Future uses of hash
>>>
>>> Like my earlier response, this will still require you to call each
>>> PipelineStage's `transform` method (i.e. to NOT use the overall
>>> Pipeline.setStages API)
>>>
>>> On Mon, Sep 14, 2015 at 10:45 PM, Jingchu Liu 
>>> wrote:
>>>
 Hey Feynman,

 Thanks for your response, but I'm afraid "model save/load" is not
 exactly the feature I'm looking for.

 What I need to cache and reuse are the intermediate outputs of
 transformations, not the transformers themselves. Do you know of any related dev.
 activities or plans?

 Best,
 Lewis

 2015-09-15 13:03 GMT+08:00 Feynman Liang :

> Lewis,
>
> Many pipeline stages implement save/load methods, which can be used if
> you instantiate and call the underlying pipeline stages `transform` 
> methods
> individually (instead of using the Pipeline.setStages API). See associated
> JIRAs .
>
> Pipeline persistence is on the 1.6 roadmap, JIRA here
> .
>
> Feynman
>
> On Mon, Sep 14, 2015 at 9:20 PM, Jingchu Liu 
> wrote:
>
>> Hi all,
>>
>> I have a question regarding the ability of ML pipeline to cache
>> intermediate results. I've posted this question on stackoverflow
>> 
>> but got no answer, hope someone here can help me out.
>>
>> ===
>> Lately I'm planning to migrate my standalone python ML code to spark.
>> The ML pipeline in spark.ml turns out quite handy, with streamlined
>> API for chaining up algorithm stages and hyper-parameter grid search.
>>
>> Still, I found its support for one important feature obscure in
>> existing documents: caching of intermediate results. The importance of this
>> feature arises when the pipeline involves computation-intensive stages.
>>
>> For example, in my case I use a huge sparse matrix to perform
>> multiple moving averages on time series data in order to form input
>> features. The structure of the matrix is determined by some
>> hyper-parameter. This step turns out to be a bottleneck for the entire
>> pipeline because I have to construct the matrix at runtime.
>>
>> During parameter search, I usually have other parameters to examine
>> in addition to this "structure parameter". So if I can reuse the huge
>> matrix when the "structure parameter" is unchanged, I can save 

Difference between sparkDriver and "executor ID driver"

2015-09-15 Thread Muler
I'm running Spark in local mode and getting these two log messages that
appear to be similar. I want to understand what each one is doing:


   1. [main] util.Utils (Logging.scala:logInfo(59)) - Successfully started
   service 'sparkDriver' on port 60782.
   2. [main] executor.Executor (Logging.scala:logInfo(59)) - Starting
   executor ID driver on host localhost

1. is created using:

val actorSystemName = if (isDriver) driverActorSystemName else
executorActorSystemName

val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf,
securityManager)
val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem

2. is created when:

 _taskScheduler.start()


What is the difference and what does each do?


Re: Replacing Esper with Spark Streaming?

2015-09-15 Thread Koert Kuipers
obsolete is not the same as dead... we have a few very large tech companies
to prove that point

On Tue, Sep 15, 2015 at 4:32 PM, Bertrand Dechoux 
wrote:

> The big question would be what feature of Esper you are using. Esper is a
> CEP solution. I doubt that Spark Streaming can do everything Esper does
> without any development. Spark (Streaming) is more a general-purpose
> platform.
>
> http://www.espertech.com/products/esper.php
>
> But I would be glad to be proven wrong (which would also imply EsperTech
> is dead, which I also doubt...)
>
> Bertrand
>
> On Mon, Sep 14, 2015 at 2:31 PM, Todd Nist  wrote:
>
>> Stratio offers a CEP implementation based on Spark Streaming and the
>> Siddhi CEP engine.  I have not used the below, but they may be of some
>> value to you:
>>
>> http://stratio.github.io/streaming-cep-engine/
>>
>> https://github.com/Stratio/streaming-cep-engine
>>
>> HTH.
>>
>> -Todd
>>
>> On Sun, Sep 13, 2015 at 7:49 PM, Otis Gospodnetić <
>> otis.gospodne...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I'm wondering if anyone has attempted to replace Esper with Spark
>>> Streaming or if anyone thinks Spark Streaming is/isn't a good tool for the
>>> (CEP) job?
>>>
>>> We are considering Akka or Spark Streaming as possible Esper
>>> replacements and would appreciate any input from people who tried to do
>>> that with either of them.
>>>
>>> Thanks,
>>> Otis
>>> --
>>> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>>> Solr & Elasticsearch Support * http://sematext.com/
>>>
>>>
>>
>


Re: Replacing Esper with Spark Streaming?

2015-09-15 Thread Thomas Bernhardt
Let me say first, I'm the Esper project lead.
Esper is alive and well and not at all obsolete. Esper provides event series 
analysis by providing an SQL92-standards event processing language (EPL). It lets 
you express situation detection logic very concisely, usually much more concisely 
than any code you'd need to write and deploy yourself into a streaming container. 
This is because EPL has concepts such as joins, subqueries, patterns etc.
By the way, there are so many streaming containers like spark streaming. I think 
I could list 20 or more relevant streaming logic containers, all incompatible 
with each other; most likely many of these new streaming logic containers will 
also be obsolete in the next few years.
Best regards,
Tom
  From: Koert Kuipers 
 To: Bertrand Dechoux  
Cc: Todd Nist ; Otis Gospodnetić 
; "user@spark.apache.org"  
 Sent: Tuesday, September 15, 2015 7:53 PM
 Subject: Re: Replacing Esper with Spark Streaming?
   
obsolete is not the same as dead... we have a few very large tech companies to 
prove that point



On Tue, Sep 15, 2015 at 4:32 PM, Bertrand Dechoux  wrote:

The big question would be what feature of Esper you are using. Esper is a CEP 
solution. I doubt that Spark Streaming can do everything Esper does without any 
development. Spark (Streaming) is more a general-purpose platform.

http://www.espertech.com/products/esper.php

But I would be glad to be proven wrong (which would also imply EsperTech is 
dead, which I also doubt...)

Bertrand

On Mon, Sep 14, 2015 at 2:31 PM, Todd Nist  wrote:

Stratio offers a CEP implementation based on Spark Streaming and the Siddhi CEP 
engine.  I have not used the below, but they may be of some value to you:

http://stratio.github.io/streaming-cep-engine/
https://github.com/Stratio/streaming-cep-engine
HTH.
-Todd
On Sun, Sep 13, 2015 at 7:49 PM, Otis Gospodnetić  
wrote:

Hi,
I'm wondering if anyone has attempted to replace Esper with Spark Streaming or 
if anyone thinks Spark Streaming is/isn't a good tool for the (CEP) job?
We are considering Akka or Spark Streaming as possible Esper replacements and 
would appreciate any input from people who tried to do that with either of 
them.
Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/









  

Re: hdfs-ha on mesos - odd bug

2015-09-15 Thread Marcelo Vanzin
On Mon, Sep 14, 2015 at 6:55 AM, Adrian Bridgett  wrote:
> 15/09/14 13:00:25 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> 10.1.200.245): java.lang.IllegalArgumentException:
> java.net.UnknownHostException: nameservice1
> at
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
> at
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)

This looks like you're trying to connect to an HA HDFS service but you
have not provided the proper hdfs-site.xml for your app; then, instead
of recognizing "nameservice1" as an HA nameservice, it thinks it's an
actual NN address, tries to connect to it, and fails.

If you provide the correct hdfs-site.xml to your app (by placing it in
$SPARK_HOME/conf or setting HADOOP_CONF_DIR to point to the conf
directory), it should work.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Change protobuf version or any other third party library version in Spark application

2015-09-15 Thread Lan Jiang
I am happy to report that after setting spark.driver.userClassPathFirst, I can use 
protobuf 3 with spark-shell. Looks like the classloading issue was in the driver, 
not the executor.
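
For reference, a sketch naming the two configuration keys; note that the driver-side flag generally has to be supplied at launch (for example via --conf on spark-submit or spark-shell) to affect the driver's own classloader:

import org.apache.spark.{SparkConf, SparkContext}

// the SparkConf form is shown mainly to spell out the exact keys
val conf = new SparkConf()
  .setAppName("protobuf3-app")                         // hypothetical app name
  .set("spark.driver.userClassPathFirst", "true")
  .set("spark.executor.userClassPathFirst", "true")
val sc = new SparkContext(conf)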

Marcelo, thank you very much for the tip!

Lan


> On Sep 15, 2015, at 1:40 PM, Marcelo Vanzin  wrote:
> 
> Hi,
> 
> Just "spark.executor.userClassPathFirst" is not enough. You should
> also set "spark.driver.userClassPathFirst". Also not that I don't
> think this was really tested with the shell, but that should work with
> regular apps started using spark-submit.
> 
> If that doesn't work, I'd recommend shading, as others already have.
> 
> On Tue, Sep 15, 2015 at 9:19 AM, Lan Jiang  wrote:
>> I used the --conf spark.files.userClassPathFirst=true  in the spark-shell
>> option, it still gave me the eror: java.lang.NoSuchFieldError: unknownFields
>> if I use protobuf 3.
>> 
>> The output says spark.files.userClassPathFirst is deprecated and suggest
>> using spark.executor.userClassPathFirst. I tried that and it did not work
>> either.
> 
> -- 
> Marcelo


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[ANNOUNCE] Apache Gora 0.6.1 Release

2015-09-15 Thread lewis john mcgibbney
Hi All,

The Apache Gora team are pleased to announce the immediate availability of
Apache Gora 0.6.1.

What is Gora?
Gora is a framework which provides an in-memory data model and persistence
for big data. Gora supports persisting to column stores, key value stores,
document stores and RDBMSs, and analyzing the data with extensive Apache
Hadoop™  MapReduce
 support. This
release also offers input and output formats for Apache Spark.

What's in this release?

This release addresses a modest 21 issues  with
many improvements and bug fixes for the gora-mongodb
 module, resolution of a
major bug whilst flushing data to Apache Solr, a gora-gradle plugin
 and our Gora Spark backend
support . Drop by our
mailing lists and ask questions for information on any of the above.

We provide Gora support for the following projects

   - Apache Avro 1.7.6
   - Apache Hadoop 1.2.1 and 2.5.2
   - Apache HBase 0.98.8-hadoop2 (although also tested with 1.X)
   - Apache Cassandra 2.0.2
   - Apache Solr 4.10.3
   - MongoDB 2.6.X
   - Apache Accumulo 1.5.1
   - Apache Spark 1.4.1

Gora is released as both source code, downloads for which can be found at
our downloads page  as well as Maven
artifacts which can be found on Maven central
.
Thank you
Lewis
(On behalf of the Apache Gora PMC)

http://people.apache.org/~lewismc || @hectorMcSpector ||
http://www.linkedin.com/in/lmcgibbney

  Apache Gora V.P || Apache Nutch PMC || Apache Any23 V.P ||
Apache OODT PMC
   Apache Open Climate Workbench PMC || Apache Tika PMC || Apache TAC
Apache Usergrid || Apache HTrace (incubating) || Apache CommonsRDF
(incubating)


How to speed up MLlib LDA?

2015-09-15 Thread Marko Asplund
Hi,

I'm trying out MLlib LDA training with 100 topics, 105 K vocabulary size
and ~3.4 M documents using EMLDAOptimizer.

Training the model took ~2.5 hours with MLlib, whereas training with Vowpal
Wabbit on the same data and the same system took ~5 minutes.

I realize that there are differences in the LDA implementations, but which
parameters should I tweak to make the LDA implementations work with similar
operational parameters and thus make the results more comparable?

Any suggestions on how to speed up MLlib LDA and thoughts on speed-accuracy
tradeoffs?

The log includes the following message, which AFAIK, should mean
that netlib-java is using machine optimised native implementation:
"com.github.fommil.jni.JniLoader - successfully loaded
/tmp/jniloader4682745056459314976netlib-native_system-linux-x86_64.so"

The LDA model training proto code I'm using can be found here:
https://github.com/marko-asplund/tech-protos/blob/master/mllib-lda/src/main/scala/fi/markoa/proto/mllib/LDADemo.scala#L33-L47
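
(Not an answer, but one knob that is often worth trying is the optimizer choice; a sketch, assuming the same RDD[(Long, Vector)] corpus that is already being fed to EMLDAOptimizer:)

import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}

// switch from the EM optimizer to the online variational optimizer,
// which processes mini-batches instead of the full corpus per iteration
val lda = new LDA()
  .setK(100)
  .setMaxIterations(50)
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(0.05))

val model = lda.run(corpus)   // corpus: RDD[(Long, Vector)], as in the linked code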


marko


Re: A way to timeout and terminate a laggard 'Stage' ?

2015-09-15 Thread Akhil Das
As of now I think it's a no. Not sure if it's a naive approach, but you can
have a separate program keep an eye on the web UI (possibly parsing the
content) and have it trigger the kill of the task/job once it detects a lag.
(Again, you will have to figure out the correct thresholds before killing any
job.)

Thanks
Best Regards

On Mon, Sep 14, 2015 at 10:40 PM, Dmitry Goldenberg <
dgoldenberg...@gmail.com> wrote:

> Is there a way in Spark to automatically terminate laggard "stage's", ones
> that appear to be hanging?   In other words, is there a timeout for
> processing of a given RDD?
>
> In the Spark GUI, I see the "kill" function for a given Stage under
> 'Details for Job <...>".
>
> Is there something in Spark that would identify and kill laggards
> proactively?
>
> Thanks.
>


Re: Spark Streaming Suggestion

2015-09-15 Thread srungarapu vamsi
The batch approach I had implemented takes about 10 minutes to complete all
the pre-computation tasks for one hour's worth of data. When I went
through my code, I figured out that the most time-consuming tasks are
the ones which read data from Cassandra and the places where I perform
sparkContext.union(Array[RDD]).
Now the ask is to get the pre-computation tasks near real time, so I am
exploring the streaming approach.

My pre computation tasks not only include just finding the unique numbers
for a given device every minute, every hour, every day but it also includes
the following tasks:
1. Find the number of unique numbers across a set of devices every minute,
every hour, every day
2. Find the number of unique numbers which are commonly occurring across a
set of devices every minute, every hour, every day
3. Find (total time a number occurred across a set of devices)/(total
unique numbers occurred across the set of devices)
The above mentioned pre computation tasks are just a few of what i will be
needing and there are many more coming towards me :)
I see that all these problems need more of a data-parallel approach, and hence I
am interested in doing this on the Spark Streaming end.


On Tue, Sep 15, 2015 at 11:04 AM, Jörn Franke  wrote:

> Why did you not stay with the batch approach? For me the architecture
> looks very complex for a simple thing you want to achieve. Why don't you
> process the data already in storm ?
>
> On Tue, Sep 15, 2015 at 6:20 AM, srungarapu vamsi 
> wrote:
>
>> I am pretty new to spark. Please suggest a better model for the following
>> use case.
>>
>> I have few (about 1500) devices in field which keep emitting about 100KB
>> of data every minute. The nature of data sent by the devices is just a list
>> of numbers.
>> As of now, we have Storm is in the architecture which receives this data,
>> sanitizes it and writes to cassandra.
>> Now, i have a requirement to process this data. The processing includes
>> finding unique numbers emitted by one or more devices for every minute,
>> every hour, every day, every month.
>> I had implemented this processing part as a batch job execution and now i
>> am interested in making it a streaming application. i.e calculating the
>> processed data as and when devices emit the data.
>>
>> I have the following two approaches:
>> 1. Storm writes the actual data to cassandra and writes a message on
>> Kafka bus that data corresponding to device D and minute M has been written
>> to cassandra
>>
>> Then Spark streaming reads this message from kafka , then reads the data
>> of Device D at minute M from cassandra and starts processing the data.
>>
>> 2. Storm writes the data to both cassandra and  kafka, spark reads the
>> actual data from kafka , processes the data and writes to cassandra.
>> The second approach avoids additional hit of reading from cassandra every
>> minute , a device has written data to cassandra at the cost of putting the
>> actual heavy messages instead of light events on  kafka.
>>
>> I am a bit confused among the two approaches. Please suggest which one is
>> better and if both are bad, how can i handle this use case?
>>
>>
>> --
>> /Vamsi
>>
>


-- 
/Vamsi


Re: hdfs-ha on mesos - odd bug

2015-09-15 Thread Adrian Bridgett
Hi Sam, in short, no, it's a traditional install as we plan to use spot 
instances and didn't want price spikes to kill off HDFS.


We're actually doing a bit of a hybrid, using spot instances for the 
mesos slaves, ondemand for the mesos masters.  So for the time being, 
putting hdfs on the masters (we'll probably move to multiple slave 
instance types to avoid losing too many when spot price spikes, but for 
now this is acceptable).   Masters running CDH5.


Using hdfs://current-hdfs-master:8020 works fine, however using 
hdfs://nameservice1 fails in the rather odd way described (well, more 
that the workaround actually works!)  I think there's some underlying 
bug here that's being exposed.



On 14/09/2015 22:27, Sam Bessalah wrote:
I don't know about the broken url. But are you running HDFS as a mesos 
framework? If so is it using mesos-dns?
Then you should resolve the namenode via hdfs:/// 



On Mon, Sep 14, 2015 at 3:55 PM, Adrian Bridgett 
> wrote:


I'm hitting an odd issue with running spark on mesos together with
HA-HDFS, with an even odder workaround.

In particular I get an error that it can't find the HDFS
nameservice unless I put in a _broken_ url (discovered that
workaround by mistake!).  core-site.xml, hdfs-site.xml is
distributed to the slave node - and that file is read since I
deliberately break the file then I get an error as you'd expect.

NB: This is a bit different to

http://mail-archives.us.apache.org/mod_mbox/spark-user/201402.mbox/%3c1392442185079-1549.p...@n3.nabble.com%3E


Spark 1.5.0:

t=sc.textFile("hdfs://nameservice1/tmp/issue")
t.count()
(fails)

t=sc.textFile("file://etc/passwd")
t.count()
(errors about bad url - should have an extra / of course)
t=sc.textFile("hdfs://nameservice1/tmp/issue")
t.count()
then it works!!!

I should say that using file:///etc/passwd or hdfs:///tmp/issue
both fail as well.  Unless preceded by a broken url.I've tried
setting spark.hadoop.cloneConf to true, no change.

Sample (broken) run:
15/09/14 13:00:14 DEBUG HadoopRDD: Creating new JobConf and
caching it for later re-use
15/09/14 13:00:14 DEBUG : address: ip-10-1-200-165/10.1.200.165
 isLoopbackAddress: false, with host
10.1.200.165 ip-10-1-200-165
15/09/14 13:00:14 DEBUG BlockReaderLocal:
dfs.client.use.legacy.blockreader.local = false
15/09/14 13:00:14 DEBUG BlockReaderLocal:
dfs.client.read.shortcircuit = false
15/09/14 13:00:14 DEBUG BlockReaderLocal:
dfs.client.domain.socket.data.traffic = false
15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path =
/var/run/hdfs-sockets/dn
15/09/14 13:00:14 DEBUG HAUtil: No HA service delegation token
found for logical URI hdfs://nameservice1
15/09/14 13:00:14 DEBUG BlockReaderLocal:
dfs.client.use.legacy.blockreader.local = false
15/09/14 13:00:14 DEBUG BlockReaderLocal:
dfs.client.read.shortcircuit = false
15/09/14 13:00:14 DEBUG BlockReaderLocal:
dfs.client.domain.socket.data.traffic = false
15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path =
/var/run/hdfs-sockets/dn
15/09/14 13:00:14 DEBUG RetryUtils: multipleLinearRandomRetry = null
15/09/14 13:00:14 DEBUG Server: rpcKind=RPC_PROTOCOL_BUFFER,
rpcRequestWrapperClass=class
org.apache.hadoop.ipc.ProtobufRpcEngine$RpcRequestWrapper,

rpcInvoker=org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker@6245f50b
15/09/14 13:00:14 DEBUG Client: getting client out of cache:
org.apache.hadoop.ipc.Client@267f0fd3
15/09/14 13:00:14 DEBUG NativeCodeLoader: Trying to load the
custom-built native-hadoop library...
15/09/14 13:00:14 DEBUG NativeCodeLoader: Loaded the native-hadoop
library
...
15/09/14 13:00:14 DEBUG Client: Connecting to
mesos-1.example.com/10.1.200.165:8020

15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection
to mesos-1.example.com/10.1.200.165:8020
 from ubuntu:
starting, having connections 1
15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection
to mesos-1.example.com/10.1.200.165:8020
 from ubuntu sending #0
15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection
to mesos-1.example.com/10.1.200.165:8020
 from ubuntu got
value #0
15/09/14 13:00:14 DEBUG ProtobufRpcEngine: Call: getFileInfo took 36ms
15/09/14 13:00:14 DEBUG FileInputFormat: Time taken to get
FileStatuses: 69
15/09/14 13:00:14 INFO FileInputFormat: Total input paths to
process : 1
15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection
to 

Re: why spark and kafka always crash

2015-09-15 Thread Akhil Das
Can you be more precise?

Thanks
Best Regards

On Tue, Sep 15, 2015 at 11:28 AM, Joanne Contact 
wrote:

> How to prevent it?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Change protobuf version or any other third party library version in Spark application

2015-09-15 Thread Marcelo Vanzin
Hi,

Just "spark.executor.userClassPathFirst" is not enough. You should
also set "spark.driver.userClassPathFirst". Also not that I don't
think this was really tested with the shell, but that should work with
regular apps started using spark-submit.

If that doesn't work, I'd recommend shading, as others already have.

On Tue, Sep 15, 2015 at 9:19 AM, Lan Jiang  wrote:
> I used the --conf spark.files.userClassPathFirst=true  in the spark-shell
> option, it still gave me the eror: java.lang.NoSuchFieldError: unknownFields
> if I use protobuf 3.
>
> The output says spark.files.userClassPathFirst is deprecated and suggest
> using spark.executor.userClassPathFirst. I tried that and it did not work
> either.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: A way to timeout and terminate a laggard 'Stage' ?

2015-09-15 Thread Dmitry Goldenberg
Thanks, Mark, will look into that...
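
For reference, a minimal sketch of the Futures idea Mark describes below, using the async RDD actions; the RDD and the timeout are placeholders:

import scala.concurrent.Await
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

val rdd = sc.parallelize(1 to 1000000)   // placeholder RDD; `sc` is the usual SparkContext
val action = rdd.countAsync()            // FutureAction[Long]; the job starts running
try {
  val n = Await.result(action, 30.minutes)
  println(s"count = $n")
} catch {
  case _: TimeoutException =>
    action.cancel()                      // kill the laggard job
}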

On Tue, Sep 15, 2015 at 12:33 PM, Mark Hamstra 
wrote:

> There is the Async API (
> https://github.com/clearstorydata/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala),
> which makes use of FutureAction (
> https://github.com/clearstorydata/spark/blob/master/core/src/main/scala/org/apache/spark/FutureAction.scala).
> You could also wrap up your Jobs in Futures on your own.
>
> On Mon, Sep 14, 2015 at 11:37 PM, Akhil Das 
> wrote:
>
>> As of now i think its a no. Not sure if its a naive approach, but yes you
>> can have a separate program to keep an eye in the webui (possibly parsing
>> the content) and make it trigger the kill task/job once it detects a lag.
>> (Again you will have to figure out the correct numbers before killing any
>> job)
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Sep 14, 2015 at 10:40 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Is there a way in Spark to automatically terminate laggard "stage's",
>>> ones that appear to be hanging?   In other words, is there a timeout for
>>> processing of a given RDD?
>>>
>>> In the Spark GUI, I see the "kill" function for a given Stage under
>>> 'Details for Job <...>".
>>>
>>> Is there something in Spark that would identify and kill laggards
>>> proactively?
>>>
>>> Thanks.
>>>
>>
>>
>


Re: Spark ANN

2015-09-15 Thread Ruslan Dautkhanov
Thank you Alexander.
Sounds like quite a lot of good and exciting changes slated for Spark's ANN.
Looking forward to it.



-- 
Ruslan Dautkhanov

On Wed, Sep 9, 2015 at 7:10 PM, Ulanov, Alexander 
wrote:

> Thank you, Feynman, this is helpful. The paper that I linked claims a big
> speedup with FFTs for large convolution size. Though as you mentioned there
> is no FFT transformer in Spark yet. Moreover, we would need a parallel
> version of FFTs to support batch computations. So it probably worth
> considering matrix-matrix multiplication for convolution optimization at
> least as a first version. It can also take advantage of data batches.
>
>
>
> *From:* Feynman Liang [mailto:fli...@databricks.com]
> *Sent:* Wednesday, September 09, 2015 12:56 AM
>
> *To:* Ulanov, Alexander
> *Cc:* Ruslan Dautkhanov; Nick Pentreath; user; na...@yandex.ru
> *Subject:* Re: Spark ANN
>
>
>
> My 2 cents:
>
>
>
> * There is frequency domain processing available already (e.g. spark.ml
> DCT transformer) but no FFT transformer yet because complex numbers are not
> currently a Spark SQL datatype
>
> * We shouldn't assume signals are even, so we need complex numbers to
> implement the FFT
>
> * I have not closely studied the relative performance tradeoffs, so please
> do let me know if there's a significant difference in practice
>
>
>
>
>
>
>
> On Tue, Sep 8, 2015 at 5:46 PM, Ulanov, Alexander <
> alexander.ula...@hpe.com> wrote:
>
> That is an option too. Implementing convolutions with FFTs should be
> considered as well http://arxiv.org/pdf/1312.5851.pdf.
>
>
>
> *From:* Feynman Liang [mailto:fli...@databricks.com]
> *Sent:* Tuesday, September 08, 2015 12:07 PM
> *To:* Ulanov, Alexander
> *Cc:* Ruslan Dautkhanov; Nick Pentreath; user; na...@yandex.ru
> *Subject:* Re: Spark ANN
>
>
>
> Just wondering, why do we need tensors? Is the implementation of convnets
> using im2col (see here )
> insufficient?
>
>
>
> On Tue, Sep 8, 2015 at 11:55 AM, Ulanov, Alexander <
> alexander.ula...@hpe.com> wrote:
>
> Ruslan, thanks for including me in the discussion!
>
>
>
> Dropout and other features such as Autoencoder were implemented, but not
> merged yet in order to have room for improving the internal Layer API. For
> example, there is an ongoing work with convolutional layer that
> consumes/outputs 2D arrays. We’ll probably need to change the Layer’s
> input/output type to tensors. This will influence dropout which will need
> some refactoring to handle tensors too. Also, all new components should
> have ML pipeline public interface. There is an umbrella issue for deep
> learning in Spark https://issues.apache.org/jira/browse/SPARK-5575 which
> includes various features of Autoencoder, in particular
> https://issues.apache.org/jira/browse/SPARK-10408. You are very welcome
> to join and contribute since there is a lot of work to be done.
>
>
>
> Best regards, Alexander
>
> *From:* Ruslan Dautkhanov [mailto:dautkha...@gmail.com]
> *Sent:* Monday, September 07, 2015 10:09 PM
> *To:* Feynman Liang
> *Cc:* Nick Pentreath; user; na...@yandex.ru
> *Subject:* Re: Spark ANN
>
>
>
> Found a dropout commit from avulanov:
>
>
> https://github.com/avulanov/spark/commit/3f25e26d10ef8617e46e35953fe0ad1a178be69d
>
>
>
> It probably hasn't made its way to MLLib (yet?).
>
>
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
> On Mon, Sep 7, 2015 at 8:34 PM, Feynman Liang 
> wrote:
>
> Unfortunately, not yet... Deep learning support (autoencoders, RBMs) is on
> the roadmap for 1.6 
> though, and there is a spark package
>  for
> dropout regularized logistic regression.
>
>
>
>
>
> On Mon, Sep 7, 2015 at 3:15 PM, Ruslan Dautkhanov 
> wrote:
>
> Thanks!
>
>
>
> It does not look Spark ANN yet supports dropout/dropconnect or any other
> techniques that help avoiding overfitting?
>
> http://www.cs.toronto.edu/~rsalakhu/papers/srivastava14a.pdf
>
> https://cs.nyu.edu/~wanli/dropc/dropc.pdf
>
>
>
> ps. There is a small copy-paste typo in
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala#L43
>
> should read B :)
>
>
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
> On Mon, Sep 7, 2015 at 12:47 PM, Feynman Liang 
> wrote:
>
> Backprop is used to compute the gradient here
> ,
> which is then optimized by SGD or LBFGS here
> 
>
>
>
> On Mon, Sep 7, 2015 at 11:24 AM, Nick Pentreath 
> wrote:
>
> Haven't checked the actual code but that doc says "MLPC employs
> backpropagation for learning the model. .."?
>
>
>
>
>
>
> —
> Sent from 

How to convert dataframe to a nested StructType schema

2015-09-15 Thread Hao Wang
Hi, 

I created a dataframe with 4 string columns (city, state, country, zipcode).
I then applied the following nested schema to it by creating a custom
StructType. When I run df.take(5), it gives the exception below as expected.
The question is how I can convert the Rows in the dataframe to conform to
this nested schema? Thanks! 

root 
 |-- ZipCode: struct (nullable = true) 
 ||-- zip: string (nullable = true) 
 |-- Address: struct (nullable = true) 
 ||-- city: string (nullable = true) 
 ||-- state: string (nullable = true) 
 ||-- country: string (nullable = true) 

[info]   org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in
stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class
java.lang.String) 
[info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
[info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
[info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
[info] at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
[info] at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-convert-dataframe-to-a-nested-StructType-schema-tp24703.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Managing scheduling delay in Spark Streaming

2015-09-15 Thread Michal Čizmazia
Hi,

I have a Reliable Custom Receiver storing messages into Spark. Is there a way
to prevent my receiver from storing more messages into Spark when the
Scheduling Delay reaches a certain threshold?

Possible approaches:
#1 Does Spark block on the Receiver.store(messages) call to prevent storing
more messages and overflowing the system?
#2 How can I obtain the Scheduling Delay in the Custom Receiver, so that I can
implement the feature myself?
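
(For what it's worth, a sketch of the rate-limiting settings available as of Spark 1.5; they throttle receiver ingestion rather than exposing the scheduling delay directly:)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")  // 1.5+: adapt ingestion rate to processing rate
  .set("spark.streaming.receiver.maxRate", "1000")      // hard cap, records per second per receiver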

Thanks,

Mike


DStream flatMap "swallows" records

2015-09-15 Thread Jeffrey Jedele
Hi all,
I've got a problem with Spark Streaming (both 1.3.1 and 1.5). The situation is
as follows:
There is a component which gets a DStream of URLs. For each of these URLs,
it should access the URL, retrieve several data elements, and pass those on for
further processing.

The relevant code looks like this:
...
val urls: DStream[HttpRequest] = ...

val documents = urls.flatMap { url =>
  val docs: Seq[(Label, Document)] = fetcher.retrieveContent(url)
  System.err.println("D1: " + docs.size + " " + docs.map(_._2.source.timestamp))
  docs
}

documents.count().foreachRDD { rdd => System.err.println("D2: " + rdd.collect().toList) }

// write content to kafka
documents.foreachRDD { rdd =>
  rdd.foreachPartition { rddPartition =>
    val docs = rddPartition.toList
    System.err.println("D3:" + docs.map { _._2.source.timestamp })
    val messages = docs.map { t => ("raw", t._1, t._2) }
    Kafka.getSink(zkConfig).accept(messages)
  }
}
...

I see the following output when I run this in Spark's local mode (irrelevant
parts cut; "timestamp" is a unique sequence number to track documents):
D2: List(0)
D3:List()
D1: 10 List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
D2: List(10)
D1: 10 List(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
D3:List(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
D1: 10 List(21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
D1: 10 List(31, 32, 33, 34, 35, 36, 37, 38, 39, 40)
D1: 10 List(41, 42, 43, 44, 45, 46, 47, 48, 49, 50)
D1: 10 List(51, 52, 53, 54, 55, 56, 57, 58, 59, 60)
D2: List(30)
D1: 10 List(61, 62, 63, 64, 65, 66, 67, 68, 69, 70)
D1: 10 List(71, 72, 73, 74, 75, 76, 77, 78, 79, 80)
D1: 10 List(81, 82, 83, 84, 85, 86, 87, 88, 89, 90)
D3:List(61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77,
78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90)
D1: 10 List(91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
D1: 0 List()
D2: List(0)
D1: 0 List()
D3:List()
D1: 0 List()
D2: List(0)
D3:List()

When I look at the D1 lines (inside the flatMap function), I count 10
batches of 10 documents, which is exactly as expected.
When I count the D2 lines though (after the flatMap function), they account for
only 40 documents.

A document in my case is a key-value tuple, the key objects in this case
being the same for all records.

Does anyone have an idea what might be happening to my other 60 documents?

Thank you so much in advance!

Regards,
Jeffrey


Re: Change protobuf version or any other third party library version in Spark application

2015-09-15 Thread Steve Loughran


On 15 Sep 2015, at 05:47, Lan Jiang 
> wrote:

Hi, there,

I am using Spark 1.4.1. The protobuf 2.5 is included by Spark 1.4.1 by default. 
However, I would like to use Protobuf 3 in my spark application so that I can 
use some new features such as Map support.  Is there anyway to do that?

Right now if I build a uber.jar with dependencies including protobuf 3 classes 
and pass to spark-shell through --jars option, during the execution, I got the 
error java.lang.NoSuchFieldError: unknownFields.


protobuf is an absolute nightmare version-wise, as protoc generates 
incompatible java classes even across point versions. Hadoop 2.2+ is and will 
always be protobuf 2.5 only; that applies transitively to downstream projects  
(the great protobuf upgrade of 2013 was actually pushed by the HBase team, and 
required a co-ordinated change across multiple projects)


Is there anyway to use a different version of Protobuf other than the default 
one included in the Spark distribution? I guess I can generalize and extend the 
question to any third party libraries. How to deal with version conflict for 
any third party libraries included in the Spark distribution?

maven shading is the strategy. Generally it is needed less often, though the 
troublesome binaries, across the entire apache big data stack, are:

google protobuf
google guava
kryo
jackson

you can generally bump up the other versions, at least by point releases.
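
For sbt users, a rough sketch of the equivalent relocation with the sbt-assembly plugin (0.14+); the "myshaded" prefix is arbitrary:

// build.sbt fragment: relocate protobuf classes into a private namespace so the
// application's protobuf 3 cannot clash with Spark/Hadoop's protobuf 2.5
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.protobuf.**" -> "myshaded.protobuf.@1").inAll
)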


Re: hdfs-ha on mesos - odd bug

2015-09-15 Thread Steve Loughran

> On 15 Sep 2015, at 08:55, Adrian Bridgett  wrote:
> 
> Hi Sam, in short, no, it's a traditional install as we plan to use spot 
> instances and didn't want price spikes to kill off HDFS.
> 
> We're actually doing a bit of a hybrid, using spot instances for the mesos 
> slaves, ondemand for the mesos masters.  So for the time being, putting hdfs 
> on the masters (we'll probably move to multiple slave instance types to avoid 
> losing too many when spot price spikes, but for now this is acceptable).   
> Masters running CDH5.

It's incredibly dangerous using hdfs NNs on spot vms; a significant enough 
spike will lose all of them in one go, and there goes your entire filesystem. 
Have a static VM, maybe even backed by EBS.

If you look at Hadoop architectures from Hortonworks, Cloudera and Amazon 
themselves, the usual stance is HDFS on static nodes, spot instances for 
compute only

> 
> Using hdfs://current-hdfs-master:8020 works fine, however using 
> hdfs://nameservice1 fails in the rather odd way described (well, more that 
> the workaround actually works!)  I think there's some underlying bug here 
> that's being exposed. 


this sounds an issue orthogonal to spot instances. Maybe related to how JVMs 
cache DNS entries forever?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark aggregateByKey Issues

2015-09-15 Thread biyan900116
Hi Alexis:

Of course, it's very useful to me, especially the part about the operations after the 
sort is done.

And I still have one question:
how do I set a decent number of partitions, if it need not be equal to the
number of keys?

> 在 2015年9月15日,下午3:41,Alexis Gillain  写道:
> 
> Sorry I made a typo error in my previous message, you can't 
> sortByKey(youkey,date) and have all records of your keys in the same 
> partition.
>  
> So you can sortByKey(yourkey)  with a decent number of partition (doesnt have 
> to be the number of keys). After that you have records of a key grouped in a 
> partition but not sort by date.
> 
> Then you use mapPartitions to copy the partition in a List and you can sort 
> by (youkey, date) and use this list to compute whatever you want. The main 
> issue is that a partition must fit in memory.
> 
> Hope this help.
> 
> 2015-09-15 13:50 GMT+08:00  >:
> Hi Alexis:
> 
> Thank you for your replying.
> 
> My case is that each operation to one record need to depend on one value that 
> will be set by the operating to the last record. 
> 
> So your advise is that i can use “sortByKey”. “sortByKey” will put all 
> records with the same Key in one partition. Need I take the “numPartitions” 
> parameter ? Or even if i don’t , it still do that .
> 
> If it works, add “aggregate” to deal with my case, i think the comOp function 
> in parameter list of aggregate API won’t be executed.. Is my understanding 
> wrong ? 
>   
> 
>> 在 2015年9月15日,下午12:47,Alexis Gillain > > 写道:
>> 
>> I'm not sure about what you want to do.
>> 
>> You should try to sort the RDD by (yourKey, date), it ensures that all the 
>> keys are in the same partition.
>> 
>> You problem after that is you want to aggregate only on yourKey and if you 
>> change the Key of the sorted RDD you loose partitionning.
>> 
>> Depending of the size of the result you can use an aggregate bulding a map 
>> of results by youKey or use MapPartition to output a rdd (in this case set 
>> the number of partition high enough to allow the partition to fit in memory).
>> 
>> see 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>  
>> 
>> 
>> 2015-09-15 11:25 GMT+08:00 毕岩 > >:
>> Hi:
>> 
>> 
>> 
>> There is such one case about using reduce operation like that:
>> 
>> 
>> 
>> I Need to reduce large data made up of billions of records with a Key-Value 
>> pair.
>> 
>> For the following:
>> 
>> First,group by Key, and the records with the same Key need to be in 
>> order of one field called “date” in Value
>> 
>> Second, in records with the same Key, every operation to one recored 
>> need to depend on the result of dealing with the last one, and the first one 
>> is original state..
>> 
>> 
>> 
>> Some attempts:
>> 
>> 1. groupByKey + map :  because of the bad performance, CPU is to 100%, so 
>> executors heartbeat time out and throw errors “Lost executor”, or the 
>> application is hung…
>> 
>> 
>> 
>> 2. AggregateByKey:
>> 
>> def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
>> 
>> combOp: (U, U) => U): RDD[(K, U)]
>> 
>> About aggregateByKey, is all the records with the same Key In the same 
>> partition ? Is the zeroValue applied to the first one in all records with 
>> the same Key, or in each partition ? If it is the former, comOp Function do 
>> nothing! 
>> 
>> I tried to take the second “numPartitions” parameter, pass the number of key 
>> to it. But, the number of key is so large to all the tasks be killed.
>> 
>> 
>> 
>> What should I do with this case ? 
>> 
>> I'm asking for advises online...
>> 
>> Thank you.
>> 
>> 
>> 
>> 
>> -- 
>> Alexis GILLAIN
> 
> 
> 
> 
> -- 
> Alexis GILLAIN



Relational Log Data

2015-09-15 Thread 328d95
I am trying to read logs which have many irrelevant lines and whose lines are
related by a thread number in each line.

Firstly, if I read from a text file using the textFile function and then
call multiple filter functions on that file will Spark apply all of the
filters using one read pass?

Eg will the second filter incur another read of log.txt?
val file = sc.textFile("log.txt")
val test = file.filter(some condition)
val test1 = file.filter(some other condition)

Secondly, if there are multiple reads I was thinking that I could apply a
filter that gets rid of all of the lines that I do not need and cache that
in a PairRDD. From that PairRDD I would need to remove keys that only appear
once, is there a recommended strategy for this? I was thinking about using
distinct to create another PairRDD and then using subtract, but this seems
inefficient.

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Relational-Log-Data-tp24696.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Relational Log Data

2015-09-15 Thread ayan guha
Spark transformations are lazy, so none of them actually do anything until an
action is encountered, and your code as written will not read the file at all. Each
action you later run on test and test1 will, however, trigger its own read of log.txt
unless you cache the shared file RDD first.
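
For the second part, a rough sketch of keeping only lines whose thread number occurs more
than once, without the distinct-plus-subtract round trip. The relevance filter and the
assumption that the thread number is the first whitespace-separated token are placeholders,
not from your post:

val pairs = sc.textFile("log.txt")
  .filter(line => line.contains("INTERESTING"))   // placeholder relevance filter
  .map(line => (line.split("\\s+")(0), line))     // (threadNumber, line)
  .cache()

// Count occurrences per thread number and keep the ones seen more than once.
val repeated = pairs.mapValues(_ => 1L).reduceByKey(_ + _).filter(_._2 > 1)

// Join back so only lines from repeated thread numbers survive.
val related = pairs.join(repeated).mapValues { case (line, _) => line }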

On Tue, Sep 15, 2015 at 7:33 PM, 328d95 <20500...@student.uwa.edu.au> wrote:

> I am trying to read logs which have many irrelevant lines and whose lines
> are
> related by a thread number in each line.
>
> Firstly, if I read from a text file using the textFile function and then
> call multiple filter functions on that file will Spark apply all of the
> filters using one read pass?
>
> Eg will the second filter incur another read of log.txt?
> val file = sc.textFile("log.txt")
> val test = file.filter(some condition)
> val test1 = file.filter(some other condition)
>
> Secondly, if there are multiple reads I was thinking that I could apply a
> filter that gets rid of all of the lines that I do not need and cache that
> in a PairRDD. From that PairRDD I would need to remove keys that only
> appear
> once, is there a recommended strategy for this? I was thinking about using
> distinct to create another PairRDD and then using subtract, but this seems
> inefficient.
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Relational-Log-Data-tp24696.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Caching intermediate results in Spark ML pipeline?

2015-09-15 Thread Feynman Liang
Nope, and that's intentional. There is no guarantee that rawData did not
change between intermediate calls to searchRun, so reusing a cached data1
would be incorrect.

If you want data1 to be cached between multiple runs, you have a few
options:
* cache it first and pass it in as an argument to searchRun
* use a creational pattern like singleton to ensure only one instantiation
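
A minimal Scala sketch of the first option (the thread's code is Python, but the pattern is
the same): materialize the expensive intermediate once and hand it to every search run.
The column names and the integer-valued param2 grid are illustrative only.

import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.sql.DataFrame

def searchRuns(data1: DataFrame, param2Grid: Seq[Int]): Unit = {
  data1.cache()                          // cached once, reused by every run below
  for (p <- param2Grid) {
    val hashing2 = new HashingTF()
      .setInputCol("words")
      .setOutputCol("features")
      .setNumFeatures(p)
    hashing2.transform(data1).count()    // stand-in for data2.someAction()
  }
}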

On Tue, Sep 15, 2015 at 12:49 AM, Jingchu Liu  wrote:

> Hey Feynman,
>
> I doubt DF persistence will work in my case. Let's use the following code:
> ==
> def searchRun( params = [param1, param2] )
>   data1 = hashing1.transform(rawData, param1)
>   data1.cache()
>   data2 = hashing2.transform(data1, param2)
>   data2.someAction()
> ==
> Say if we run "searchRun()" for 2 times with the same "param1" but
> different "param2". Will spark recognize that the two local variables
> "data1" in consecutive runs has the same content?
>
>
> Best,
> Lewis
>
> 2015-09-15 13:58 GMT+08:00 Feynman Liang :
>
>> You can persist the transformed Dataframes, for example
>>
>> val data : DF = ...
>> val hashedData = hashingTF.transform(data)
>> hashedData.cache() // to cache DataFrame in memory
>>
>> Future usage of hashedData read from an in-memory cache now.
>>
>> You can also persist to disk, eg:
>>
>> hashedData.write.parquet(FilePath) // to write DataFrame in Parquet
>> format to disk
>> ...
>> val savedHashedData = sqlContext.read.parquet(FilePath)
>>
>> Future uses of hash
>>
>> Like my earlier response, this will still require you call each
>> PipelineStage's `transform` method (i.e. to NOT use the overall
>> Pipeline.setStages API)
>>
>> On Mon, Sep 14, 2015 at 10:45 PM, Jingchu Liu 
>> wrote:
>>
>>> Hey Feynman,
>>>
>>> Thanks for your response, but I'm afraid "model save/load" is not
>>> exactly the feature I'm looking for.
>>>
>>> What I need to cache and reuse are the intermediate outputs of
>>> transformations, not transformer themselves. Do you know any related dev.
>>> activities or plans?
>>>
>>> Best,
>>> Lewis
>>>
>>> 2015-09-15 13:03 GMT+08:00 Feynman Liang :
>>>
 Lewis,

 Many pipeline stages implement save/load methods, which can be used if
 you instantiate and call the underlying pipeline stages `transform` methods
 individually (instead of using the Pipeline.setStages API). See associated
 JIRAs .

 Pipeline persistence is on the 1.6 roadmap, JIRA here
 .

 Feynman

 On Mon, Sep 14, 2015 at 9:20 PM, Jingchu Liu 
 wrote:

> Hi all,
>
> I have a question regarding the ability of ML pipeline to cache
> intermediate results. I've posted this question on stackoverflow
> 
> but got no answer, hope someone here can help me out.
>
> ===
> Lately I'm planning to migrate my standalone python ML code to spark.
> The ML pipeline in spark.ml turns out quite handy, with streamlined
> API for chaining up algorithm stages and hyper-parameter grid search.
>
> Still, I found its support for one important feature obscure in
> existing documents: caching of intermediate results. The importance of 
> this
> feature arise when the pipeline involves computation intensive stages.
>
> For example, in my case I use a huge sparse matrix to perform multiple
> moving averages on time series data in order to form input features. The
> structure of the matrix is determined by some hyper-parameter. This step
> turns out to be a bottleneck for the entire pipeline because I have to
> construct the matrix in runtime.
>
> During parameter search, I usually have other parameters to examine in
> addition to this "structure parameter". So if I can reuse the huge matrix
> when the "structure parameter" is unchanged, I can save tons of time. For
> this reason, I intentionally formed my code to cache and reuse these
> intermediate results.
>
> So my question is: can Spark's ML pipeline handle intermediate caching
> automatically? Or do I have to manually form code to do so? If so, is 
> there
> any best practice to learn from?
>
> P.S. I have looked into the official document and some other material,
> but none of them seems to discuss this topic.
>
>
>
> Best,
> Lewis
>


>>>
>>
>


Re: Change protobuf version or any other third party library version in Spark application

2015-09-15 Thread Lan Jiang
I used --conf spark.files.userClassPathFirst=true as a spark-shell option; it still gave 
me the error java.lang.NoSuchFieldError: unknownFields when I use protobuf 3.

The output says spark.files.userClassPathFirst is deprecated and suggests using 
spark.executor.userClassPathFirst. I tried that and it did not work either. 

Lan



> On Sep 15, 2015, at 10:31 AM, java8964  wrote:
> 
> If you use Standalone mode, just start spark-shell like following:
> 
> spark-shell --jars your_uber_jar --conf spark.files.userClassPathFirst=true 
> 
> Yong
> 
> Date: Tue, 15 Sep 2015 09:33:40 -0500
> Subject: Re: Change protobuf version or any other third party library version 
> in Spark application
> From: ljia...@gmail.com
> To: java8...@hotmail.com
> CC: ste...@hortonworks.com; user@spark.apache.org
> 
> Steve,
> 
> Thanks for the input. You are absolutely right. When I use protobuf 2.6.1, I 
> also ran into method not defined errors. You suggest using Maven sharding 
> strategy, but I have already built the uber jar to package all my custom 
> classes and its dependencies including protobuf 3. The problem is how to 
> configure spark shell to use my uber jar first. 
> 
> java8964 -- appreciate the link and I will try the configuration. Looks 
> promising. However, the "user classpath first" attribute does not apply to 
> spark-shell, am I correct? 
> 
> Lan
> 
> On Tue, Sep 15, 2015 at 8:24 AM, java8964  > wrote:
> It is a bad idea to use the major version change of protobuf, as it most 
> likely won't work.
> 
> But you really want to give it a try, set the "user classpath first", so the 
> protobuf 3 coming with your jar will be used.
> 
> The setting depends on your deployment mode, check this for the parameter:
> 
> https://issues.apache.org/jira/browse/SPARK-2996 
> 
> 
> Yong
> 
> Subject: Re: Change protobuf version or any other third party library version 
> in Spark application
> From: ste...@hortonworks.com 
> To: ljia...@gmail.com 
> CC: user@spark.apache.org 
> Date: Tue, 15 Sep 2015 09:19:28 +
> 
> 
> 
> 
> On 15 Sep 2015, at 05:47, Lan Jiang  > wrote:
> 
> Hi, there,
> 
> I am using Spark 1.4.1. The protobuf 2.5 is included by Spark 1.4.1 by 
> default. However, I would like to use Protobuf 3 in my spark application so 
> that I can use some new features such as Map support.  Is there anyway to do 
> that? 
> 
> Right now if I build a uber.jar with dependencies including protobuf 3 
> classes and pass to spark-shell through --jars option, during the execution, 
> I got the error java.lang.NoSuchFieldError: unknownFields. 
> 
> 
> protobuf is an absolute nightmare version-wise, as protoc generates 
> incompatible java classes even across point versions. Hadoop 2.2+ is and will 
> always be protobuf 2.5 only; that applies transitively to downstream projects 
>  (the great protobuf upgrade of 2013 was actually pushed by the HBase team, 
> and required a co-ordinated change across multiple projects)
> 
> 
> Is there anyway to use a different version of Protobuf other than the default 
> one included in the Spark distribution? I guess I can generalize and extend 
> the question to any third party libraries. How to deal with version conflict 
> for any third party libraries included in the Spark distribution? 
> 
> maven shading is the strategy. Generally it is less needed, though the 
> troublesome binaries are,  across the entire apache big data stack:
> 
> google protobuf
> google guava
> kryo
> jackson
> 
> you can generally bump up the other versions, at least by point releases.



Re: A way to timeout and terminate a laggard 'Stage' ?

2015-09-15 Thread Mark Hamstra
There is the Async API (
https://github.com/clearstorydata/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala),
which makes use of FutureAction (
https://github.com/clearstorydata/spark/blob/master/core/src/main/scala/org/apache/spark/FutureAction.scala).
You could also wrap up your Jobs in Futures on your own.
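
A rough sketch of that second suggestion, cancelling a job that blows its time budget
(the rdd variable and the ten-minute budget are placeholders):

import scala.concurrent.Await
import scala.concurrent.duration._

val job = rdd.countAsync()      // FutureAction: a cancellable scala.concurrent.Future
try {
  Await.result(job, 10.minutes)
} catch {
  case _: java.util.concurrent.TimeoutException =>
    job.cancel()                // cancels the underlying Spark job
}

Note this bounds a whole job rather than a single stage, which is usually the practical
granularity anyway.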

On Mon, Sep 14, 2015 at 11:37 PM, Akhil Das 
wrote:

> As of now i think its a no. Not sure if its a naive approach, but yes you
> can have a separate program to keep an eye in the webui (possibly parsing
> the content) and make it trigger the kill task/job once it detects a lag.
> (Again you will have to figure out the correct numbers before killing any
> job)
>
> Thanks
> Best Regards
>
> On Mon, Sep 14, 2015 at 10:40 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Is there a way in Spark to automatically terminate laggard "stage's",
>> ones that appear to be hanging?   In other words, is there a timeout for
>> processing of a given RDD?
>>
>> In the Spark GUI, I see the "kill" function for a given Stage under
>> 'Details for Job <...>".
>>
>> Is there something in Spark that would identify and kill laggards
>> proactively?
>>
>> Thanks.
>>
>
>


Re: Change protobuf version or any other third party library version in Spark application

2015-09-15 Thread Guru Medasani
Hi Lan,

Reading the pull request below, it looks like you should be able to apply the config to 
both the driver and the executors. I would give it a try with spark-shell in YARN 
client mode.

https://github.com/apache/spark/pull/3233 


Yarn's config option spark.yarn.user.classpath.first does not work the same way 
as
spark.files.userClassPathFirst; Yarn's version is a lot more dangerous, in that 
it
modifies the system classpath, instead of restricting the changes to the user's 
class
loader. So this change implements the behavior of the latter for Yarn, and 
deprecates
the more dangerous choice.

To be able to achieve feature-parity, I also implemented the option for drivers 
(the existing
option only applies to executors). So now there are two options, each 
controlling whether
to apply userClassPathFirst to the driver or executors. The old option was 
deprecated, and
aliased to the new one (spark.executor.userClassPathFirst).

The existing "child-first" class loader also had to be fixed. It didn't handle 
resources, and it
was also doing some things that ended up causing JVM errors depending on how 
things
were being called.


Guru Medasani
gdm...@gmail.com



> On Sep 15, 2015, at 9:33 AM, Lan Jiang  wrote:
> 
> Steve,
> 
> Thanks for the input. You are absolutely right. When I use protobuf 2.6.1, I 
> also ran into method not defined errors. You suggest using Maven sharding 
> strategy, but I have already built the uber jar to package all my custom 
> classes and its dependencies including protobuf 3. The problem is how to 
> configure spark shell to use my uber jar first. 
> 
> java8964 -- appreciate the link and I will try the configuration. Looks 
> promising. However, the "user classpath first" attribute does not apply to 
> spark-shell, am I correct? 
> 
> Lan
> 
> On Tue, Sep 15, 2015 at 8:24 AM, java8964  > wrote:
> It is a bad idea to use the major version change of protobuf, as it most 
> likely won't work.
> 
> But you really want to give it a try, set the "user classpath first", so the 
> protobuf 3 coming with your jar will be used.
> 
> The setting depends on your deployment mode, check this for the parameter:
> 
> https://issues.apache.org/jira/browse/SPARK-2996 
> 
> 
> Yong
> 
> Subject: Re: Change protobuf version or any other third party library version 
> in Spark application
> From: ste...@hortonworks.com 
> To: ljia...@gmail.com 
> CC: user@spark.apache.org 
> Date: Tue, 15 Sep 2015 09:19:28 +
> 
> 
> 
> 
> On 15 Sep 2015, at 05:47, Lan Jiang  > wrote:
> 
> Hi, there,
> 
> I am using Spark 1.4.1. The protobuf 2.5 is included by Spark 1.4.1 by 
> default. However, I would like to use Protobuf 3 in my spark application so 
> that I can use some new features such as Map support.  Is there anyway to do 
> that? 
> 
> Right now if I build a uber.jar with dependencies including protobuf 3 
> classes and pass to spark-shell through --jars option, during the execution, 
> I got the error java.lang.NoSuchFieldError: unknownFields. 
> 
> 
> protobuf is an absolute nightmare version-wise, as protoc generates 
> incompatible java classes even across point versions. Hadoop 2.2+ is and will 
> always be protobuf 2.5 only; that applies transitively to downstream projects 
>  (the great protobuf upgrade of 2013 was actually pushed by the HBase team, 
> and required a co-ordinated change across multiple projects)
> 
> 
> Is there anyway to use a different version of Protobuf other than the default 
> one included in the Spark distribution? I guess I can generalize and extend 
> the question to any third party libraries. How to deal with version conflict 
> for any third party libraries included in the Spark distribution? 
> 
> maven shading is the strategy. Generally it is less needed, though the 
> troublesome binaries are,  across the entire apache big data stack:
> 
> google protobuf
> google guava
> kryo
> jackson
> 
> you can generally bump up the other versions, at least by point releases.
> 



Getting parent RDD

2015-09-15 Thread Samya
Hi Team

I have the below situation.

val ssc = 
val msgStream = .   //SparkKafkaDirectAPI
val wordCountPair = TransformStream.transform(msgStream)
wordCountPair.foreachRDD(rdd =>
  try {
    // Some action that causes exception
  } catch {
    case ex1: Exception => {
      // How to get hold of the msgStream, so that I can log the
      // actual message that caused the exception.
    }
  }
)


Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-parent-RDD-tp24701.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to speed up MLlib LDA?

2015-09-15 Thread Feynman Liang
Hi Marko,

I haven't looked into your case in much detail but one immediate thought
is: have you tried the OnlineLDAOptimizer? Its implementation and the
resulting LDA model (LocalLDAModel) are quite different (it doesn't depend on
GraphX and assumes the model fits on a single machine), so you may see
performance differences.
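
For concreteness, a minimal sketch of that switch on the MLlib side; k and the
mini-batch fraction are placeholder values, not tuned recommendations:

import org.apache.spark.mllib.clustering.{LDA, LDAModel, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

def trainOnline(corpus: RDD[(Long, Vector)], k: Int): LDAModel =
  new LDA()
    .setK(k)
    .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(0.05))
    .run(corpus)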

Feynman

On Tue, Sep 15, 2015 at 6:37 AM, Marko Asplund 
wrote:

>
> While doing some more testing I noticed that loading the persisted model
> from disk (~2 minutes) as well as querying LDA model topic distributions
> (~4 seconds for one document) are quite slow operations.
>
> Our application is querying LDA model topic distribution (for one doc at a
> time) as part of end-user operation execution flow, so a ~4 second
> execution time is very problematic. Am I using the MLlib LDA API correctly
> or is this just reflecting the current performance characteristics of the
> LDA implementation? My code can be found here:
>
>
> https://github.com/marko-asplund/tech-protos/blob/master/mllib-lda/src/main/scala/fi/markoa/proto/mllib/LDADemo.scala#L56-L57
>
> For what kinds of use cases are people currently using the LDA
> implementation?
>
>
> marko
>


Re: application failed on large dataset

2015-09-15 Thread 周千昊
has anyone met the same problems?
周千昊 于2015年9月14日周一 下午9:07写道:

> Hi, community
>   I am facing a strange problem:
>   all executors does not respond, and then all of them failed with the
> ExecutorLostFailure.
>   when I look into yarn logs, there are full of such exception
>
> 15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 1 outstanding blocks (after 3 retries)
> java.io.IOException: Failed to connect to host/ip:port
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: host/ip:port
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> ... 1 more
>
>
>   The strange thing is that, if I reduce the input size, the problems
> just disappeared. I have found a similar issue in the mail-archive(
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAOHP_tHRtuxDfWF0qmYDauPDhZ1=MAm5thdTfgAhXDN=7kq...@mail.gmail.com%3E),
> however I didn't see the solution. So I am wondering if anyone could help
> with that?
>
>   My env is:
>   hdp 2.2.6
>   spark(1.4.1)
>   mode: yarn-client
>   spark-conf:
>   spark.driver.extraJavaOptions -Dhdp.version=2.2.6.0-2800
>   spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.6.0-2800
>   spark.executor.memory 6g
>   spark.storage.memoryFraction 0.3
>   spark.dynamicAllocation.enabled true
>   spark.shuffle.service.enabled true
>
>


RE: Change protobuf version or any other third party library version in Spark application

2015-09-15 Thread java8964
If you use Standalone mode, just start spark-shell like the following:
spark-shell --jars your_uber_jar --conf spark.files.userClassPathFirst=true 
Yong
Date: Tue, 15 Sep 2015 09:33:40 -0500
Subject: Re: Change protobuf version or any other third party library version 
in Spark application
From: ljia...@gmail.com
To: java8...@hotmail.com
CC: ste...@hortonworks.com; user@spark.apache.org

Steve,
Thanks for the input. You are absolutely right. When I use protobuf 2.6.1, I 
also ran into method not defined errors. You suggest using Maven sharding 
strategy, but I have already built the uber jar to package all my custom 
classes and its dependencies including protobuf 3. The problem is how to 
configure spark shell to use my uber jar first. 
java8964 -- appreciate the link and I will try the configuration. Looks 
promising. However, the "user classpath first" attribute does not apply to 
spark-shell, am I correct? 

Lan
On Tue, Sep 15, 2015 at 8:24 AM, java8964  wrote:



It is a bad idea to use the major version change of protobuf, as it most likely 
won't work.
But you really want to give it a try, set the "user classpath first", so the 
protobuf 3 coming with your jar will be used.
The setting depends on your deployment mode, check this for the parameter:
https://issues.apache.org/jira/browse/SPARK-2996
Yong

Subject: Re: Change protobuf version or any other third party library version 
in Spark application
From: ste...@hortonworks.com
To: ljia...@gmail.com
CC: user@spark.apache.org
Date: Tue, 15 Sep 2015 09:19:28 +













On 15 Sep 2015, at 05:47, Lan Jiang  wrote:



Hi, there,



I am using Spark 1.4.1. The protobuf 2.5 is included by Spark 1.4.1 by default. 
However, I would like to use Protobuf 3 in my spark application so that I can 
use some new features such as Map support.  Is there anyway to do that? 



Right now if I build a uber.jar with dependencies including protobuf 3 classes 
and pass to spark-shell through --jars option, during the execution, I got the 
error java.lang.NoSuchFieldError: unknownFields. 









protobuf is an absolute nightmare version-wise, as protoc generates 
incompatible java classes even across point versions. Hadoop 2.2+ is and will 
always be protobuf 2.5 only; that applies transitively to downstream projects  
(the great protobuf upgrade
 of 2013 was actually pushed by the HBase team, and required a co-ordinated 
change across multiple projects)








Is there anyway to use a different version of Protobuf other than the default 
one included in the Spark distribution? I guess I can generalize and extend the 
question to any third party libraries. How to deal with version conflict for 
any third
 party libraries included in the Spark distribution? 







maven shading is the strategy. Generally it is less needed, though the 
troublesome binaries are,  across the entire apache big data stack:


google protobuf
google guava
kryo

jackson



you can generally bump up the other versions, at least by point releases.   
  

  

Re: hdfs-ha on mesos - odd bug

2015-09-15 Thread Iulian Dragoș
I've seen similar traces, but couldn't track down the failure completely.
You are using Kerberos for your HDFS cluster, right? AFAIK Kerberos isn't
supported in Mesos deployments.

Can you resolve that host name (nameservice1) from the driver machine (ping
nameservice1)? Can it be resolved from the other machines in the cluster?

Does it help if you read using `newAPIHadoopFile` instead of `textFile`?
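
For reference, a sketch of that last suggestion against the same path, read through the
new Hadoop input-format API instead of sc.textFile:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val t = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs://nameservice1/tmp/issue")
  .map(_._2.toString)
t.count()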

On Mon, Sep 14, 2015 at 3:55 PM, Adrian Bridgett 
wrote:

> I'm hitting an odd issue with running spark on mesos together with
> HA-HDFS, with an even odder workaround.
>
> In particular I get an error that it can't find the HDFS nameservice
> unless I put in a _broken_ url (discovered that workaround by mistake!).
> core-site.xml, hdfs-site.xml is distributed to the slave node - and that
> file is read since I deliberately break the file then I get an error as
> you'd expect.
>
> NB: This is a bit different to
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201402.mbox/%3c1392442185079-1549.p...@n3.nabble.com%3E
>
>
> Spark 1.5.0:
>
> t=sc.textFile("hdfs://nameservice1/tmp/issue")
> t.count()
> (fails)
>
> t=sc.textFile("file://etc/passwd")
> t.count()
> (errors about bad url - should have an extra / of course)
> t=sc.textFile("hdfs://nameservice1/tmp/issue")
> t.count()
> then it works!!!
>
> I should say that using file:///etc/passwd or hdfs:///tmp/issue both fail
> as well.  Unless preceded by a broken url.I've tried setting
> spark.hadoop.cloneConf to true, no change.
>
> Sample (broken) run:
> 15/09/14 13:00:14 DEBUG HadoopRDD: Creating new JobConf and caching it for
> later re-use
> 15/09/14 13:00:14 DEBUG : address: ip-10-1-200-165/10.1.200.165
> isLoopbackAddress: false, with host 10.1.200.165 ip-10-1-200-165
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.use.legacy.blockreader.local = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.client.read.shortcircuit =
> false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.domain.socket.data.traffic = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path =
> /var/run/hdfs-sockets/dn
> 15/09/14 13:00:14 DEBUG HAUtil: No HA service delegation token found for
> logical URI hdfs://nameservice1
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.use.legacy.blockreader.local = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.client.read.shortcircuit =
> false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.domain.socket.data.traffic = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path =
> /var/run/hdfs-sockets/dn
> 15/09/14 13:00:14 DEBUG RetryUtils: multipleLinearRandomRetry = null
> 15/09/14 13:00:14 DEBUG Server: rpcKind=RPC_PROTOCOL_BUFFER,
> rpcRequestWrapperClass=class
> org.apache.hadoop.ipc.ProtobufRpcEngine$RpcRequestWrapper,
> rpcInvoker=org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker@6245f50b
> 15/09/14 13:00:14 DEBUG Client: getting client out of cache:
> org.apache.hadoop.ipc.Client@267f0fd3
> 15/09/14 13:00:14 DEBUG NativeCodeLoader: Trying to load the custom-built
> native-hadoop library...
> 15/09/14 13:00:14 DEBUG NativeCodeLoader: Loaded the native-hadoop library
> ...
> 15/09/14 13:00:14 DEBUG Client: Connecting to
> mesos-1.example.com/10.1.200.165:8020
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu: starting, having
> connections 1
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu sending #0
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu got value #0
> 15/09/14 13:00:14 DEBUG ProtobufRpcEngine: Call: getFileInfo took 36ms
> 15/09/14 13:00:14 DEBUG FileInputFormat: Time taken to get FileStatuses: 69
> 15/09/14 13:00:14 INFO FileInputFormat: Total input paths to process : 1
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu sending #1
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu got value #1
> 15/09/14 13:00:14 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 1ms
> 15/09/14 13:00:14 DEBUG FileInputFormat: Total # of splits generated by
> getSplits: 2, TimeTaken: 104
> ...
> 15/09/14 13:00:24 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu: closed
> 15/09/14 13:00:24 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu: stopped, remaining
> connections 0
> 15/09/14 13:00:24 DEBUG
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
> message
> AkkaMessage(ExecutorRemoved(20150826-133446-3217621258-5050-4064-S1),true)
> from Actor[akka://sparkDriver/temp/$g]
> 15/09/14 13:00:24 DEBUG
> 

Re: Random Forest MLlib

2015-09-15 Thread Yasemin Kaya
Hi Maximo,

Is there a way of getting precision and recall from the pipeline? In the MLlib version
I get precision and recall metrics from MulticlassMetrics, but the ML pipeline only
gives me testErr.

Thanks
yasemin

2015-09-10 17:47 GMT+03:00 Yasemin Kaya :

> Hi Maximo,
> Thanks alot..
> Hi Yasemin,
>We had the same question and found this:
>
> https://issues.apache.org/jira/browse/SPARK-6884
>
> Thanks,
>Maximo
>
> On Sep 10, 2015, at 9:09 AM, Yasemin Kaya  wrote:
>
> Hi ,
>
> I am using Random Forest Alg. for recommendation system. I get users and
> users' response yes or no (1/0). But I want to learn the probability of the
> trees. Program says x user yes but with how much probability, I want to get
> these probabilities.
>
> Best,
> yasemin
> --
> hiç ender hiç
>
>
>


-- 
hiç ender hiç


How is driver memory utilized

2015-09-15 Thread Renu
Hi

I have a query regarding driver memory

what are the tasks in which driver memory is used?

Please Help



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-driver-memory-utilized-tp24699.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Directly reading data from S3 to EC2 with PySpark

2015-09-15 Thread Cazen
Good day junHyeok

Did you set HADOOP_CONF_DIR? It seems that Spark cannot find the AWS key
properties.

If it doesn't work after setting that, how about exporting AWS_ACCESS_KEY_ID and
AWS_SECRET_ACCESS_KEY before running the PySpark shell?
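
Another option is to set the s3n credentials directly on the Hadoop configuration (Scala
shown as a sketch; the same property names can be set from PySpark, commonly through
sc._jsc.hadoopConfiguration(). The bucket path below is a placeholder):

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
val lines = sc.textFile("s3n://some-bucket/some-prefix/")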

BR



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Directly-reading-data-from-S3-to-EC2-with-PySpark-tp24638p24698.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark aggregateByKey Issues

2015-09-15 Thread Alexis Gillain
That's the tricky part.

If the volume of data is always the same, you can find a good value by testing.
If the volume of data can vary, you can use the number of records in your
file divided by the number of records you think can fit in memory.
Either way, the distribution of your records can still impact the result.

About aggregate: indeed, the combOp won't be executed if you sort the records.
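
Putting the earlier suggestion together, a rough sketch; the key and value types and the
per-key fold are placeholders (records keyed by an id, carrying a (date, value) payload):

import org.apache.spark.rdd.RDD

def perKeyInDateOrder(records: RDD[(String, (Long, Double))],
                      numPartitions: Int): RDD[(String, Double)] =
  records
    .sortByKey(true, numPartitions)                  // same key lands in one partition
    .mapPartitions { it =>
      it.toVector                                    // the whole partition must fit in memory
        .groupBy(_._1)                               // group this partition's records by key
        .iterator
        .map { case (key, rows) =>
          val inOrder = rows.map(_._2).sortBy(_._1)  // one key's records, sorted by date
          // placeholder for the sequential, state-carrying computation per key
          key -> inOrder.foldLeft(0.0) { case (state, (_, value)) => state + value }
        }
    }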

2015-09-15 16:34 GMT+08:00 :

> Hi Alexis:
>
> Of course, it’s very useful to me, specially about the operations after
> sort operation is done.
>
> And, i still have one question:
> How to set the decent number of partition,  if it need not to be equal to
> the number of keys ?
>
> 在 2015年9月15日,下午3:41,Alexis Gillain  写道:
>
> Sorry I made a typo error in my previous message, you can't
> sortByKey(youkey,date) and have all records of your keys in the same
> partition.
>
> So you can sortByKey(yourkey)  with a decent number of partition (doesnt
> have to be the number of keys). After that you have records of a key
> grouped in a partition but not sort by date.
>
> Then you use mapPartitions to copy the partition in a List and you can
> sort by (youkey, date) and use this list to compute whatever you want. The
> main issue is that a partition must fit in memory.
>
> Hope this help.
>
> 2015-09-15 13:50 GMT+08:00 :
>
>> Hi Alexis:
>>
>> Thank you for your replying.
>>
>> My case is that each operation to one record need to depend on one value
>> that will be set by the operating to the last record.
>>
>> So your advise is that i can use “sortByKey”. “sortByKey” will put all
>> records with the same Key in one partition. Need I take the “numPartitions”
>> parameter ? Or even if i don’t , it still do that .
>>
>> If it works, add “aggregate” to deal with my case, i think the comOp
>> function in parameter list of aggregate API won’t be executed.. Is my
>> understanding wrong ?
>>
>>
>> 在 2015年9月15日,下午12:47,Alexis Gillain  写道:
>>
>> I'm not sure about what you want to do.
>>
>> You should try to sort the RDD by (yourKey, date), it ensures that all
>> the keys are in the same partition.
>>
>> You problem after that is you want to aggregate only on yourKey and if
>> you change the Key of the sorted RDD you loose partitionning.
>>
>> Depending of the size of the result you can use an aggregate bulding a
>> map of results by youKey or use MapPartition to output a rdd (in this case
>> set the number of partition high enough to allow the partition to fit in
>> memory).
>>
>> see
>> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>>
>> 2015-09-15 11:25 GMT+08:00 毕岩 :
>>
>>> Hi:
>>>
>>>
>>> There is such one case about using reduce operation like that:
>>>
>>>
>>> I Need to reduce large data made up of billions of records with a
>>> Key-Value pair.
>>>
>>> For the following:
>>>
>>> *First,group by Key, and the records with the same Key need to be
>>> in order of one field called “date” in Value*
>>>
>>> *Second, in records with the same Key, every operation to one
>>> recored need to depend on the result of dealing with the last one, and the
>>> first one is original state..*
>>>
>>>
>>> Some attempts:
>>>
>>> 1. groupByKey + map :  because of the bad performance, CPU is to 100%,
>>> so executors heartbeat time out and throw errors “Lost executor”, or the
>>> application is hung…
>>>
>>>
>>> 2. AggregateByKey:
>>>
>>> * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,*
>>>
>>> *combOp: (U, U) => U): RDD[(K, U)]*
>>>
>>> About aggregateByKey, is all the records with the same Key In the same
>>> partition ? Is the zeroValue applied to the first one in all records
>>> with the same Key, or in each partition ? If it is the former, comOp
>>> Function do nothing!
>>>
>>> I tried to take the second “numPartitions” parameter, pass the number of
>>> key to it. But, the number of key is so large to all the tasks be killed.
>>>
>>>
>>> What should I do with this case ?
>>>
>>> I'm asking for advises online...
>>>
>>> Thank you.
>>>
>>
>>
>>
>> --
>> Alexis GILLAIN
>>
>>
>>
>
>
> --
> Alexis GILLAIN
>
>
>


-- 
Alexis GILLAIN


Re: mappartition's FlatMapFunction help

2015-09-15 Thread Ankur Srivastava
Hi,

The signatures are perfect. I also tried the same code in Eclipse, and for some
reason Eclipse did not import java.util.Iterator. Once I imported it, it was
fine. It might be the same issue with NetBeans.

Thanks
Ankur

On Tue, Sep 15, 2015 at 10:11 AM, dinizthiagobr 
wrote:

> Can't get this one to work and I have no idea why.
>
> JavaPairRDD<String, Iterable<String>> lel = gen.groupByKey();
>
> JavaRDD<String> partitions = lel.mapPartitions(
> new FlatMapFunction<Iterator<Tuple2<String, Iterable<String>>>,
> String> () {
>   public Iterable<String> call(Iterator<Tuple2<String, Iterable<String>>> it) {
>//return whatever
>}
> });
>
> Netbeans complains about mapPartitions not being applicable for the used
> arguments.
>
> Any idea what's wrong?
>
> Thank you.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/mappartition-s-FlatMapFunction-help-tp24702.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


mappartition's FlatMapFunction help

2015-09-15 Thread dinizthiagobr
Can't get this one to work and I have no idea why.

JavaPairRDD<String, Iterable<String>> lel = gen.groupByKey();

JavaRDD<String> partitions = lel.mapPartitions(
new FlatMapFunction<Iterator<Tuple2<String, Iterable<String>>>,
String> () {
  public Iterable<String> call(Iterator<Tuple2<String, Iterable<String>>> it) {
   //return whatever
   }
});

Netbeans complains about mapPartitions not being applicable for the used
arguments.

Any idea what's wrong?

Thank you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mappartition-s-FlatMapFunction-help-tp24702.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: application failed on large dataset

2015-09-15 Thread java8964
When you saw this error, does any executor die due to whatever error?
Do you check to see if any executor restarts during your job?
It is hard to help you just with the stack trace. You need to tell us the whole 
picture when your jobs are running.
Yong

From: qhz...@apache.org
Date: Tue, 15 Sep 2015 15:02:28 +
Subject: Re: application failed on large dataset
To: user@spark.apache.org

has anyone met the same problems?
周千昊 于2015年9月14日周一 下午9:07写道:
Hi, community
  I am facing a strange problem:
  all executors does not respond, and then all of them failed with the ExecutorLostFailure.
  when I look into yarn logs, there are full of such exception

15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks (after 3 retries)
java.io.IOException: Failed to connect to host/ip:port
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: host/ip:port
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more

  The strange thing is that, if I reduce the input size, the problems just disappeared. I have 
found a similar issue in the mail-archive 
(http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAOHP_tHRtuxDfWF0qmYDauPDhZ1=MAm5thdTfgAhXDN=7kq...@mail.gmail.com%3E), 
however I didn't see the solution. So I am wondering if anyone could help with that?

  My env is:
  hdp 2.2.6
  spark(1.4.1)
  mode: yarn-client
  spark-conf:
  spark.driver.extraJavaOptions -Dhdp.version=2.2.6.0-2800
  spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.6.0-2800
  spark.executor.memory 6g
  spark.storage.memoryFraction 0.3
  spark.dynamicAllocation.enabled true
  spark.shuffle.service.enabled true

Re: Dynamic Workflow Execution using Spark

2015-09-15 Thread Ted Yu
See this thread:
http://search-hadoop.com/m/q3RTtUz0cyiPjYX

On Tue, Sep 15, 2015 at 1:19 PM, Ashish Soni  wrote:

> Hi All ,
>
> Are there any framework which can be used to execute workflows with in
> spark or Is it possible to use ML Pipeline for workflow execution but not
> doing ML .
>
> Thanks,
> Ashish
>


Re: Replacing Esper with Spark Streaming?

2015-09-15 Thread Bertrand Dechoux
The big question would be what feature of Esper your are using. Esper is a
CEP solution. I doubt that Spark Streaming can do everything Esper does
without any development. Spark (Streaming) is more a general-purpose
platform.

http://www.espertech.com/products/esper.php

But I would be glad to be proven wrong (which also would implies EsperTech
is dead, which I also doubt...)

Bertrand

On Mon, Sep 14, 2015 at 2:31 PM, Todd Nist  wrote:

> Stratio offers a CEP implementation based on Spark Streaming and the
> Siddhi CEP engine.  I have not used the below, but they may be of some
> value to you:
>
> http://stratio.github.io/streaming-cep-engine/
>
> https://github.com/Stratio/streaming-cep-engine
>
> HTH.
>
> -Todd
>
> On Sun, Sep 13, 2015 at 7:49 PM, Otis Gospodnetić <
> otis.gospodne...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm wondering if anyone has attempted to replace Esper with Spark
>> Streaming or if anyone thinks Spark Streaming is/isn't a good tool for the
>> (CEP) job?
>>
>> We are considering Akka or Spark Streaming as possible Esper replacements
>> and would appreciate any input from people who tried to do that with either
>> of them.
>>
>> Thanks,
>> Otis
>> --
>> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> Solr & Elasticsearch Support * http://sematext.com/
>>
>>
>


Dynamic Workflow Execution using Spark

2015-09-15 Thread Ashish Soni
Hi All ,

Are there any framework which can be used to execute workflows with in
spark or Is it possible to use ML Pipeline for workflow execution but not
doing ML .

Thanks,
Ashish


Re: Spark Streaming Suggestion

2015-09-15 Thread ayan guha
I think you need to make up your mind about storm vs spark. Using both in
this context does not make much sense to me.
On 15 Sep 2015 22:54, "David Morales"  wrote:

> Hi there,
>
> This is exactly our goal in Stratio Sparkta, a real-time aggregation
> engine fully developed with spark streaming (and fully open source).
>
> Take a look at:
>
>
>- the docs: http://docs.stratio.com/modules/sparkta/development/
>- the repository: https://github.com/Stratio/sparkta
>- and some slides explaining how sparkta was born and what it makes:
>http://www.slideshare.net/Stratio/strata-sparkta
>
>
> Feel free to ask us anything about the project.
>
>
>
>
>
>
>
>
> 2015-09-15 8:10 GMT+02:00 srungarapu vamsi :
>
>> The batch approach i had implemented takes about 10 minutes to complete
>> all the pre-computation tasks for the one hour worth of data. When i went
>> through my code, i figured out that most of the time consuming tasks are
>> the ones, which read data from cassandra and the places where i perform
>> sparkContex.union(Array[RDD]).
>> Now the ask is to get the pre computation tasks near real time. So i am
>> exploring the streaming approach.
>>
>> My pre computation tasks not only include just finding the unique numbers
>> for a given device every minute, every hour, every day but it also includes
>> the following tasks:
>> 1. Find the number of unique numbers across a set of devices every
>> minute, every hour, every day
>> 2. Find the number of unique numbers which are commonly occurring across
>> a set of devices every minute, every hour, every day
>> 3. Find (total time a number occurred across a set of devices)/(total
>> unique numbers occurred across the set of devices)
>> The above mentioned pre computation tasks are just a few of what i will
>> be needing and there are many more coming towards me :)
>> I see all these problems need more of data parallel approach and hence i
>> am interested to do this on the spark streaming end.
>>
>>
>> On Tue, Sep 15, 2015 at 11:04 AM, Jörn Franke 
>> wrote:
>>
>>> Why did you not stay with the batch approach? For me the architecture
>>> looks very complex for a simple thing you want to achieve. Why don't you
>>> process the data already in storm ?
>>>
>>> Le mar. 15 sept. 2015 à 6:20, srungarapu vamsi 
>>> a écrit :
>>>
 I am pretty new to spark. Please suggest a better model for the
 following use case.

 I have few (about 1500) devices in field which keep emitting about
 100KB of data every minute. The nature of data sent by the devices is just
 a list of numbers.
 As of now, we have Storm is in the architecture which receives this
 data, sanitizes it and writes to cassandra.
 Now, i have a requirement to process this data. The processing includes
 finding unique numbers emitted by one or more devices for every minute,
 every hour, every day, every month.
 I had implemented this processing part as a batch job execution and now
 i am interested in making it a streaming application. i.e calculating the
 processed data as and when devices emit the data.

 I have the following two approaches:
 1. Storm writes the actual data to cassandra and writes a message on
 Kafka bus that data corresponding to device D and minute M has been written
 to cassandra

 Then Spark streaming reads this message from kafka , then reads the
 data of Device D at minute M from cassandra and starts processing the data.

 2. Storm writes the data to both cassandra and  kafka, spark reads the
 actual data from kafka , processes the data and writes to cassandra.
 The second approach avoids additional hit of reading from cassandra
 every minute , a device has written data to cassandra at the cost of
 putting the actual heavy messages instead of light events on  kafka.

 I am a bit confused among the two approaches. Please suggest which one
 is better and if both are bad, how can i handle this use case?


 --
 /Vamsi

>>>
>>
>>
>> --
>> /Vamsi
>>
>
>
>
> --
>
> David Morales de Frías  ::  +34 607 010 411 :: @dmoralesdf
> 
>
>
> 
> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
> 28224 Pozuelo de Alarcón, Madrid
> Tel: +34 91 828 6473 // www.stratio.com // *@stratiobd
> *
>


Using ML KMeans without hardcoded feature vector creation

2015-09-15 Thread Tóth Zoltán
Hi,

I'm wondering if there is a concise way to run ML KMeans on a DataFrame if
I have the features in multiple numeric columns.

I.e. as in the Iris dataset:
(a1=5.1, a2=3.5, a3=1.4, a4=0.2, id=u'id_1', label=u'Iris-setosa',
binomial_label=1)

I'd like to use KMeans without recreating the DataSet with the feature
vector added manually as a new column and the original columns hardcoded
repeatedly in the code.

The solution I'd like to improve:

from pyspark.mllib.linalg import Vectors
from pyspark.sql.types import Row
from pyspark.ml.clustering import KMeans, KMeansModel

iris = sqlContext.read.parquet("/opt/data/iris.parquet")
iris.first()
# Row(a1=5.1, a2=3.5, a3=1.4, a4=0.2, id=u'id_1', label=u'Iris-setosa',
binomial_label=1)

df = iris.map(lambda r: Row(
id = r.id,
a1 = r.a1,
a2 = r.a2,
a3 = r.a3,
a4 = r.a4,
label = r.label,
binomial_label=r.binomial_label,
features = Vectors.dense(r.a1, r.a2, r.a3, r.a4))
).toDF()


kmeans_estimator = KMeans()\
.setFeaturesCol("features")\
.setPredictionCol("prediction")
kmeans_transformer = kmeans_estimator.fit(df)

predicted_df = kmeans_transformer.transform(df).drop("features")
predicted_df.first()

# Row(a1=5.1, a2=3.5, a3=1.4, a4=0.2, binomial_label=1, id=u'id_1',
label=u'Iris-setosa', prediction=1)



I'm looking for a solution, which is something like:

feature_cols = ["a1", "a2", "a3", "a4"]

prediction_col_name = "prediction"






Thanks,

Zoltan


Re: union streams not working for streams > 3

2015-09-15 Thread Cody Koeninger
I assume you're using the receiver based stream (createStream) rather than
createDirectStream?

Receivers each get scheduled as if they occupy a core, so you need at least
one more core than number of receivers if you want to get any work done.

Try using the direct stream if you can't combine receivers.
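
If you can switch, a minimal sketch of the direct approach with all four topics in one
stream (the broker list, topic names, and the existing StreamingContext ssc are
placeholders/assumed):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("topicA", "topicB", "topicC", "topicD")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

No receiver pins a core, so local[*] on a 4-core laptop is no longer a problem.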

On Mon, Sep 14, 2015 at 11:10 PM, Василец Дмитрий 
wrote:

> I use local[*]. And i have 4 cores on laptop.
> On 14 Sep 2015 23:19, "Gerard Maas"  wrote:
>
>> How many cores are you assigning to your spark streaming job?
>>
>> On Mon, Sep 14, 2015 at 10:33 PM, Василец Дмитрий <
>> pronix.serv...@gmail.com> wrote:
>>
>>> hello
>>> I have 4 streams from Kafka and streaming is not working,
>>> without any errors or logs,
>>> but with 3 streams everything is perfect.
>>> Only the number of streams seems to matter; different triple combinations
>>> always work.
>>> any ideas how to debug or fix it ?
>>>
>>>
>>>
>>


Re: union streams not working for streams > 3

2015-09-15 Thread Василец Дмитрий
Thanks. I will try.

On Tue, Sep 15, 2015 at 4:19 PM, Cody Koeninger  wrote:

> I assume you're using the receiver based stream (createStream) rather than
> createDirectStream?
>
> Receivers each get scheduled as if they occupy a core, so you need at
> least one more core than number of receivers if you want to get any work
> done.
>
> Try using the direct stream if you can't combine receivers.
>
> On Mon, Sep 14, 2015 at 11:10 PM, Василец Дмитрий <
> pronix.serv...@gmail.com> wrote:
>
>> I use local[*]. And i have 4 cores on laptop.
>> On 14 Sep 2015 23:19, "Gerard Maas"  wrote:
>>
>>> How many cores are you assigning to your spark streaming job?
>>>
>>> On Mon, Sep 14, 2015 at 10:33 PM, Василец Дмитрий <
>>> pronix.serv...@gmail.com> wrote:
>>>
 hello
 I have 4 streams from kafka and streaming not working.
 without any errors or logs
 but with 3 streams everything perfect.
 make sense only amount of streams , different triple combinations
 always working.
 any ideas how to debug or fix it ?



>>>
>


Re: Change protobuf version or any other third party library version in Spark application

2015-09-15 Thread Lan Jiang
Steve,

Thanks for the input. You are absolutely right. When I use protobuf 2.6.1,
I also ran into method-not-defined errors. You suggest using the Maven shading
strategy, but I have already built the uber jar to package all my custom
classes and their dependencies, including protobuf 3. The problem is how to
configure spark-shell to use my uber jar first.

java8964 -- appreciate the link and I will try the configuration. Looks
promising. However, the "user classpath first" attribute does not apply to
spark-shell, am I correct?

Lan

On Tue, Sep 15, 2015 at 8:24 AM, java8964  wrote:

> It is a bad idea to use the major version change of protobuf, as it most
> likely won't work.
>
> But you really want to give it a try, set the "user classpath first", so
> the protobuf 3 coming with your jar will be used.
>
> The setting depends on your deployment mode, check this for the parameter:
>
> https://issues.apache.org/jira/browse/SPARK-2996
>
> Yong
>
> --
> Subject: Re: Change protobuf version or any other third party library
> version in Spark application
> From: ste...@hortonworks.com
> To: ljia...@gmail.com
> CC: user@spark.apache.org
> Date: Tue, 15 Sep 2015 09:19:28 +
>
>
>
>
> On 15 Sep 2015, at 05:47, Lan Jiang  wrote:
>
> Hi, there,
>
> I am using Spark 1.4.1. The protobuf 2.5 is included by Spark 1.4.1 by
> default. However, I would like to use Protobuf 3 in my spark application so
> that I can use some new features such as Map support.  Is there anyway to
> do that?
>
> Right now if I build a uber.jar with dependencies including protobuf 3
> classes and pass to spark-shell through --jars option, during the
> execution, I got the error *java.lang.NoSuchFieldError: unknownFields. *
>
>
>
> protobuf is an absolute nightmare version-wise, as protoc generates
> incompatible java classes even across point versions. Hadoop 2.2+ is and
> will always be protobuf 2.5 only; that applies transitively to downstream
> projects  (the great protobuf upgrade of 2013 was actually pushed by the
> HBase team, and required a co-ordinated change across multiple projects)
>
>
> Is there anyway to use a different version of Protobuf other than the
> default one included in the Spark distribution? I guess I can generalize
> and extend the question to any third party libraries. How to deal with
> version conflict for any third party libraries included in the Spark
> distribution?
>
>
> maven shading is the strategy. Generally it is less needed, though the
> troublesome binaries are,  across the entire apache big data stack:
>
> google protobuf
> google guava
> kryo
> jackson
>
> you can generally bump up the other versions, at least by point releases.
>
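
As an illustration of the shading mentioned above, a hedged sketch of a relocation rule, assuming the uber jar were built with sbt-assembly 0.14+ (Maven's shade plugin offers the equivalent via its <relocation> element); the shaded package name is arbitrary:

// build.sbt -- rewrite protobuf references inside the assembly so the
// protobuf 3 classes no longer collide with the protobuf 2.5 that Spark
// and Hadoop already put on the classpath.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.protobuf.**" -> "shaded.com.google.protobuf.@1").inAll
)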


spark performance - executor computing time

2015-09-15 Thread patcharee

Hi,

I was running a job (on Spark 1.5 + Yarn + Java 8). In a stage that performs a
lookup (org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:873)),
one executor's computing time was more than 6 times the median, even though it
had almost the same shuffle read size and low GC time as the others.


What can impact the executor computing time? Any suggestions on which
parameters I should monitor/configure?


BR,
Patcharee



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How is driver memory utilized

2015-09-15 Thread Renu Yadav
Hi

I have a query regarding driver memory:

For which tasks is driver memory used?

Please help.


Re: Worker Machine running out of disk for Long running Streaming process

2015-09-15 Thread gaurav sharma
Hi TD,

Sorry for late reply,


I implemented your suggestion, but unfortunately it didn't help; I can still
see very old shuffle files, because of which my long-running Spark job
ultimately gets terminated.


Below is what I did.


// This is the spark-submit job
public class HourlyAggregatorV2 {

    private static Logger logger = Logger.getLogger(HourlyAggregatorV2.class);

    public static void main(String[] args) throws Exception {
        // Fix for preventing disk full issue in long running jobs, because
        // of shuffle files not getting cleaned up from disk
        new Thread(new GCThread()).start();
    }
}


public class GCThread implements Runnable {

    @Override
    public void run() {
        boolean isGCedOnce = false;
        while (true) {
            if (Calendar.getInstance().get(Calendar.MINUTE) % 10 == 0) {
                if (!isGCedOnce) {
                    System.out.println("Triggered System GC");
                    System.gc();
                    isGCedOnce = true;
                }
            } else {
                isGCedOnce = false;
            }
        }
    }
}
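
A less CPU-hungry variant of the same idea (the loop above spins a core while polling the clock) could use a scheduled executor instead; a minimal sketch in Scala for brevity, assuming it runs inside the driver JVM:

import java.util.concurrent.{Executors, TimeUnit}

// Trigger a GC on the driver every 10 minutes so the ContextCleaner gets a
// chance to remove shuffle files whose RDDs are no longer referenced.
val gcScheduler = Executors.newSingleThreadScheduledExecutor()
gcScheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = System.gc()
}, 10, 10, TimeUnit.MINUTES)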


On Sat, Aug 22, 2015 at 9:16 PM, Ashish Rangole  wrote:

> Interesting. TD, can you please throw some light on why this is and point
> to  the relevant code in Spark repo. It will help in a better understanding
> of things that can affect a long running streaming job.
> On Aug 21, 2015 1:44 PM, "Tathagata Das"  wrote:
>
>> Could you periodically (say every 10 mins) run System.gc() on the driver?
>> The cleaning up of shuffles is tied to garbage collection.
>>
>>
>> On Fri, Aug 21, 2015 at 2:59 AM, gaurav sharma 
>> wrote:
>>
>>> Hi All,
>>>
>>>
>>> I have a 24x7 running Streaming Process, which runs on 2 hour windowed
>>> data
>>>
>>> The issue I am facing is that my worker machines are running OUT OF DISK space.
>>>
>>> I checked that the SHUFFLE FILES are not getting cleaned up.
>>>
>>>
>>> /log/spark-2b875d98-1101-4e61-86b4-67c9e71954cc/executor-5bbb53c1-cee9-4438-87a2-b0f2becfac6f/blockmgr-c905b93b-c817-4124-a774-be1e706768c1//00/shuffle_2739_5_0.data
>>>
>>> Ultimately the machines run out of disk space.
>>>
>>>
>>> I read about the *spark.cleaner.ttl* config param which, from what I can
>>> understand of the documentation, cleans up all the metadata beyond
>>> the time limit.
>>>
>>> I went through https://issues.apache.org/jira/browse/SPARK-5836
>>> it says resolved, but there is no code commit
>>>
>>> Can anyone please throw some light on the issue.
>>>
>>>
>>>
>>


Re: Directly reading data from S3 to EC2 with PySpark

2015-09-15 Thread ayan guha
Also, you can set the Hadoop conf through the jsc.hadoopConf property. Do a
dir(sc) to see the exact property name.
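
For illustration, a hedged sketch on the Scala/Java side (the keys are the standard Hadoop s3n credential properties; in PySpark the same configuration object is reachable via sc._jsc.hadoopConfiguration(); key, secret and bucket values are placeholders):

// Set the S3 credentials on the SparkContext's Hadoop configuration
// before reading s3n:// paths.
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")

val lines = sc.textFile("s3n://your-bucket/path/to/data")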
On 15 Sep 2015 22:43, "Gourav Sengupta"  wrote:

> Hi,
>
> If you start your EC2 nodes with correct roles (default in most cases
> depending on your needs) you should be able to work on S3 and all other AWS
> resources without giving any keys.
>
> I have been doing that for some time now and I have not faced any issues
> yet.
>
>
> Regards,
> Gourav
>
>
>
> On Tue, Sep 15, 2015 at 12:54 PM, Cazen  wrote:
>
>> Good day junHyeok
>>
>> Did you set HADOOP_CONF_DIR? It seems that spark cannot find AWS key
>> properties
>>
>> If it doesn't work after set, How about export AWS_ACCESS_KEY_ID,
>> AWS_SECRET_ACCESS_KEY before running py-spark shell?
>>
>> BR
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Directly-reading-data-from-S3-to-EC2-with-PySpark-tp24638p24698.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Prevent spark from serializing some objects

2015-09-15 Thread lev
Hello,

As I understand it, using the method /bar/ will result in serializing the
/Foo/ instance to the cluster:

class Foo() {
  val x = 5

  def bar(rdd: RDD[Int]): RDD[Int] = {
    rdd.map(_ * x)
  }
}

and since the /Foo/ instance might be very big, it might cause a performance
hit.

I know how to solve this case (create a local copy of /x/ inside /bar/, and
use it).
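
For completeness, a sketch of that workaround (copying the field into a local val so the closure captures only the Int, not the enclosing Foo):

import org.apache.spark.rdd.RDD

class Foo() {
  val x = 5

  def bar(rdd: RDD[Int]): RDD[Int] = {
    val localX = x        // the map closure now captures localX, not `this`
    rdd.map(_ * localX)
  }
}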

I would like to know,
is there a way to prevent /Foo/ from ever being serialized and sent to the
cluster?

I can't force /Foo/ to be not serializable, since it needs to be serialized
at some other stage (not sent to Spark, just saved to disk).

One idea that I tried was to create a trait like:

trait SparkNonSerializable

class Foo extends SparkNonSerializable {...}

and use a custom serializer in Spark (by setting the "spark.serializer" conf)
that will fail for objects that extend /SparkNonSerializable/,
but I wasn't able to make it work
(the serializer is getting used, but the condition
/t.isInstanceOf[SparkNonSerializable]/ is never true).

Here is my code:

import java.io.{InputStream, OutputStream}
import java.nio.ByteBuffer

import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{DeserializationStream, JavaSerializer, SerializationStream, SerializerInstance}

trait SparkNonSerializable

class MySerializer(conf: SparkConf) extends JavaSerializer(conf) {
  override def newInstance(): SerializerInstance = {
    val inst = super.newInstance()

    new SerializerInstance {
      override def serializeStream(s: OutputStream): SerializationStream =
        inst.serializeStream(s)

      // Fail fast if a SparkNonSerializable is ever serialized.
      override def serialize[T: ClassTag](t: T): ByteBuffer = {
        if (t.isInstanceOf[SparkNonSerializable])
          ???
        inst.serialize(t)
      }

      override def deserializeStream(s: InputStream): DeserializationStream =
        inst.deserializeStream(s)

      // Fail fast if a SparkNonSerializable is ever deserialized.
      override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
        val t = inst.deserialize(bytes)
        if (t.isInstanceOf[SparkNonSerializable])
          ???
        t
      }

      override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
        val t = inst.deserialize(bytes, loader)
        if (t.isInstanceOf[SparkNonSerializable])
          ???
        t
      }
    }
  }
}



My questions are:
1. Do you see why my custom serializer can't catch objects with that trait?

2. Any other ideas for how to prevent /Foo/ from ever being serialized?
My solution might be OK for tests,
but I'm a bit reluctant to use my own serializer in production code.

Thanks,
Lev.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Prevent-spark-from-serializing-some-objects-tp24700.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Directly reading data from S3 to EC2 with PySpark

2015-09-15 Thread Gourav Sengupta
Hi,

If you start your EC2 nodes with correct roles (default in most cases
depending on your needs) you should be able to work on S3 and all other AWS
resources without giving any keys.

I have been doing that for some time now and I have not faced any issues
yet.


Regards,
Gourav



On Tue, Sep 15, 2015 at 12:54 PM, Cazen  wrote:

> Good day junHyeok
>
> Did you set HADOOP_CONF_DIR? It seems that spark cannot find AWS key
> properties
>
> If it doesn't work after set, How about export AWS_ACCESS_KEY_ID,
> AWS_SECRET_ACCESS_KEY before running py-spark shell?
>
> BR
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Directly-reading-data-from-S3-to-EC2-with-PySpark-tp24638p24698.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Change protobuf version or any other third party library version in Spark application

2015-09-15 Thread java8964
It is a bad idea to jump across a major version of protobuf, as it most likely
won't work.
But if you really want to give it a try, set the "user classpath first" option,
so the protobuf 3 coming with your jar will be used.
The setting depends on your deployment mode; check this for the parameter:
https://issues.apache.org/jira/browse/SPARK-2996
Yong

Subject: Re: Change protobuf version or any other third party library version 
in Spark application
From: ste...@hortonworks.com
To: ljia...@gmail.com
CC: user@spark.apache.org
Date: Tue, 15 Sep 2015 09:19:28 +


On 15 Sep 2015, at 05:47, Lan Jiang  wrote:

Hi, there,

I am using Spark 1.4.1. The protobuf 2.5 is included by Spark 1.4.1 by default.
However, I would like to use Protobuf 3 in my spark application so that I can
use some new features such as Map support.  Is there anyway to do that?

Right now if I build a uber.jar with dependencies including protobuf 3 classes
and pass to spark-shell through --jars option, during the execution, I got the
error java.lang.NoSuchFieldError: unknownFields.


protobuf is an absolute nightmare version-wise, as protoc generates
incompatible java classes even across point versions. Hadoop 2.2+ is and will
always be protobuf 2.5 only; that applies transitively to downstream projects
(the great protobuf upgrade of 2013 was actually pushed by the HBase team, and
required a co-ordinated change across multiple projects)


Is there anyway to use a different version of Protobuf other than the default
one included in the Spark distribution? I guess I can generalize and extend the
question to any third party libraries. How to deal with version conflict for
any third party libraries included in the Spark distribution?


maven shading is the strategy. Generally it is less needed, though the
troublesome binaries are, across the entire apache big data stack:

google protobuf
google guava
kryo
jackson

you can generally bump up the other versions, at least by point releases.

Re: hdfs-ha on mesos - odd bug

2015-09-15 Thread Adrian Bridgett
Thanks Steve - we are already taking the safe route - putting NN and 
datanodes on the central mesos-masters which are on demand.  Later (much 
later!) we _may_ put some datanodes on spot instances (and using several 
spot instance types as the spikes seem to only affect one type - worst 
case we can rebuild the data as well).  OTOH this would mainly only be 
beneficial if spark/mesos understood the data locality which is probably 
some time off (we don't need this ability now).


Indeed, the error we are seeing is orthogonal to the setup - however my
understanding of ha-hdfs is that it should be resolved via the
hdfs-site.xml file and doesn't use DNS whatsoever (and indeed, it _does_
work - but only after we initialise the driver with a bad hdfs url). I
therefore think there's some HDFS initialisation missing when running
spark on mesos - my suspicion is on the spark side (or my spark config).


http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html#Configuration_details
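
In case it is useful, one hedged workaround sketch: pass the HA client settings as spark.hadoop.* properties, so they reach the driver and executors even if hdfs-site.xml is not on their classpath (nameservice name, namenode hosts and ports below are placeholders, and the standard ConfiguredFailoverProxyProvider is assumed):

import org.apache.spark.SparkConf

// Spark copies every "spark.hadoop.*" property into the Hadoop Configuration
// it builds, so the HDFS client can resolve the logical nameservice URI.
val conf = new SparkConf()
  .set("spark.hadoop.dfs.nameservices", "nameservice1")
  .set("spark.hadoop.dfs.ha.namenodes.nameservice1", "nn1,nn2")
  .set("spark.hadoop.dfs.namenode.rpc-address.nameservice1.nn1", "namenode1:8020")
  .set("spark.hadoop.dfs.namenode.rpc-address.nameservice1.nn2", "namenode2:8020")
  .set("spark.hadoop.dfs.client.failover.proxy.provider.nameservice1",
    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")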

On 15/09/2015 10:24, Steve Loughran wrote:

On 15 Sep 2015, at 08:55, Adrian Bridgett  wrote:

Hi Sam, in short, no, it's a traditional install as we plan to use spot 
instances and didn't want price spikes to kill off HDFS.

We're actually doing a bit of a hybrid, using spot instances for the mesos 
slaves, ondemand for the mesos masters.  So for the time being, putting hdfs on 
the masters (we'll probably move to multiple slave instance types to avoid 
losing too many when spot price spikes, but for now this is acceptable).   
Masters running CDH5.

It's incredibly dangerous using hdfs NNs on spot vms; a significant enough 
spike will lose all of them in one go, and there goes your entire filesystem. 
Have a static VM, maybe even backed by EBS.

If you look at Hadoop architectures from Hortonworks, Cloudera and Amazon 
themselves, the usual stance is HDFS on static nodes, spot instances for 
compute only


Using hdfs://current-hdfs-master:8020 works fine, however using 
hdfs://nameservice1 fails in the rather odd way described (well, more that the 
workaround actually works!)  I think there's some underlying bug here that's 
being exposed.


this sounds an issue orthogonal to spot instances. Maybe related to how JVMs 
cache DNS entries forever?


--
Adrian Bridgett | Sysadmin Engineer, OpenSignal

Office: First Floor, Scriptor Court, 155-157 Farringdon Road,
Clerkenwell, London, EC1R 3AD

Phone #: +44 777-377-8251
Skype: abridgett | @adrianbridgett | LinkedIn


Re: How to speed up MLlib LDA?

2015-09-15 Thread Marko Asplund
While doing some more testing I noticed that loading the persisted model
from disk (~2 minutes) as well as querying LDA model topic distributions
(~4 seconds for one document) are quite slow operations.

Our application is querying LDA model topic distribution (for one doc at a
time) as part of end-user operation execution flow, so a ~4 second
execution time is very problematic. Am I using the MLlib LDA API correctly
or is this just reflecting the current performance characteristics of the
LDA implementation? My code can be found here:

https://github.com/marko-asplund/tech-protos/blob/master/mllib-lda/src/main/scala/fi/markoa/proto/mllib/LDADemo.scala#L56-L57

For what kinds of use cases are people currently using the LDA
implementation?


marko


Re: Spark Streaming Suggestion

2015-09-15 Thread David Morales
Hi there,

This is exactly our goal in Stratio Sparkta, a real-time aggregation engine
fully developed with spark streaming (and fully open source).

Take a look at:


   - the docs: http://docs.stratio.com/modules/sparkta/development/
   - the repository: https://github.com/Stratio/sparkta
   - and some slides explaining how sparkta was born and what it does:
   http://www.slideshare.net/Stratio/strata-sparkta


Feel free to ask us anything about the project.








2015-09-15 8:10 GMT+02:00 srungarapu vamsi :

> The batch approach I had implemented takes about 10 minutes to complete
> all the pre-computation tasks for one hour's worth of data. When I went
> through my code, I figured out that most of the time-consuming tasks are
> the ones which read data from cassandra and the places where I perform
> sparkContext.union(Array[RDD]).
> Now the ask is to get the pre-computation tasks near real time, so I am
> exploring the streaming approach.
>
> My pre computation tasks not only include just finding the unique numbers
> for a given device every minute, every hour, every day but it also includes
> the following tasks:
> 1. Find the number of unique numbers across a set of devices every minute,
> every hour, every day
> 2. Find the number of unique numbers which are commonly occurring across a
> set of devices every minute, every hour, every day
> 3. Find (total time a number occurred across a set of devices)/(total
> unique numbers occurred across the set of devices)
> The above mentioned pre computation tasks are just a few of what i will be
> needing and there are many more coming towards me :)
> I see that all these problems need more of a data-parallel approach, and hence
> I am interested in doing this on the spark streaming end.
>
>
> On Tue, Sep 15, 2015 at 11:04 AM, Jörn Franke 
> wrote:
>
>> Why did you not stay with the batch approach? For me the architecture
>> looks very complex for a simple thing you want to achieve. Why don't you
>> process the data in Storm already?
>>
>> Le mar. 15 sept. 2015 à 6:20, srungarapu vamsi 
>> a écrit :
>>
>>> I am pretty new to spark. Please suggest a better model for the
>>> following use case.
>>>
>>> I have few (about 1500) devices in field which keep emitting about 100KB
>>> of data every minute. The nature of data sent by the devices is just a list
>>> of numbers.
>>> As of now, we have Storm is in the architecture which receives this
>>> data, sanitizes it and writes to cassandra.
>>> Now, i have a requirement to process this data. The processing includes
>>> finding unique numbers emitted by one or more devices for every minute,
>>> every hour, every day, every month.
>>> I had implemented this processing part as a batch job execution and now
>>> I am interested in making it a streaming application, i.e. calculating the
>>> processed data as and when devices emit the data.
>>>
>>> I have the following two approaches:
>>> 1. Storm writes the actual data to cassandra and writes a message on
>>> Kafka bus that data corresponding to device D and minute M has been written
>>> to cassandra
>>>
>>> Then Spark streaming reads this message from kafka , then reads the data
>>> of Device D at minute M from cassandra and starts processing the data.
>>>
>>> 2. Storm writes the data to both cassandra and  kafka, spark reads the
>>> actual data from kafka , processes the data and writes to cassandra.
>>> The second approach avoids the additional hit of reading from cassandra
>>> every minute a device has written data, at the cost of
>>> putting the actual heavy messages instead of light events on kafka.
>>>
>>> I am a bit confused between the two approaches. Please suggest which one
>>> is better, and if both are bad, how can I handle this use case?
>>>
>>>
>>> --
>>> /Vamsi
>>>
>>
>
>
> --
> /Vamsi
>
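
For the second approach, a minimal sketch of the per-minute unique-number count in Spark Streaming (it assumes a 60-second batch interval and a `readings` DStream of (deviceId, number) pairs already parsed from Kafka; hourly and daily rollups would aggregate these minute counts further):

import org.apache.spark.streaming.dstream.DStream

def uniquePerMinute(readings: DStream[(String, Long)]): DStream[(String, Long)] =
  readings
    .transform(_.distinct())               // de-duplicate (device, number) pairs in the batch
    .map { case (device, _) => (device, 1L) }
    .reduceByKey(_ + _)                    // number of distinct values per device this minute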



-- 

David Morales de Frías  ::  +34 607 010 411 :: @dmoralesdf




Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473 // www.stratio.com // @stratiobd


Spark wastes a lot of space (tmp data) for iterative jobs

2015-09-15 Thread Ali Hadian
Hi!

We are executing the PageRank example from the Spark java examples package
on a very large input graph. The code is available in Spark's GitHub repo.

During the execution, the framework generates a huge amount of intermediate
data per iteration (i.e. the contribs RDD). The intermediate data is
temporary, but Spark does not clear the intermediate data of previous
iterations. That is to say, if we are in the middle of the 20th iteration, all
of the temporary data of all previous iterations (iterations 0 to 19) is
still kept in the tmp directory. As a result, the tmp directory grows
linearly.
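
For reference, the core of the cited example looks roughly like this (Scala version, simplified; links and iters are defined earlier in the example): every pass builds a fresh contribs/ranks lineage, and the shuffle files behind those RDDs are what accumulate in the tmp directory.

// links: RDD[(String, Iterable[String])], the adjacency list of the graph
var ranks = links.mapValues(_ => 1.0)
for (i <- 1 to iters) {
  val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
    urls.map(url => (url, rank / urls.size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}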

It seems rational to keep the data from only the previous iteration, because
if the current iteration fails, the job can be continued using the
intermediate data from the previous iteration. So why does it keep the
intermediate data for ALL previous iterations?

How can we force Spark to clear this intermediate data during the
execution of the job?

Kind regards, 
Ali hadian