Re: How to kill spark applications submitted using spark-submit reliably?

2015-11-22 Thread Ted Yu
>
> If you ask about trapping the SIGKILL signal in your script, see the
> following:
>
> http://linuxcommand.org/wss0160.php
>
> Cheers
>


> On Fri, Nov 20, 2015 at 10:02 PM, Vikram Kone 
> wrote:
>
>> I tried adding shutdown hook to my code but it didn't help. Still same
>> issue
>>
>>
>> On Fri, Nov 20, 2015 at 7:08 PM, Ted Yu  wrote:
>>
>>> Which Spark release are you using ?
>>>
>>> Can you pastebin the stack trace of the process running on your machine ?
>>>
>>> Thanks
>>>
>>> On Nov 20, 2015, at 6:46 PM, Vikram Kone  wrote:
>>>
>>> Hi,
>>> I'm seeing a strange problem. I have a spark cluster in standalone mode.
>>> I submit spark jobs from a remote node as follows from the terminal
>>>
>>> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping
>>> spark-jobs.jar
>>>
>>> When the app is running and I press Ctrl-C on the console terminal, the
>>> process is killed and so is the app in the Spark master UI. When I go to
>>> the Spark master UI, I see that this app is in state Killed under
>>> Completed applications, which is what I expected to see.
>>>
>>> Now, I created a shell script as follows to do the same
>>>
>>> #!/bin/bash
>>> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping
>>> spark-jobs.jar
>>> echo $! > my.pid
>>>
>>> When I execute the shell script from terminal, as follows
>>>
>>> $> bash myscript.sh
>>>
>>> The application is submitted correctly to the Spark master and I can see it
>>> as one of the running apps in the Spark master UI. But when I kill the
>>> process in my terminal as follows
>>>
>>> $> kill $(cat my.pid)
>>>
>>> I see that the process is killed on my machine but the Spark application
>>> is still running in the Spark master! It doesn't get killed.
>>>
>>> I noticed one more thing: when I launch the Spark job via the shell
>>> script and kill the application from the Spark master UI by clicking on "kill"
>>> next to the running application, it gets killed in the Spark UI but I still
>>> see the process running on my machine.
>>>
>>> In both cases, I would expect the remote spark app to be killed and my
>>> local process to be killed.
>>>
>>> Why is this happening? And how can I kill a Spark app launched via a shell
>>> script from the terminal, without going to the Spark master UI?
>>>
>>> I want to launch the Spark app via the script and log the PID so I can
>>> monitor it remotely.
>>>
>>> thanks for the help
>>>
>>>
>>
>


Re: Datastore for GraphX

2015-11-22 Thread Sonal Goyal
For GraphX, you should be able to read and write data from practically any
datastore Spark supports: flat files, RDBMS, Hadoop, etc. If you want to
save your graph as it is, check out something like Neo4j.

http://neo4j.com/developer/apache-spark/

Best Regards,
Sonal
Founder, Nube Technologies 
Reifier at Strata Hadoop World

Reifier at Spark Summit 2015






On Sun, Nov 22, 2015 at 2:38 AM, Ilango Ravi  wrote:

> Hi
>
> I am trying to figure out which datastore I can use for storing data to be
> used with GraphX. Is there a good graph database out there which I can use
> for storing graph data for efficient storage/retrieval?
>
> thanks,
> ravi
>


Re: How to kill spark applications submitted using spark-submit reliably?

2015-11-22 Thread Sudhanshu Janghel
I have noticed that the UI takes some time to reflect the requested changes. Is
that the issue? Have you tried waiting for a few minutes after killing the
Spark job from the terminal?

Regards,
Sudhanshu

Kind Regards,
Sudhanshu

On 23 Nov 2015, at 1:43 a.m., Ted Yu  wrote:

>> If you ask about trapping the SIGKILL signal in your script, see the 
>> following:
>> 
>> http://linuxcommand.org/wss0160.php
>> 
>> Cheers
>  
>>> On Fri, Nov 20, 2015 at 10:02 PM, Vikram Kone  wrote:
>>> I tried adding shutdown hook to my code but it didn't help. Still same issue
>>> 
>>> 
 On Fri, Nov 20, 2015 at 7:08 PM, Ted Yu  wrote:
 Which Spark release are you using ?
 
 Can you pastebin the stack trace of the process running on your machine ?
 
 Thanks
 
> On Nov 20, 2015, at 6:46 PM, Vikram Kone  wrote:
> 
> Hi,
> I'm seeing a strange problem. I have a spark cluster in standalone mode. 
> I submit spark jobs from a remote node as follows from the terminal
> 
> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping 
> spark-jobs.jar
> 
> When the app is running and I press Ctrl-C on the console terminal, the 
> process is killed and so is the app in the Spark master UI. When I go to 
> the Spark master UI, I see that this app is in state Killed under 
> Completed applications, which is what I expected to see.
> 
> Now, I created a shell script as follows to do the same
> 
> #!/bin/bash
> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping 
> spark-jobs.jar
> echo $! > my.pid
> 
> When I execute the shell script from terminal, as follows
> 
> $> bash myscript.sh
> 
> The application is submitted correctly to the Spark master and I can see it 
> as one of the running apps in the Spark master UI. But when I kill the 
> process in my terminal as follows
> 
> $> kill $(cat my.pid)
> 
> I see that the process is killed on my machine but the Spark application 
> is still running in the Spark master! It doesn't get killed.
> 
> I noticed one more thing: when I launch the Spark job via the shell 
> script and kill the application from the Spark master UI by clicking on 
> "kill" next to the running application, it gets killed in the Spark UI but I 
> still see the process running on my machine. 
> 
> In both cases, I would expect the remote spark app to be killed and my 
> local process to be killed.
> 
> Why is this happening? And how can I kill a Spark app launched via a shell 
> script from the terminal, without going to the Spark master UI?
> 
> I want to launch the Spark app via the script and log the PID so I can 
> monitor it remotely.
> 
> thanks for the help
> 


Re: Spark twitter streaming in Java

2015-11-22 Thread Yogs
Hi Soni,

I think you need to start the JavaStreamingContext. Add something like this
at the end of your program:

jssc.start();
jssc.awaitTermination(6);
jssc.stop();

- Yogesh

On Thu, Nov 19, 2015 at 12:34 PM, Soni spark 
wrote:

> Dear Friends,
>
> I am struggling with spark twitter streaming. I am not getting any data.
> Please correct below code if you found any mistakes.
>
> import org.apache.spark.*;
> import org.apache.spark.api.java.function.*;
> import org.apache.spark.streaming.*;
> import org.apache.spark.streaming.api.java.*;
> import org.apache.spark.streaming.twitter.*;
> import twitter4j.GeoLocation;
> import twitter4j.Status;
> import java.util.Arrays;
> import scala.Tuple2;
>
> public class SparkTwitterStreaming {
>
> public static void main(String[] args) {
>
> final String consumerKey = "XXX";
> final String consumerSecret = "XX";
> final String accessToken = "XX";
> final String accessTokenSecret = "XXX";
> SparkConf conf = new
> SparkConf().setMaster("local[2]").setAppName("SparkTwitterStreaming");
> JavaStreamingContext jssc = new JavaStreamingContext(conf, new
> Duration(6));
> System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
> System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
> System.setProperty("twitter4j.oauth.accessToken", accessToken);
> System.setProperty("twitter4j.oauth.accessTokenSecret",
> accessTokenSecret);
> String[] filters = new String[] {"Narendra Modi"};
> JavaReceiverInputDStream<Status> twitterStream =
> TwitterUtils.createStream(jssc, filters);
>
> // Without filter: Output text of all tweets
> JavaDStream<String> statuses = twitterStream.map(
> new Function<Status, String>() {
> public String call(Status status) { return
> status.getText(); }
> }
> );
> statuses.print();
> statuses.dstream().saveAsTextFiles("/home/apache/tweets", "txt");
>
>   }
>
> }
>
>


RE: SparkR DataFrame , Out of memory exception for very small file.

2015-11-22 Thread Sun, Rui
Vipul,

Not sure if I understand your question. DataFrame is immutable. You can't 
update a DataFrame.

Could you paste some log info for the OOM error?

-Original Message-
From: vipulrai [mailto:vipulrai8...@gmail.com] 
Sent: Friday, November 20, 2015 12:11 PM
To: user@spark.apache.org
Subject: SparkR DataFrame , Out of memory exception for very small file.

Hi Users,

I have a general doubt regarding DataFrames in SparkR.

I am trying to read a file from Hive and it gets created as DataFrame.

sqlContext <- sparkRHive.init(sc)

#DF
sales <- read.df(sqlContext, "hdfs://sample.csv", header ='true', 
 source = "com.databricks.spark.csv", inferSchema='true')

registerTempTable(sales,"Sales")

Do I need to create a new DataFrame for every update to the DataFrame, such as 
the addition of a new column, or do I need to update the original sales DataFrame?

sales1<- SparkR::sql(sqlContext,"Select a.* , 607 as C1 from Sales as a")


Please help me with this, as the original file is only 20MB but it throws an 
out-of-memory exception on a cluster with a 4GB master and two workers of 4GB each.

Also, what is the logic with DataFrames: do I need to register and drop the 
tempTable after every update?

Thanks,
Vipul



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-DataFrame-Out-of-memory-exception-for-very-small-file-tp25435.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Initial State

2015-11-22 Thread Tathagata Das
There is a way. Please see the scala docs.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions

The first version of updateStateByKey has the parameter "initialRDD"
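
For illustration, here is a minimal sketch of that overload (assuming Spark
1.3+; the seed counts, socket source and checkpoint path below are hypothetical):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object InitialStateExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("initial-state").setMaster("local[2]"))
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("/tmp/checkpoint") // required for stateful operations

    // Seed counts computed earlier (hypothetical values, for illustration only)
    val initialRDD = sc.parallelize(Seq(("a", 100L), ("b", 32L)))

    // Streaming (key, count) pairs, e.g. from a socket source
    val pairs = ssc.socketTextStream("localhost", 9999).map(w => (w, 1L))

    // The overload taking an initialRDD seeds the state before the first batch
    val updateFunc = (values: Seq[Long], state: Option[Long]) =>
      Some(values.sum + state.getOrElse(0L))
    val counts = pairs.updateStateByKey[Long](
      updateFunc, new HashPartitioner(sc.defaultParallelism), initialRDD)

    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}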

On Fri, Nov 20, 2015 at 6:52 PM, Bryan  wrote:

> All,
>
> Is there a way to introduce an initial RDD without doing updateStateByKey?
> I have an initial set of counts, and the algorithm I am using requires that
> I accumulate additional counts from streaming data, age off older counts,
> and make some calculations on them. The accumulation of counts uses
> reduceByKeyAndWindow. Is there another method to seed in the initial counts
> beyond updateStateByKey?
>
> Regards,
>
> Bryan Jeffrey
>


Re: Need Help Diagnosing/operating/tuning

2015-11-22 Thread Jeremy Davis
It seems like the problem is related to --executor-cores. Is there possibly some 
sort of race condition when using multiple cores per executor?


On Nov 22, 2015, at 12:38 PM, Jeremy Davis 
> wrote:


Hello,
I’m at a loss trying to diagnose why my spark job is failing. (works fine on 
small data)
It is failing during the repartition, or on the subsequent steps, which then 
seem to fail and fall back to repartitioning.
I’ve tried adjusting every parameter I can find, but have had no success.
Input is only 23GB of LZO (probably 8x compression), and I’ve verified all 
files are valid (not corrupted).
I’ve tried more and less of : memory, partitions, executors, cores...
I’ve set maxFailures up to 300.
Setting 4GB heap usually makes it through repartitioning, but fails on 
subsequent steps (Sometimes being killed from running past memory limits). 
Larger Heaps usually don’t even make it through the first repartition due to 
all kinds of weird errors that look like read errors...

I’m at a loss on how to debug this thing.
Is there a tutorial somewhere?

——


Spark 1.4.1
Java 7
Cluster has 3TB of memory, and 400 cores.


Here are a collection of exceptions



java.io.FileNotFoundException: 
/var/storage/sdd3/nm-local/usercache/jeremy/appcache/application_1447722466442_1649/blockmgr-9ed5583f-cac1-4701-9f70-810c215b954f/13/shuffle_0_5_0.data
 (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at 
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:128)
at 
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:215)
at 
org.apache.spark.util.collection.ChainedBuffer.read(ChainedBuffer.scala:56)
at 
org.apache.spark.util.collection.PartitionedSerializedPairBuffer$$anon$2.writeNext(PartitionedSerializedPairBuffer.scala:137)
at 
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:757)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)




java.lang.InternalError: lzo1x_decompress_safe returned: -6
at 
com.hadoop.compression.lzo.LzoDecompressor.decompressBytesDirect(Native Method)
at 
com.hadoop.compression.lzo.LzoDecompressor.decompress(LzoDecompressor.java:315)
at 
com.hadoop.compression.lzo.LzopDecompressor.decompress(LzopDecompressor.java:122)
at 
com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:247)
at 
org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
at 
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at 
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
at 
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:216)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)



java.io.IOException: Filesystem closed
at 

Re: Spark-SQL idiomatic way of adding a new partition or writing to Partitioned Persistent Table

2015-11-22 Thread Deenar Toraskar
Thanks Michael

Thanks for the response. Here is my understanding, correct me if I am wrong

1) Spark SQL written partitioned tables do not write metadata to the Hive
metastore. Spark SQL discovers partitions from the table location on the
underlying DFS, and not the metastore. It does this the first time a table
is accessed, so if the underlying partitions change, a refresh table
 is required. Is there a way to see the partitions discovered by
Spark SQL? show partitions  does not work on Spark SQL
partitioned tables. Also, Hive allows different partitions in different
physical locations; I guess this won't be possible in Spark SQL.

2) If you want to retain compatibility with other SQL-on-Hadoop engines,
register your DataFrame as a temp table and then use Hive's dynamic
partitioned insert syntax. Spark SQL uses this for Hive-style tables.

3) Automatic schema discovery. I presume this is Parquet only, and only
if spark.sql.parquet.mergeSchema
/ mergeSchema is set to true. What happens when mergeSchema is set to false?
(I guess I can check this out.)

My two cents

a) it would help if there was something like the Hive non-strict mode equivalent,
which would enforce schema compatibility for all partitions written to a
table.
b) refresh table is annoying for tables where partitions are being written
frequently for other reasons; not sure if there is a way around this.
c) it would be great if DataFrameWriter had an option to maintain
compatibility with the HiveMetastore. registerTempTable and "insert
overwrite table select from" is quite ugly and cumbersome
d) It would be helpful to resurrect the
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/,
happy to help out with the Spark SQL portions.

Regards
Deenar


On 22 November 2015 at 18:54, Michael Armbrust 
wrote:

> Is it possible to add a new partition to a persistent table using Spark
>> SQL ? The following call works and data gets written in the correct
>> directories, but no partition metadata is added to the Hive metastore.
>>
> I believe if you use Hive's dynamic partitioned insert syntax then we will
> fall back on metastore and do the update.
>
>> In addition I see nothing preventing any arbitrary schema being appended
>> to the existing table.
>>
> This is perhaps kind of a feature, we do automatic schema discovery and
> merging when loading a new parquet table.
>
>> Does SparkSQL not need partition metadata when reading data back?
>>
> No, we dynamically discover it in a distributed job when the table is
> loaded.
>


RE: Initial State

2015-11-22 Thread Bryan
I am currently using updateStateByKey (which, as you pointed out, allows the 
introduction of an initial RDD) to introduce an initial RDD to my window 
counting function. I was hoping to essentially seed the window state at startup 
without the use of updateStateByKey, to avoid the associated cost.

Is there an alternative method to initialize state?

InputQueueStream joined to window would seem to work, but InputQueueStream does 
not allow checkpointing

Sent from Outlook Mail



From: Tathagata Das
Sent: Sunday, November 22, 2015 8:01 PM
To: Bryan
Cc: user
Subject: Re: Initial State


There is a way. Please see the scala docs.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions
 
The first version of updateStateByKey has the parameter "initialRDD"

On Fri, Nov 20, 2015 at 6:52 PM, Bryan  wrote:
All,

Is there a way to introduce an initial RDD without doing updateStateByKey? I 
have an initial set of counts, and the algorithm I am using requires that I 
accumulate additional counts from streaming data, age off older counts, and 
make some calculations on them. The accumulation of counts uses 
reduceByKeyAndWindow. Is there another method to seed in the initial counts 
beyond updateStateByKey?

Regards,

Bryan Jeffrey





Re: Spark-SQL idiomatic way of adding a new partition or writing to Partitioned Persistent Table

2015-11-22 Thread Stephen Boesch
>> and then use Hive's dynamic partitioned insert syntax

What does this entail? The same SQL, but you need to do

   set hive.exec.dynamic.partition = true;
in the Hive/SQL context (along with several other related dynamic
partition settings).

Is there anything else/special required?
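
For concreteness, a rough sketch of the temp-table plus dynamic-partitioned-insert
route being discussed (assuming a HiveContext; the table, column and path names
below are purely hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("dynamic-partition-insert"))
val sqlContext = new HiveContext(sc)

// Settings commonly needed for Hive dynamic partitioned inserts
sqlContext.sql("SET hive.exec.dynamic.partition = true")
sqlContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")

// Hypothetical source data registered as a temp table
val df = sqlContext.read.parquet("/data/events_staging")
df.registerTempTable("events_staging")

// Dynamic partitioned insert: the partition column (dt) goes last in the SELECT list
sqlContext.sql(
  """INSERT OVERWRITE TABLE events PARTITION (dt)
    |SELECT id, payload, dt FROM events_staging""".stripMargin)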


2015-11-22 17:32 GMT-08:00 Deenar Toraskar :

> Thanks Michael
>
> Thanks for the response. Here is my understanding, correct me if I am wrong
>
> 1) Spark SQL written partitioned tables do not write metadata to the Hive
> metastore. Spark SQL discovers partitions from the table location on the
> underlying DFS, and not the metastore. It does this the first time a table
> is accessed, so if the underlying partitions change, a refresh table
>  is required. Is there a way to see the partitions discovered by
> Spark SQL? show partitions  does not work on Spark SQL
> partitioned tables. Also, Hive allows different partitions in different
> physical locations; I guess this won't be possible in Spark SQL.
>
> 2) If you want to retain compatibility with other SQL-on-Hadoop engines,
> register your DataFrame as a temp table and then use Hive's dynamic
> partitioned insert syntax. Spark SQL uses this for Hive-style tables.
>
> 3) Automatic schema discovery. I presume this is Parquet only, and only if 
> spark.sql.parquet.mergeSchema
> / mergeSchema is set to true. What happens when mergeSchema is set to
> false? (I guess I can check this out.)
>
> My two cents
>
> a) it would help if there was something like the Hive non-strict mode equivalent,
> which would enforce schema compatibility for all partitions written to a
> table.
> b) refresh table is annoying for tables where partitions are being written
> frequently for other reasons; not sure if there is a way around this.
> c) it would be great if DataFrameWriter had an option to maintain
> compatibility with the HiveMetastore. registerTempTable and "insert
> overwrite table select from" is quite ugly and cumbersome
> d) It would be helpful to resurrect the
> https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/,
> happy to help out with the Spark SQL portions.
>
> Regards
> Deenar
>
>
> On 22 November 2015 at 18:54, Michael Armbrust 
> wrote:
>
>> Is it possible to add a new partition to a persistent table using Spark
>>> SQL ? The following call works and data gets written in the correct
>>> directories, but no partition metadata is added to the Hive metastore.
>>>
>> I believe if you use Hive's dynamic partitioned insert syntax then we
>> will fall back on metastore and do the update.
>>
>>> In addition I see nothing preventing any arbitrary schema being appended
>>> to the existing table.
>>>
>> This is perhaps kind of a feature, we do automatic schema discovery and
>> merging when loading a new parquet table.
>>
>>> Does SparkSQL not need partition metadata when reading data back?
>>>
>> No, we dynamically discover it in a distributed job when the table is
>> loaded.
>>
>
>


Re: How to adjust Spark shell table width

2015-11-22 Thread Ted Yu
Currently the width, if truncation is performed, is hardcoded to be
20 characters.

I wonder if the capability for the user to specify the width should be added.

If so, I can send a PR.

Cheers

On Sun, Nov 22, 2015 at 1:39 AM, Jagrut Sharma 
wrote:

> Since version 1.5.0, show(false) on a DataFrame prevents truncation of
> long strings in the output. By default, strings more than 20 characters are
> truncated.
>
> Example usage:
> scala> df.show(false)
>
> --
> Jagrut
>
>
> On Sat, Nov 21, 2015 at 6:24 AM, Fengdong Yu 
> wrote:
>
>> Hi,
>>
>> I found that if the column value is too long, the spark shell only shows a
>> partial result.
>>
>> such as:
>>
>> sqlContext.sql("select url from tableA").show(10)
>>
>> It cannot show the whole URL here, so how do I adjust it? Thanks
>>
>>
>>
>>
>>
>>
>>
>


Re: thought experiment: use spark ML to real time prediction

2015-11-22 Thread Vincenzo Selvaggio
The Data Mining Group (http://dmg.org/) that created PMML are working on a
new standard called PFA that indeed uses JSON documents, see
http://dmg.org/pfa/docs/motivation/ for details.

PFA could be the answer to your option c.

Regards,
Vincenzo

On Wed, Nov 18, 2015 at 12:03 PM, Nick Pentreath 
wrote:

> One such "lightweight PMML in JSON" is here -
> https://github.com/bigmlcom/json-pml. At least for the schema
> definitions. But nothing available in terms of evaluation/scoring. Perhaps
> this is something that can form a basis for such a new undertaking.
>
> I agree that distributed models are only really applicable in the case of
> massive scale factor models - and then anyway for latency purposes one
> needs to use LSH or something similar to achieve sufficiently real-time
> performance. These days one can easily spin up a single very powerful
> server to handle even very large models.
>
> On Tue, Nov 17, 2015 at 11:34 PM, DB Tsai  wrote:
>
>> I was thinking about to work on better version of PMML, JMML in JSON, but
>> as you said, this requires a dedicated team to define the standard which
>> will be a huge work.  However, option b) and c) still don't address the
>> distributed models issue. In fact, most of the models in production have to
>> be small enough to return the result to users within reasonable latency, so
>> I doubt how usefulness of the distributed models in real production
>> use-case. For R and Python, we can build a wrapper on-top of the
>> lightweight "spark-ml-common" project.
>>
>>
>> Sincerely,
>>
>> DB Tsai
>> --
>> Web: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>>
>> On Tue, Nov 17, 2015 at 2:29 AM, Nick Pentreath > > wrote:
>>
>>> I think the issue with pulling in all of spark-core is often with
>>> dependencies (and versions) conflicting with the web framework (or Akka in
>>> many cases). Plus it really is quite heavy if you just want a fairly
>>> lightweight model-serving app. For example we've built a fairly simple but
>>> scalable ALS factor model server on Scalatra, Akka and Breeze. So all you
>>> really need is the web framework and Breeze (or an alternative linear
>>> algebra lib).
>>>
>>> I definitely hear the pain-point that PMML might not be able to handle
>>> some types of transformations or models that exist in Spark. However,
>>> here's an example from scikit-learn -> PMML that may be instructive (
>>> https://github.com/scikit-learn/scikit-learn/issues/1596 and
>>> https://github.com/jpmml/jpmml-sklearn), where a fairly impressive list
>>> of estimators and transformers are supported (including e.g. scaling and
>>> encoding, and PCA).
>>>
>>> I definitely think the current model I/O and "export" or "deploy to
>>> production" situation needs to be improved substantially. However, you are
>>> left with the following options:
>>>
>>> (a) build out a lightweight "spark-ml-common" project that brings in the
>>> dependencies needed for production scoring / transformation in independent
>>> apps. However, here you only support Scala/Java - what about R and Python?
>>> Also, what about the distributed models? Perhaps "local" wrappers can be
>>> created, though this may not work for very large factor or LDA models. See
>>> also H20 example http://docs.h2o.ai/h2oclassic/userguide/scorePOJO.html
>>>
>>> (b) build out Spark's PMML support, and add missing stuff to PMML where
>>> possible. The benefit here is an existing standard with various tools for
>>> scoring (via REST server, Java app, Pig, Hive, various language support).
>>>
>>> (c) build out a more comprehensive I/O, serialization and scoring
>>> framework. Here you face the issue of supporting various predictors and
>>> transformers generically, across platforms and versioning. i.e. you're
>>> re-creating a new standard like PMML
>>>
>>> Option (a) is do-able, but I'm a bit concerned that it may be too "Spark
>>> specific", or even too "Scala / Java" specific. But it is still potentially
>>> very useful to Spark users to build this out and have a somewhat standard
>>> production serving framework and/or library (there are obviously existing
>>> options like PredictionIO etc).
>>>
>>> Option (b) is really building out the existing PMML support within
>>> Spark, so a lot of the initial work has already been done. I know some
>>> folks had (or have) licensing issues with some components of JPMML (e.g.
>>> the evaluator and REST server). But perhaps the solution here is to build
>>> an Apache2-licensed evaluator framework.
>>>
>>> Option (c) is obviously interesting - "let's build a better PMML (that
>>> uses JSON or whatever instead of XML!)". But it also seems like a huge
>>> amount of reinventing the wheel, and like any new standard would take time
>>> to garner wide support (if at all).
>>>
>>> It would be really useful to start to understand what the main missing
>>> pieces are in 

Re: spark shuffle

2015-11-22 Thread Shushant Arora
And will groupByKey keep all values of the pair RDD in an iterable list in
memory on the reducer? That will lead to out-of-memory errors if the values of a
key are beyond the memory of that node.
1. Is there a way to spill that to disk?
2. If not, is it feasible to partition the pair RDD using a custom
partitioner so that all values of the same key land on the same node, with the
number of partitions equal to the number of distinct keys?
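
For reference, a rough sketch of the second idea above, a custom Partitioner
with one partition per distinct key (the class name and toy data below are
hypothetical, just an illustration of what was described):

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Hypothetical partitioner: one partition per distinct key, so all values for a
// given key land in the same partition (and therefore on the same node).
class KeyPartitioner(keys: Array[String]) extends Partitioner {
  private val index = keys.zipWithIndex.toMap
  override def numPartitions: Int = keys.length
  override def getPartition(key: Any): Int = index(key.asInstanceOf[String])
}

object CustomPartitionExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("key-partitioner").setMaster("local[2]"))
    val pairRdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))) // toy data

    // One pass to collect the distinct keys, then repartition by them
    val distinctKeys = pairRdd.keys.distinct().collect()
    val partitioned = pairRdd.partitionBy(new KeyPartitioner(distinctKeys))

    partitioned.glom().collect().foreach(p => println(p.mkString(", ")))
    sc.stop()
  }
}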

On Sat, Nov 21, 2015 at 11:21 PM, Shushant Arora 
wrote:

> Hi
>
> I have few doubts
>
> 1. Does
> rdd.saveAsNewAPIHadoopFile(outputdir, keyclass, valueclass, outputformat
> class) shuffle data, or will it always create the same number of files in the
> output dir as the number of partitions in the RDD?
>
> 2. How do I use multiple outputs in saveAsNewAPIHadoopFile to have the file
> name generated from the key, for output formats other than TextOutputFormat?
>
> 3. I have a JavaPairRDD - I want to partition it into a number of
> partitions equal to the distinct keys in the pair RDD.
>
>    1. Will pairrdd.groupByKey() create a new RDD with partitions
> equal to the number of distinct keys in the parent pair RDD?
>
>    2. Or will I have to calculate the distinct keys in the pair RDD (by
> using
> pairrdd.keys().distinct().count()) and then call a custom partitioner on the
> pair RDD with the number of partitions equal to the calculated
> distinct keys, and partition by key?
>
> Thanks
>


Re: How to adjust Spark shell table width

2015-11-22 Thread Jagrut Sharma
Since version 1.5.0, show(false) on a DataFrame prevents truncation of long
strings in the output. By default, strings more than 20 characters are
truncated.

Example usage:
scala> df.show(false)
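
For completeness, there is also (assuming Spark 1.5+) a two-argument form that
controls both the row count and truncation, e.g. for the original query:

scala> sqlContext.sql("select url from tableA").show(10, false)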

--
Jagrut


On Sat, Nov 21, 2015 at 6:24 AM, Fengdong Yu 
wrote:

> Hi,
>
> I found that if the column value is too long, the spark shell only shows a
> partial result.
>
> such as:
>
> sqlContext.sql("select url from tableA").show(10)
>
> It cannot show the whole URL here, so how do I adjust it? Thanks
>
>
>
>
>
>
>


Re: thought experiment: use spark ML to real time prediction

2015-11-22 Thread Andy Davidson
Hi Nick

I started this thread. IMHO we need something like Spark to train our
models. The resulting models are typically small enough to easily fit on a
single machine. My real-time production system is not built on Spark. The
real-time system needs to use the model to make predictions in real time.


Use case: "high-frequency stock trading". Use Spark to train a model.
There is no way I could use Spark Streaming in the real-time production
system. I need some way to easily move the model trained using Spark to a
non-Spark environment so I can make predictions in real time.

"Credit card fraud detection" is another similar use case.
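
For what it's worth, a hedged sketch of the existing MLlib PMML export (option
(b) mentioned elsewhere in this thread), assuming Spark 1.4+ and using KMeans
purely as an illustration:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val sc = new SparkContext(new SparkConf().setAppName("pmml-export").setMaster("local[2]"))
val data = sc.parallelize(Seq(Vectors.dense(0.0, 0.0), Vectors.dense(9.0, 9.0)))
val model = KMeans.train(data, 2, 10) // k = 2, maxIterations = 10

// Models that mix in PMMLExportable can be serialized to PMML XML,
// which a non-Spark scoring engine can then load.
println(model.toPMML())           // PMML as an in-memory String
model.toPMML("/tmp/kmeans.pmml")  // or written to a local path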

Kind regards

Andy




From:  Nick Pentreath 
Date:  Wednesday, November 18, 2015 at 4:03 AM
To:  DB Tsai 
Cc:  "user @spark" 
Subject:  Re: thought experiment: use spark ML to real time prediction

> One such "lightweight PMML in JSON" is here -
> https://github.com/bigmlcom/json-pml. At least for the schema definitions. But
> nothing available in terms of evaluation/scoring. Perhaps this is something
> that can form a basis for such a new undertaking.
> 
> I agree that distributed models are only really applicable in the case of
> massive scale factor models - and then anyway for latency purposes one needs
> to use LSH or something similar to achieve sufficiently real-time performance.
> These days one can easily spin up a single very powerful server to handle even
> very large models.
> 
> On Tue, Nov 17, 2015 at 11:34 PM, DB Tsai  wrote:
>> I was thinking about to work on better version of PMML, JMML in JSON, but as
>> you said, this requires a dedicated team to define the standard which will be
>> a huge work.  However, option b) and c) still don't address the distributed
>> models issue. In fact, most of the models in production have to be small
>> enough to return the result to users within reasonable latency, so I doubt
>> how usefulness of the distributed models in real production use-case. For R
>> and Python, we can build a wrapper on-top of the lightweight
>> "spark-ml-common" project.
>> 
>> 
>> Sincerely,
>> 
>> DB Tsai
>> --
>> Web: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>> 
>> On Tue, Nov 17, 2015 at 2:29 AM, Nick Pentreath 
>> wrote:
>>> I think the issue with pulling in all of spark-core is often with
>>> dependencies (and versions) conflicting with the web framework (or Akka in
>>> many cases). Plus it really is quite heavy if you just want a fairly
>>> lightweight model-serving app. For example we've built a fairly simple but
>>> scalable ALS factor model server on Scalatra, Akka and Breeze. So all you
>>> really need is the web framework and Breeze (or an alternative linear
>>> algebra lib).
>>> 
>>> I definitely hear the pain-point that PMML might not be able to handle some
>>> types of transformations or models that exist in Spark. However, here's an
>>> example from scikit-learn -> PMML that may be instructive
>>> (https://github.com/scikit-learn/scikit-learn/issues/1596 and
>>> https://github.com/jpmml/jpmml-sklearn), where a fairly impressive list of
>>> estimators and transformers are supported (including e.g. scaling and
>>> encoding, and PCA).
>>> 
>>> I definitely think the current model I/O and "export" or "deploy to
>>> production" situation needs to be improved substantially. However, you are
>>> left with the following options:
>>> 
>>> (a) build out a lightweight "spark-ml-common" project that brings in the
>>> dependencies needed for production scoring / transformation in independent
>>> apps. However, here you only support Scala/Java - what about R and Python?
>>> Also, what about the distributed models? Perhaps "local" wrappers can be
>>> created, though this may not work for very large factor or LDA models. See
>>> also H20 example http://docs.h2o.ai/h2oclassic/userguide/scorePOJO.html
>>> 
>>> (b) build out Spark's PMML support, and add missing stuff to PMML where
>>> possible. The benefit here is an existing standard with various tools for
>>> scoring (via REST server, Java app, Pig, Hive, various language support).
>>> 
>>> (c) build out a more comprehensive I/O, serialization and scoring framework.
>>> Here you face the issue of supporting various predictors and transformers
>>> generically, across platforms and versioning. i.e. you're re-creating a new
>>> standard like PMML
>>> 
>>> Option (a) is do-able, but I'm a bit concerned that it may be too "Spark
>>> specific", or even too "Scala / Java" specific. But it is still potentially
>>> very useful to Spark users to build this out and have a somewhat standard
>>> production serving framework and/or library (there are obviously existing
>>> options like PredictionIO etc).
>>> 
>>> Option (b) is really building out the existing PMML support within Spark, so
>>> a lot of the initial work has already been 

Need Help Diagnosing/operating/tuning

2015-11-22 Thread Jeremy Davis

Hello,
I’m at a loss trying to diagnose why my spark job is failing. (works fine on 
small data)
It is failing during the repartition, or on the subsequent steps, which then 
seem to fail and fall back to repartitioning.
I’ve tried adjusting every parameter I can find, but have had no success.
Input is only 23GB of LZO (probably 8x compression), and I’ve verified all 
files are valid (not corrupted).
I’ve tried more and less of : memory, partitions, executors, cores...
I’ve set maxFailures up to 300.
Setting 4GB heap usually makes it through repartitioning, but fails on 
subsequent steps (Sometimes being killed from running past memory limits). 
Larger Heaps usually don’t even make it through the first repartition due to 
all kinds of weird errors that look like read errors...

I’m at a loss on how to debug this thing.
Is there a tutorial somewhere?

——


Spark 1.4.1
Java 7
Cluster has 3TB of memory, and 400 cores.


Here are a collection of exceptions



java.io.FileNotFoundException: 
/var/storage/sdd3/nm-local/usercache/jeremy/appcache/application_1447722466442_1649/blockmgr-9ed5583f-cac1-4701-9f70-810c215b954f/13/shuffle_0_5_0.data
 (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at 
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:128)
at 
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:215)
at 
org.apache.spark.util.collection.ChainedBuffer.read(ChainedBuffer.scala:56)
at 
org.apache.spark.util.collection.PartitionedSerializedPairBuffer$$anon$2.writeNext(PartitionedSerializedPairBuffer.scala:137)
at 
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:757)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)




java.lang.InternalError: lzo1x_decompress_safe returned: -6
at 
com.hadoop.compression.lzo.LzoDecompressor.decompressBytesDirect(Native Method)
at 
com.hadoop.compression.lzo.LzoDecompressor.decompress(LzoDecompressor.java:315)
at 
com.hadoop.compression.lzo.LzopDecompressor.decompress(LzopDecompressor.java:122)
at 
com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:247)
at 
org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
at 
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at 
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
at 
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:216)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)



java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707)
at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:776)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
at 

Re: How to kill spark applications submitted using spark-submit reliably?

2015-11-22 Thread Ted Yu
If you ask about trapping the SIGKILL signal in your script, see the
following:

http://linuxcommand.org/wss0160.php

Cheers

On Fri, Nov 20, 2015 at 10:02 PM, Vikram Kone  wrote:

> I tried adding shutdown hook to my code but it didn't help. Still same
> issue
>
>
> On Fri, Nov 20, 2015 at 7:08 PM, Ted Yu  wrote:
>
>> Which Spark release are you using ?
>>
>> Can you pastebin the stack trace of the process running on your machine ?
>>
>> Thanks
>>
>> On Nov 20, 2015, at 6:46 PM, Vikram Kone  wrote:
>>
>> Hi,
>> I'm seeing a strange problem. I have a spark cluster in standalone mode.
>> I submit spark jobs from a remote node as follows from the terminal
>>
>> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping
>> spark-jobs.jar
>>
>> When the app is running and I press Ctrl-C on the console terminal, the
>> process is killed and so is the app in the Spark master UI. When I go to
>> the Spark master UI, I see that this app is in state Killed under
>> Completed applications, which is what I expected to see.
>>
>> Now, I created a shell script as follows to do the same
>>
>> #!/bin/bash
>> spark-submit --master spark://10.1.40.18:7077  --class com.test.Ping
>> spark-jobs.jar
>> echo $! > my.pid
>>
>> When I execute the shell script from terminal, as follows
>>
>> $> bash myscript.sh
>>
>> The application is submitted correctly to the Spark master and I can see it
>> as one of the running apps in the Spark master UI. But when I kill the
>> process in my terminal as follows
>>
>> $> kill $(cat my.pid)
>>
>> I see that the process is killed on my machine but the Spark application
>> is still running in the Spark master! It doesn't get killed.
>>
>> I noticed one more thing: when I launch the Spark job via the shell
>> script and kill the application from the Spark master UI by clicking on "kill"
>> next to the running application, it gets killed in the Spark UI but I still
>> see the process running on my machine.
>>
>> In both cases, I would expect the remote spark app to be killed and my
>> local process to be killed.
>>
>> Why is this happening? And how can I kill a Spark app launched via a shell
>> script from the terminal, without going to the Spark master UI?
>>
>> I want to launch the Spark app via the script and log the PID so I can
>> monitor it remotely.
>>
>> thanks for the help
>>
>>
>