Re: Multiple Spark Streaming Jobs on Single Master

2015-10-23 Thread gaurav sharma
I specified spark.cores.max = 4

but it started 2 executors with 2 cores each on each of the 2 workers.

In standalone cluster mode, though we can specify the worker cores, there is
no way to specify the number of cores an executor must take on that
particular worker machine.
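For reference, the relevant knobs as I understand them from the standalone-mode docs (a hedged sketch; exact support depends on version, and spark.executor.cores support for standalone arrived around Spark 1.4):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hedged sketch: cap what one application may take on a standalone cluster.
// spark.cores.max (note the dot, not a comma) caps total cores per app;
// spark.executor.cores (standalone support from ~1.4) caps cores per executor.
val conf = new SparkConf()
  .setAppName("streaming-job-1")
  .setMaster("spark://master:7077")   // hypothetical master URL
  .set("spark.cores.max", "4")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "4g")

val sc = new SparkContext(conf)
```

With both set, the master should hand this app at most 4 cores in 2-core executors, leaving the remaining cores free for a second streaming job.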



On Sat, Oct 24, 2015 at 1:41 AM, Augustus Hong 
wrote:

> How did you specify number of cores each executor can use?
>
> Be sure to use this when submitting jobs with spark-submit:
> --total-executor-cores 100.
>
> Other options won't work from my experience.
>
> On Fri, Oct 23, 2015 at 8:36 AM, gaurav sharma 
> wrote:
>
>> Hi,
>>
>> I created 2 workers on the same machine, each with 4 cores and 6 GB RAM.
>>
>> I submitted the first job, and it allocated 2 cores on each of the worker
>> processes and utilized the full 4 GB of RAM for each executor process.
>>
>> When I submit my second job, it always stays in the WAITING state.
>>
>>
>> Cheers!!
>>
>>
>>
>> On Tue, Oct 20, 2015 at 10:46 AM, Tathagata Das 
>> wrote:
>>
>>> You can set the max cores for the first submitted job such that it does
>>> not take all the resources from the master. See
>>> http://spark.apache.org/docs/latest/submitting-applications.html
>>>
>>> # Run on a Spark standalone cluster in client deploy mode
>>> ./bin/spark-submit \
>>>   --class org.apache.spark.examples.SparkPi \
>>>   --master spark://207.184.161.138:7077 \
>>>   --executor-memory 20G \
>>>   --total-executor-cores 100 \
>>>   /path/to/examples.jar \
>>>   1000
>>>
>>>
>>> On Mon, Oct 19, 2015 at 4:26 PM, Augustus Hong <
>>> augus...@branchmetrics.io> wrote:
>>>
 Hi All,

 Would it be possible to run multiple spark streaming jobs on a single
 master at the same time?

 I currently have one master node and several worker nodes in the
 standalone mode, and I used spark-submit to submit multiple spark streaming
 jobs.

 From what I observed, it seems like only the first submitted job would
 get resources and run.  Jobs submitted afterwards will have the status
 "Waiting", and will only run after the first one is finished or killed.

 I tried limiting each executor to only 1 core(each worker machine has 8
 cores), but the same things happens that only one job will be run, even
 though there are a lot of idle cores.

 Best,
 Augustus



 --
 Augustus Hong
  Data Analytics | Branch Metrics
  m 650-391-3369 | e augus...@branch.io

>>>
>>>
>>
>
>
> --
> Augustus Hong
>  Data Analytics | Branch Metrics
>  m 650-391-3369 | e augus...@branch.io
>


Re: Huge shuffle data size

2015-10-23 Thread Sabarish Sasidharan
How many rows are you joining? How many rows in the output?

Regards
Sab
On 24-Oct-2015 2:32 am, "pratik khadloya"  wrote:

> Actually the groupBy is not taking a lot of time.
> The join that I do later takes most (95%) of the time.
> Also, the grouping I am doing is based on the DataFrame API, which does
> not contain any function for reduceBy... I guess the DF automatically uses
> reduceByKey when we do a groupBy.
>
> ~Pratik
>
> On Fri, Oct 23, 2015 at 1:38 PM Kartik Mathur  wrote:
>
>> Don't use groupBy , use reduceByKey instead , groupBy should always be
>> avoided as it leads to lot of shuffle reads/writes.
>>
>> On Fri, Oct 23, 2015 at 11:39 AM, pratik khadloya 
>> wrote:
>>
>>> Sorry i sent the wrong join code snippet, the actual snippet is
>>>
>>> aggImpsDf.join(
>>>aggRevenueDf,
>>>aggImpsDf("id_1") <=> aggRevenueDf("id_1")
>>>  && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
>>>  && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
>>>  && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
>>>"inner")
>>>.select(
>>>  aggImpsDf("id_1"), aggImpsDf("id_2"), aggImpsDf("day_hour"),
>>>  aggImpsDf("day_hour_2"), aggImpsDf("metric1"),
>>> aggRevenueDf("metric2"))
>>>.coalesce(200)
>>>
>>>
>>> On Fri, Oct 23, 2015 at 11:16 AM pratik khadloya 
>>> wrote:
>>>
 Hello,

 Data about my spark job is below. My source data is only 916MB (stage
 0) and 231MB (stage 1), but when i join the two data sets (stage 2) it
 takes a very long time and as i see the shuffled data is 614GB. Is this
 something expected? Both the data sets produce 200 partitions.

 Stage Id | Description | Submitted | Duration | Tasks | Input | Shuffle
 2 | saveAsTable at Driver.scala:269 | 2015/10/22 18:48:12 | 2.3 h | 200/200 | – | 614.6 GB
 1 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 2.1 min | 8/8 | 916.2 MB | 3.9 MB
 0 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 35 s | 3/3 | 231.2 MB | 4.8 MB

 Am running Spark 1.4.1 and my code snippet which joins the two data sets is:

 hc.sql(query).
 mapPartitions(iter => {
   iter.map {
 case Row(
  ...
  ...
  ...
 )
   }
 }
 ).toDF()
 .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
 .agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
   sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))


 Please advise on how to reduce the shuffle and speed this up.


 ~Pratik


>>
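One thing worth trying for the join discussed above (a hedged sketch, not from the thread): if aggRevenueDf is small enough to fit in executor memory, a broadcast join avoids shuffling the large side at all. org.apache.spark.sql.functions.broadcast appeared in Spark 1.5, so this would mean upgrading from 1.4.1.

```scala
import org.apache.spark.sql.functions.broadcast

// Sketch: hint the planner to broadcast the smaller DataFrame so the large
// side is never shuffled. DataFrame names are the ones from the thread.
val joined = aggImpsDf.join(
    broadcast(aggRevenueDf),
    aggImpsDf("id_1") <=> aggRevenueDf("id_1")
      && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
      && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
      && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
    "inner")
```

If the shuffle stays huge even then, the more likely culprit is key skew or a near-cartesian match on the join keys, which no shuffle tuning will fix.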


question about HadoopFsRelation

2015-10-23 Thread Koert Kuipers
I noticed in the comments for HadoopFsRelation.buildScan it says:

  * @param inputFiles For a non-partitioned relation, it contains paths of
  *        all data files in the relation. For a partitioned relation, it
  *        contains paths of all data files in a single selected partition.

Do I understand correctly that it actually lists all the data files (part
files), not just the data directories that contain them? If so, that sounds
like trouble to me, because most implementations will use this info to set
the input paths for FileInputFormat. For example, in ParquetRelation:
if (inputFiles.nonEmpty) {
  FileInputFormat.setInputPaths(job, inputFiles.map(_.getPath): _*)
}

If all the part files are listed, this will make the jobConf huge and could
cause issues upon serialization and/or broadcasting.

It can also lead to other inefficiencies; for example, spark-avro creates an
RDD for every input (part) file, which quickly leads to thousands of RDDs.

I think only the directories, not the individual files, should be listed in
the input path?
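To illustrate the suggestion, a hedged sketch (the helper name is mine) of collapsing the per-file list to its distinct parent directories before configuring the job:

```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

// Sketch: keep the jobConf small by listing parent directories instead of
// every individual part file. setInputDirs is a hypothetical helper.
def setInputDirs(job: Job, inputFiles: Array[FileStatus]): Unit = {
  val dirs: Array[Path] = inputFiles.map(_.getPath.getParent).distinct
  if (dirs.nonEmpty) {
    FileInputFormat.setInputPaths(job, dirs: _*)
  }
}
```

The catch is partition pruning: for a partitioned relation the per-partition file list is exactly what lets the datasource skip data, so directory-level input only works if pruning happens somewhere else.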


Re: How does Spark coordinate with Tachyon wrt data locality

2015-10-23 Thread Calvin Jia
Hi Shane,

Tachyon provides an api to get the block locations of the file which Spark
uses when scheduling tasks.

Hope this helps,
Calvin

On Fri, Oct 23, 2015 at 8:15 AM, Kinsella, Shane 
wrote:

> Hi all,
>
>
>
> I am looking into how Spark handles data locality wrt Tachyon. My main
> concern is how this is coordinated. Will it send a task based on a file
> loaded from Tachyon to a node that it knows has that file locally, and how
> does it know which nodes have what?
>
>
>
> Kind regards,
>
> Shane
> This email (including any attachments) is proprietary to Aspect Software,
> Inc. and may contain information that is confidential. If you have received
> this message in error, please do not read, copy or forward this message.
> Please notify the sender immediately, delete it from your system and
> destroy any copies. You may not further disclose or distribute this email
> or its attachments.
>


[SPARK-9776]Another instance of Derby may have already booted the database #8947

2015-10-23 Thread Ge, Yao (Y.)
I have not been able to run spark-shell in yarn-cluster mode since 1.5.0 due to 
the same issue described by [SPARK-9776]. Did this pull request fix the issue?
https://github.com/apache/spark/pull/8947
I still have the same problem with 1.5.1 (I am running on HDP 2.2.6 with Hadoop 
2.6)
Thanks.

-Yao



streaming.twitter.TwitterUtils what is the best way to save twitter status to HDFS?

2015-10-23 Thread Andy Davidson
I need to save the Twitter statuses I receive so that I can do additional
batch-based processing on them in the future. Is it safe to assume HDFS is
the best way to go?

Any idea what is the best way to save Twitter statuses to HDFS?

JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(1000));

Authorization twitterAuth = setupTwitterAuthorization();

JavaDStream<Status> tweets =
    TwitterFilterQueryUtils.createStream(ssc, twitterAuth, query);



http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams



saveAsHadoopFiles(prefix, [suffix]) — Save this DStream's contents as Hadoop
files. The file name at each batch interval is generated based on prefix and
suffix: "prefix-TIME_IN_MS[.suffix]".
Python API: This is not available in the Python API.


However, JavaDStream does not support any saveAs* functions.



DStream<Status> dStream = tweets.dstream();


http://spark.apache.org/docs/latest/api/java/org/apache/spark/streaming/dstream/DStream.html
DStream only supports saveAsObjectFiles() and saveAsTextFiles().


saveAsTextFiles
public void saveAsTextFiles(java.lang.String prefix,
                            java.lang.String suffix)
Save each RDD in this DStream as a text file, using the string representation
of elements. The file name at each batch interval is generated based on
prefix and suffix: "prefix-TIME_IN_MS.suffix".


Any idea where I would find these files? I assume they will be spread out
all over my cluster?


Also, I wonder if using the saveAs*() functions is going to cause other
problems. My duration is set to 1 sec. Am I going to overwhelm the system
with a bunch of tiny files? Many of them will be empty.
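A workaround I'd consider (hedged sketch, in Scala for brevity; foreachRDD is the streaming guide's documented escape hatch for custom output): save each micro-batch yourself and skip the empty ones, which also addresses the tiny-file worry somewhat.

```scala
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Sketch: write every non-empty micro-batch under an HDFS prefix. The files
// land in HDFS (one directory per batch, one part file per partition), not
// scattered on the local disks of the cluster.
def saveStream(tweets: DStream[String], prefix: String): Unit = {
  tweets.foreachRDD { (rdd, time: Time) =>
    if (!rdd.isEmpty()) {                 // skip empty 1-second batches
      rdd.saveAsTextFile(s"$prefix-${time.milliseconds}")
    }
  }
}
```

For fewer, larger files, a window() over the stream or a larger batch interval before saving helps; a later compaction job is the other common answer.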



Kind regards



Andy




Re: get host from rdd map

2015-10-23 Thread weoccc
yea,

My use case is that I want to do some external communication while the RDD
map is running. The external communication might be handled separately,
transparently to Spark. What would be the hacky way and the non-hacky way to
do that? :)

Weide
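For the basic question of which host a task is on, a hedged sketch (a plain JDK call, one lookup per partition):

```scala
import java.net.InetAddress
import org.apache.spark.rdd.RDD

// Sketch: tag each element with the hostname of the executor processing it.
// mapPartitions keeps the hostname lookup to once per partition.
def tagWithHost(rdd: RDD[String]): RDD[(String, String)] = {
  rdd.mapPartitions { iter =>
    val host = InetAddress.getLocalHost.getHostName
    iter.map(x => (host, x))
  }
}
```

Knowing the full host list up front is harder, since the set of executors can change; that is why making the external call from inside the task is usually the less hacky option.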



On Fri, Oct 23, 2015 at 5:32 PM, Ted Yu  wrote:

> Can you outline your use case a bit more ?
>
> Do you want to know all the hosts which would run the map ?
>
> Cheers
>
> On Fri, Oct 23, 2015 at 5:16 PM, weoccc  wrote:
>
>> in rdd map function, is there a way i can know the list of host names
>> where the map runs ? any code sample would be appreciated ?
>>
>> thx,
>>
>> Weide
>>
>>
>>
>


Re: get host from rdd map

2015-10-23 Thread Ted Yu
Can you outline your use case a bit more ?

Do you want to know all the hosts which would run the map ?

Cheers

On Fri, Oct 23, 2015 at 5:16 PM, weoccc  wrote:

> in rdd map function, is there a way i can know the list of host names
> where the map runs ? any code sample would be appreciated ?
>
> thx,
>
> Weide
>
>
>


get host from rdd map

2015-10-23 Thread weoccc
In an RDD map function, is there a way I can know the list of host names
where the map runs? Any code sample would be appreciated.

thx,

Weide


Spark Streaming: how to StreamingContext.queueStream

2015-10-23 Thread Anfernee Xu
Hi,

Here's my situation: I have some kind of offline dataset, but I want to form
a virtual data stream feeding into Spark Streaming. My code looks like this:


   // sort offline data by time
 1)  JavaRDD<Row> sortedByTime = offlineDataRDD.sortBy( );

   // compute a list of JavaRDD; each element JavaRDD holds the data in the
   // same time bucket.
  2) List<JavaRDD<Row>> virtualStreamRdd = ?

Queue<JavaRDD<Row>> queue = Queues.newLinkedBlockingQueue();
queue.addAll(virtualStreamRdd);

/*
 * Create DStream from the queue
 */

3) final JavaDStream<Row> rowDStream = streamingContext.queueStream(queue);


Currently I'm stuck at 2); any suggestion is appreciated.

Thanks

-- 
--Anfernee
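For step 2), one hedged way to carve the sorted data into per-bucket RDDs (fine for a moderate number of buckets, since it launches one filter per bucket; bucketOf is a hypothetical function mapping a record to its time bucket):

```scala
import org.apache.spark.rdd.RDD

// Sketch: build the list of per-time-bucket RDDs for queueStream.
def toVirtualStream[T](sorted: RDD[T], bucketOf: T => Long): Seq[RDD[T]] = {
  // distinct bucket ids, in time order
  val buckets = sorted.map(bucketOf).distinct().collect().sorted
  // one lazy RDD per bucket; cache `sorted` first to avoid recomputation
  buckets.map(b => sorted.filter(r => bucketOf(r) == b))
}
```

Calling sorted.cache() beforehand avoids recomputing the source once per bucket; converting the Seq to a java.util.List for the Java queueStream API is then a JavaConversions step.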


Spark can't write ORC files properly using 1.5.1 / Hadoop 2.6

2015-10-23 Thread unk1102
Hi, I am having a weird issue. I have a Spark job with a bunch of
hiveContext.sql() calls that creates ORC files as part of partitioned Hive
tables, and it runs fine on 1.4.1 and Hadoop 2.4.

Now I tried to move to Spark 1.5.1/Hadoop 2.6 and the job does not work as
expected: it does not create the ORC files. But if I use Spark 1.5.1/Hadoop
2.4 it works fine. I don't understand the reason; please guide.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-cant-ORC-files-properly-using-1-5-1-hadoop-2-6-tp25189.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Huge shuffle data size

2015-10-23 Thread pratik khadloya
Actually the groupBy is not taking a lot of time.
The join that I do later takes most (95%) of the time.
Also, the grouping I am doing is based on the DataFrame API, which does not
contain any function for reduceBy... I guess the DF automatically uses
reduceByKey when we do a groupBy.

~Pratik

On Fri, Oct 23, 2015 at 1:38 PM Kartik Mathur  wrote:

> Don't use groupBy , use reduceByKey instead , groupBy should always be
> avoided as it leads to lot of shuffle reads/writes.
>
> On Fri, Oct 23, 2015 at 11:39 AM, pratik khadloya 
> wrote:
>
>> Sorry i sent the wrong join code snippet, the actual snippet is
>>
>> aggImpsDf.join(
>>aggRevenueDf,
>>aggImpsDf("id_1") <=> aggRevenueDf("id_1")
>>  && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
>>  && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
>>  && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
>>"inner")
>>.select(
>>  aggImpsDf("id_1"), aggImpsDf("id_2"), aggImpsDf("day_hour"),
>>  aggImpsDf("day_hour_2"), aggImpsDf("metric1"),
>> aggRevenueDf("metric2"))
>>.coalesce(200)
>>
>>
>> On Fri, Oct 23, 2015 at 11:16 AM pratik khadloya 
>> wrote:
>>
>>> Hello,
>>>
>>> Data about my spark job is below. My source data is only 916MB (stage 0)
>>> and 231MB (stage 1), but when i join the two data sets (stage 2) it takes a
>>> very long time and as i see the shuffled data is 614GB. Is this something
>>> expected? Both the data sets produce 200 partitions.
>>>
>>> Stage Id | Description | Submitted | Duration | Tasks | Input | Shuffle
>>> 2 | saveAsTable at Driver.scala:269 | 2015/10/22 18:48:12 | 2.3 h | 200/200 | – | 614.6 GB
>>> 1 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 2.1 min | 8/8 | 916.2 MB | 3.9 MB
>>> 0 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 35 s | 3/3 | 231.2 MB | 4.8 MB
>>>
>>> Am running Spark 1.4.1 and my code snippet which joins the two data sets
>>> is:
>>>
>>> hc.sql(query).
>>> mapPartitions(iter => {
>>>   iter.map {
>>> case Row(
>>>  ...
>>>  ...
>>>  ...
>>> )
>>>   }
>>> }
>>> ).toDF()
>>> .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
>>> .agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
>>>   sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))
>>>
>>>
>>> Please advise on how to reduce the shuffle and speed this up.
>>>
>>>
>>> ~Pratik
>>>
>>>
>


Re: unsubscribe

2015-10-23 Thread Ted Yu
Take a look at first section of https://spark.apache.org/community

On Fri, Oct 23, 2015 at 1:46 PM,  wrote:

> This e-mail and any files transmitted with it are for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. If you are not the intended recipient(s), please reply to the
> sender and destroy all copies of the original message. Any unauthorized
> review, use, disclosure, dissemination, forwarding, printing or copying of
> this email, and/or any action taken in reliance on the contents of this
> e-mail is strictly prohibited and may be unlawful. Where permitted by
> applicable law, this e-mail and other e-mail communications sent to and
> from Cognizant e-mail addresses may be monitored.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


unsubscribe

2015-10-23 Thread ZAHOORAHMED.KAZI
This e-mail and any files transmitted with it are for the sole use of the 
intended recipient(s) and may contain confidential and privileged information. 
If you are not the intended recipient(s), please reply to the sender and 
destroy all copies of the original message. Any unauthorized review, use, 
disclosure, dissemination, forwarding, printing or copying of this email, 
and/or any action taken in reliance on the contents of this e-mail is strictly 
prohibited and may be unlawful. Where permitted by applicable law, this e-mail 
and other e-mail communications sent to and from Cognizant e-mail addresses may 
be monitored.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Huge shuffle data size

2015-10-23 Thread Kartik Mathur
Don't use groupBy, use reduceByKey instead; groupBy should always be
avoided as it leads to a lot of shuffle reads/writes.
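For the record, the distinction looks like this on raw RDDs (a hedged sketch; note that the DataFrame groupBy().agg() path already does map-side partial aggregation, so the advice mainly applies to RDD groupByKey):

```scala
// Sketch: sc is an existing SparkContext.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// groupByKey ships every value across the network before summing.
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines values map-side first, shuffling far less data.
val viaReduce = pairs.reduceByKey(_ + _)
```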

On Fri, Oct 23, 2015 at 11:39 AM, pratik khadloya 
wrote:

> Sorry i sent the wrong join code snippet, the actual snippet is
>
> aggImpsDf.join(
>aggRevenueDf,
>aggImpsDf("id_1") <=> aggRevenueDf("id_1")
>  && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
>  && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
>  && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
>"inner")
>.select(
>  aggImpsDf("id_1"), aggImpsDf("id_2"), aggImpsDf("day_hour"),
>  aggImpsDf("day_hour_2"), aggImpsDf("metric1"),
> aggRevenueDf("metric2"))
>.coalesce(200)
>
>
> On Fri, Oct 23, 2015 at 11:16 AM pratik khadloya 
> wrote:
>
>> Hello,
>>
>> Data about my spark job is below. My source data is only 916MB (stage 0)
>> and 231MB (stage 1), but when i join the two data sets (stage 2) it takes a
>> very long time and as i see the shuffled data is 614GB. Is this something
>> expected? Both the data sets produce 200 partitions.
>>
>> Stage Id | Description | Submitted | Duration | Tasks | Input | Shuffle
>> 2 | saveAsTable at Driver.scala:269 | 2015/10/22 18:48:12 | 2.3 h | 200/200 | – | 614.6 GB
>> 1 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 2.1 min | 8/8 | 916.2 MB | 3.9 MB
>> 0 | saveAsTable at Driver.scala:269 | 2015/10/22 18:46:02 | 35 s | 3/3 | 231.2 MB | 4.8 MB
>>
>> Am running Spark 1.4.1 and my code snippet which joins the two data sets
>> is:
>>
>> hc.sql(query).
>> mapPartitions(iter => {
>>   iter.map {
>> case Row(
>>  ...
>>  ...
>>  ...
>> )
>>   }
>> }
>> ).toDF()
>> .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
>> .agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
>>   sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))
>>
>>
>> Please advise on how to reduce the shuffle and speed this up.
>>
>>
>> ~Pratik
>>
>>


Re: Multiple Spark Streaming Jobs on Single Master

2015-10-23 Thread Augustus Hong
How did you specify number of cores each executor can use?

Be sure to use this when submitting jobs with spark-submit:
--total-executor-cores 100.

Other options won't work from my experience.

On Fri, Oct 23, 2015 at 8:36 AM, gaurav sharma 
wrote:

> Hi,
>
> I created 2 workers on the same machine, each with 4 cores and 6 GB RAM.
>
> I submitted the first job, and it allocated 2 cores on each of the worker
> processes and utilized the full 4 GB of RAM for each executor process.
>
> When I submit my second job, it always stays in the WAITING state.
>
>
> Cheers!!
>
>
>
> On Tue, Oct 20, 2015 at 10:46 AM, Tathagata Das 
> wrote:
>
>> You can set the max cores for the first submitted job such that it does
>> not take all the resources from the master. See
>> http://spark.apache.org/docs/latest/submitting-applications.html
>>
>> # Run on a Spark standalone cluster in client deploy mode
>> ./bin/spark-submit \
>>   --class org.apache.spark.examples.SparkPi \
>>   --master spark://207.184.161.138:7077 \
>>   --executor-memory 20G \
>>   --total-executor-cores 100 \
>>   /path/to/examples.jar \
>>   1000
>>
>>
>> On Mon, Oct 19, 2015 at 4:26 PM, Augustus Hong wrote:
>>
>>> Hi All,
>>>
>>> Would it be possible to run multiple spark streaming jobs on a single
>>> master at the same time?
>>>
>>> I currently have one master node and several worker nodes in the
>>> standalone mode, and I used spark-submit to submit multiple spark streaming
>>> jobs.
>>>
>>> From what I observed, it seems like only the first submitted job would
>>> get resources and run.  Jobs submitted afterwards will have the status
>>> "Waiting", and will only run after the first one is finished or killed.
>>>
>>> I tried limiting each executor to only 1 core(each worker machine has 8
>>> cores), but the same things happens that only one job will be run, even
>>> though there are a lot of idle cores.
>>>
>>> Best,
>>> Augustus
>>>
>>>
>>>
>>> --
>>> Augustus Hong
>>>  Data Analytics | Branch Metrics
>>>  m 650-391-3369 | e augus...@branch.io
>>>
>>
>>
>


-- 
Augustus Hong
 Data Analytics | Branch Metrics
 m 650-391-3369 | e augus...@branch.io


Re: How to implement zipWithIndex as a UDF?

2015-10-23 Thread Michael Armbrust
The user facing type mapping is documented here:
http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

On Fri, Oct 23, 2015 at 12:10 PM, Benyi Wang  wrote:

> If I have two columns
>
> StructType(Seq(
>   StructField("id", LongType),
>   StructField("phones", ArrayType(StringType))))
>
> I want to add index for “phones” before I explode it.
>
> Can this be implemented as GenericUDF?
>
> I tried DataFrame.explode. It worked for simple types like string, but I
> could not figure out how to handle a nested type like StructType.
>
> Can somebody shed a light?
>
> I’m using spark 1.5.1.
> ​
>
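A hedged sketch of one route in 1.5 that leans on the tuple-to-struct mapping in that table (a Scala Seq[(String, Int)] should come back as an array of structs, which explode can then unpack; assumes `df` has the schema from the question and sqlContext.implicits._ is imported):

```scala
import org.apache.spark.sql.functions.{explode, udf}

// Sketch: index each phone with a UDF, then explode the (phone, index) pairs.
// Seq[(String, Int)] maps to array<struct<_1:string,_2:int>> per the docs.
val zipIndex = udf { (xs: Seq[String]) => xs.zipWithIndex }

val exploded = df
  .withColumn("pair", explode(zipIndex($"phones")))
  .select($"id", $"pair._1".as("phone"), $"pair._2".as("index"))
```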


Re: How to get inverse Matrix / RDD or how to solve linear system of equations

2015-10-23 Thread Zhiliang Zhu
Hi Sujit,

Firstly, I must show my deep appreciation and respect for your kind help and
excellent knowledge. It would be best if you and I were in the same place, so
that I could thank you in person.

I will try your way with the Spark MLlib SVD.

For linear regression, Ax = b, I in fact want to view the variables and
coefficients conversely, as (1): x1 * a1 + x2 * a2 + ... + xn * an = b, with
only this one linear formula. There is also a training data set with n point
tuples [a11, a21, ..., an1, b1] taken from [A, b] (the variables), and then
the coefficients x = [x1, x2, ..., xn]T may be obtained by MLlib linear
regression.

However, when I tested Spark MLlib LR, once the point tuple dimension is more
than 6 it needs more than 100,000 iterations to reach an accurate enough
solution for the coefficients. The time complexity is too high; the cost
would be tremendous when the dimension is in the hundreds.

In effect, I am working on an optimization algorithm with a specific model
not in MLlib: an objective quadratic function f(x1, x2, ..., xn) with lots of
linear constraint conditions. I then use the Lagrange method to convert the
problem into a linear system of equations. My last question is whether Spark
is well suited to this kind of optimization: should I directly use
org.apache.spark.mllib.optimization, or some other way, or is it not very
convenient for this application?

Thank you very much~~
Zhiliang

On Saturday, October 24, 2015 12:41 AM, Sujit Pal wrote:

Hi Zhiliang,

For a system of equations AX = y, linear regression will give you a best-fit
estimate for A (the coefficient vector) given a matrix of feature variables X
and a corresponding target variable y for a subset of your data. OTOH, what
you are looking for here is to solve the system Ax = b for x, where A and b
are known.

This Math StackExchange page [2] explains the math in more detail, but
basically A * x = b can be rewritten as x = A.I * b. You can get the
pseudo-inverse of A using SVD (Spark MLlib supports SVD [1]). The SVD
decomposition makes A a product of three other matrices:

A = U * S * V.T

and the pseudo-inverse can be written as:

A.I = V * S.I * U.T

(note the inverted singular values S.I). Then x can be found by multiplying
A.I with b.
-sujit
[1] https://spark.apache.org/docs/1.2.0/mllib-dimensionality-reduction.html
[2] http://math.stackexchange.com/questions/458404/how-can-we-compute-pseudoinverse-for-any-matrix
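To make that recipe concrete, a hedged sketch with MLlib's RowMatrix SVD (x = V * S.I * (U.T b)); it collects U to the driver for simplicity, so it assumes the row order of A is preserved and k is modest:

```scala
import org.apache.spark.mllib.linalg.{DenseVector, Vector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD

// Sketch: pseudo-inverse solve of A x = b via truncated SVD.
def pseudoSolve(rows: RDD[Vector], b: Array[Double], k: Int): DenseVector = {
  val svd = new RowMatrix(rows).computeSVD(k, computeU = true)

  // invert singular values, zeroing tiny ones for numerical stability
  val sInv = svd.s.toArray.map(s => if (s > 1e-10) 1.0 / s else 0.0)

  // U.T b accumulated on the driver (simplification: U collected locally)
  val utb = Array.ofDim[Double](k)
  for ((u, i) <- svd.U.rows.collect().zipWithIndex; j <- 0 until k)
    utb(j) += u(j) * b(i)

  // x = V * (S.I * U.T b); V is a local Matrix, so multiply runs locally
  svd.V.multiply(new DenseVector(utb.zip(sInv).map { case (t, s) => t * s }))
}
```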

On Fri, Oct 23, 2015 at 2:19 AM, Zhiliang Zhu  wrote:

Hi Sujit, and all,

Currently I am in great difficulty, and I am eager to get some help from you.

There is a big linear system of equations: Ax = b, where A has N rows and N
columns, N is very large, and b = [0, 0, ..., 0, 1]T. I will solve it to get
x = [x1, x2, ..., xn]T.

The simple solution would be to get inverse(A), and then x = inverse(A) * b.
A would be some JavaRDD; however, while RDDs/matrices have
add/multiply/transpose APIs, there is no inverse API!

Then, how could I conveniently get inverse(A), or just solve the linear
system of equations some other way? In Spark MLlib there is linear
regression, whose training process solves for the coefficients of a specific
linear model, i.e. Ax = y, training on (x, y) to get A. Might that be used to
solve the linear system of equations? I could not decide.

I must show my deep appreciation for all your help.

Thank you very much!
Zhiliang





  

How to implement zipWithIndex as a UDF?

2015-10-23 Thread Benyi Wang
If I have two columns

StructType(Seq(
  StructField("id", LongType),
  StructField("phones", ArrayType(StringType))))

I want to add index for “phones” before I explode it.

Can this be implemented as GenericUDF?

I tried DataFrame.explode. It worked for simple types like string, but I
could not figure out how to handle a nested type like StructType.

Can somebody shed some light?

I’m using spark 1.5.1.
​


Re: Spark error:- Not a valid DFS File name

2015-10-23 Thread pratik khadloya
Check what you have at SimpleMktDataFlow.scala:106

~Pratik

On Fri, Oct 23, 2015 at 11:47 AM kali.tumm...@gmail.com <
kali.tumm...@gmail.com> wrote:

> Full Error:-
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:195)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:104)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:831)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:827)
> at
>
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:827)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:820)
> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
> at
>
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:305)
> at
>
> org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
> at
> org.apache.spark.SparkHadoopWriter.preSetup(SparkHadoopWriter.scala:64)
> at
>
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1046)
> at
>
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:941)
> at
>
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:850)
> at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1164)
> at
> com.citi.ocean.spark.SimpleMktDataFlow$.main(SimpleMktDataFlow.scala:106)
> at
> com.citi.ocean.spark.SimpleMktDataFlow.main(SimpleMktDataFlow.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
>
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Saprk-error-Not-a-valid-DFS-File-name-tp25186p25188.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Stream are not serializable

2015-10-23 Thread pratik khadloya
You might be referring to some class-level variables from your code.
I got to see the actual field which caused the error when I marked the
class as serializable and ran it on the cluster.

class MyClass extends java.io.Serializable

The following resources will also help:
https://youtu.be/mHF3UPqLOL8?t=54m57s
http://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou

~Pratik
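The most common fix, as a hedged sketch: copy any class-level field into a local val before the closure, so the closure captures the val rather than `this`.

```scala
import org.apache.spark.streaming.dstream.DStream

// Sketch: a closure referencing this.minScore would drag the whole enclosing
// class into the serialized DStream graph; the local copy avoids that.
class ScoreFilter(minScore: Double) {
  def filtered(scores: DStream[Double]): DStream[Double] = {
    val threshold = minScore   // local copy captured by the closure
    scores.filter(_ >= threshold)
  }
}
```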

On Fri, Oct 23, 2015 at 10:30 AM Ted Yu  wrote:

> Mind sharing your code, if possible ?
>
> Thanks
>
> On Fri, Oct 23, 2015 at 9:49 AM, crakjie  wrote:
>
>> Hello.
>>
>> I have activated the file checkpointing for a DStream to unleach the
>> updateStateByKey.
>> My unit test worked with no problem but when I have integrated this in my
>> full stream I got this exception. :
>>
>> java.io.NotSerializableException: DStream checkpointing has been enabled
>> but
>> the DStreams with their functions are not serializable
>> Serialization stack:
>>
>> at
>>
>> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
>> at
>>
>> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
>> at
>>
>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>> at com.misterbell.shiva.StreamingApp$.main(StreamingApp.scala:196)
>> at com.misterbell.shiva.StreamingApp.main(StreamingApp.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>>
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>> at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>> But this exception is not very clear about what part of my stream is not
>> serializable.
>>
>> I try to add
>>
>>
>> .set("spark.driver.extraJavaOptions","-Dsun.io.serialization.extendedDebugInfo=true")
>>
>> .set("spark.executor.extraJavaOptions","-Dsun.io.serialization.extendedDebugInfo=true")
>>
>> to my spark conf to have more information, but it changes nothing ( it
>> should )
>>
>> So how can I find which function or part of my stream is not serializable?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Stream-are-not-serializable-tp25185.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark error:- Not a valid DFS File name

2015-10-23 Thread kali.tumm...@gmail.com
Full Error:-
at
org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:195)
at
org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:104)
at
org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:831)
at
org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:827)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:827)
at
org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:820)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:305)
at
org.apache.hadoop.mapred.FileOutputCommitter.setupJob(FileOutputCommitter.java:131)
at 
org.apache.spark.SparkHadoopWriter.preSetup(SparkHadoopWriter.scala:64)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1046)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:941)
at
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:850)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1164)
at
com.citi.ocean.spark.SimpleMktDataFlow$.main(SimpleMktDataFlow.scala:106)
at com.citi.ocean.spark.SimpleMktDataFlow.main(SimpleMktDataFlow.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saprk-error-Not-a-valid-DFS-File-name-tp25186p25188.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark error:- Not a valid DFS File name

2015-10-23 Thread pratik khadloya
I faced a similar issue. The actual problem was not in the file name.
We run Spark on YARN, and the actual error was visible in the logs, retrieved
with:
$ yarn logs -applicationId 

Scroll from the beginning to find the actual error.

~Pratik

On Fri, Oct 23, 2015 at 11:40 AM kali.tumm...@gmail.com <
kali.tumm...@gmail.com> wrote:

> Hi All,
>
> I got this weird error when I tried to run Spark in YARN-cluster mode. I
> have 33 files and I am looping over them in bash, submitting one Spark job
> per file; most of them worked OK except a few files.
>
> Is the error below an HDFS error or a Spark error?
>
> Exception in thread "Driver" java.lang.IllegalArgumentException: Pathname
> /user/myid/-u/12:51/_temporary/0 from
> hdfs://dev/user/myid/-u/12:51/_temporary/0 is not a valid DFS filename.
>
> Could the file name I passed to Spark be causing the issue?
>
>
> hdfs://dev/data/20151019/sipmktdata.ColorDataArchive.UTD.P4_M-P.v5.2015-09-18.txt.20150918
>
> Thanks
> Sri
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Saprk-error-Not-a-valid-DFS-File-name-tp25186.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark error:- Not a valid DFS File name

2015-10-23 Thread kali.tumm...@gmail.com
Hi All, 

I got this weird error when I tried to run Spark in YARN-cluster mode. I have
33 files and I am looping over them in bash, submitting one Spark job per
file; most of them worked OK except a few files.

Is the error below an HDFS error or a Spark error?

Exception in thread "Driver" java.lang.IllegalArgumentException: Pathname
/user/myid/-u/12:51/_temporary/0 from
hdfs://dev/user/myid/-u/12:51/_temporary/0 is not a valid DFS filename.

Could the file name I passed to Spark be causing the issue?

hdfs://dev/data/20151019/sipmktdata.ColorDataArchive.UTD.P4_M-P.v5.2015-09-18.txt.20150918

Thanks
Sri 
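The pathname in the error contains a colon (`12:51`), which HDFS rejects in path components; it looks like a timestamp from the bash loop leaked into the output path. A minimal, hypothetical sketch of sanitizing such a path before handing it to Spark (pure string handling; the helper name is illustrative, not a Spark API):

```python
def sanitize_hdfs_path(path):
    # HDFS path components may not contain ':' (such relative-looking paths
    # are rejected by DistributedFileSystem.getPathName), so replace them.
    scheme, sep, rest = path.partition("://")
    if sep:
        # keep the scheme's colon; sanitize only the part after the authority
        authority, slash, tail = rest.partition("/")
        return scheme + sep + authority + slash + tail.replace(":", "-")
    return path.replace(":", "-")

print(sanitize_hdfs_path("hdfs://dev/user/myid/-u/12:51/_temporary/0"))
# -> hdfs://dev/user/myid/-u/12-51/_temporary/0
```

The same cleanup could equally be done in the bash loop before invoking spark-submit.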



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saprk-error-Not-a-valid-DFS-File-name-tp25186.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Huge shuffle data size

2015-10-23 Thread pratik khadloya
Sorry, I sent the wrong join code snippet; the actual snippet is:

aggImpsDf.join(
   aggRevenueDf,
   aggImpsDf("id_1") <=> aggRevenueDf("id_1")
 && aggImpsDf("id_2") <=> aggRevenueDf("id_2")
 && aggImpsDf("day_hour") <=> aggRevenueDf("day_hour")
 && aggImpsDf("day_hour_2") <=> aggRevenueDf("day_hour_2"),
   "inner")
   .select(
 aggImpsDf("id_1"), aggImpsDf("id_2"), aggImpsDf("day_hour"),
 aggImpsDf("day_hour_2"), aggImpsDf("metric1"), aggRevenueDf("metric2"))
   .coalesce(200)


On Fri, Oct 23, 2015 at 11:16 AM pratik khadloya 
wrote:

> Hello,
>
> Data about my Spark job is below. My source data is only 916 MB (stage 0)
> and 231 MB (stage 1), but when I join the two data sets (stage 2) it takes a
> very long time, and as I can see, the shuffled data is 614 GB. Is this
> expected? Both data sets produce 200 partitions.
>
> Stage Id  Description                      Submitted            Duration  Tasks    Input     Shuffle
> 2         saveAsTable at Driver.scala:269  2015/10/22 18:48:12  2.3 h     200/200            614.6 GB
> 1         saveAsTable at Driver.scala:269  2015/10/22 18:46:02  2.1 min   8/8      916.2 MB  3.9 MB
> 0         saveAsTable at Driver.scala:269  2015/10/22 18:46:02  35 s      3/3      231.2 MB  4.8 MB
>
> Am running Spark 1.4.1, and my code snippet which joins the two data sets is:
>
> hc.sql(query).
> mapPartitions(iter => {
>   iter.map {
> case Row(
>  ...
>  ...
>  ...
> )
>   }
> }
> ).toDF()
> .groupBy("id_1", "id_2", "day_hour", "day_hour_2")
> .agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
>   sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))
>
>
> Please advise on how to reduce the shuffle and speed this up.
>
>
> ~Pratik
>
>


spark.python.worker.memory Discontinuity

2015-10-23 Thread Connor Zanin
Hi all,

I am running a simple word count job on a cluster of 4 nodes (24 cores per
node). I am varying two parameters in the configuration,
spark.python.worker.memory and the number of partitions in the RDD. My job
is written in Python.

I am observing a discontinuity in the run time of the job when the
spark.python.worker.memory is increased past a threshold. Unfortunately, I
am having trouble understanding exactly what this parameter is doing to
Spark internally and how it changes Spark's behavior to create this
discontinuity.

The documentation describes this parameter as "Amount of memory to use per
Python worker process during aggregation," but I find this vague (or I do not
know enough Spark terminology to understand what it means).

I have been pointed to the source code in the past, specifically the
shuffle.py file where _spill() appears.

Can anyone explain how this parameter behaves or point me to more
descriptive documentation? Thanks!

-- 
Regards,

Connor Zanin
Computer Science
University of Delaware
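For what it's worth, a hedged reading of shuffle.py: during aggregation each Python worker accumulates key/value partials in an in-memory map, and once the estimated memory use crosses spark.python.worker.memory it spills the partials to disk (_spill()) and merges the runs later. Crossing the point where spills start (or stop) happening would explain a discontinuity in run time. A toy sketch of that threshold-triggered behavior (the memory accounting is simulated with a simple element count; this is not Spark's actual code):

```python
class SpillingAggregator:
    """Toy model of threshold-triggered spilling during aggregation."""
    def __init__(self, memory_limit):
        self.memory_limit = memory_limit  # analogous to spark.python.worker.memory
        self.data = {}
        self.spills = []  # stands in for partial runs written to disk

    def add(self, key, value):
        self.data[key] = self.data.get(key, 0) + value
        if len(self.data) >= self.memory_limit:  # crude "memory used" estimate
            self.spills.append(self.data)        # spill partials, start fresh
            self.data = {}

    def result(self):
        merged = {}
        # merge phase: every spill adds extra (disk + merge) work
        for run in self.spills + [self.data]:
            for k, v in run.items():
                merged[k] = merged.get(k, 0) + v
        return merged

agg = SpillingAggregator(memory_limit=3)
for word in ["a", "b", "a", "c", "d", "a"]:
    agg.add(word, 1)
print(len(agg.spills), agg.result())
```

With a larger memory_limit the same input produces zero spills and skips the merge overhead entirely, which is the discontinuity in miniature.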


Huge shuffle data size

2015-10-23 Thread pratik khadloya
Hello,

Data about my Spark job is below. My source data is only 916 MB (stage 0)
and 231 MB (stage 1), but when I join the two data sets (stage 2) it takes a
very long time, and as I can see, the shuffled data is 614 GB. Is this
expected? Both data sets produce 200 partitions.

Stage Id  Description                      Submitted            Duration  Tasks    Input     Shuffle
2         saveAsTable at Driver.scala:269  2015/10/22 18:48:12  2.3 h     200/200            614.6 GB
1         saveAsTable at Driver.scala:269  2015/10/22 18:46:02  2.1 min   8/8      916.2 MB  3.9 MB
0         saveAsTable at Driver.scala:269  2015/10/22 18:46:02  35 s      3/3      231.2 MB  4.8 MB

Am running Spark 1.4.1, and my code snippet which joins the two data sets is:

hc.sql(query).
mapPartitions(iter => {
  iter.map {
case Row(
 ...
 ...
 ...
)
  }
}
).toDF()
.groupBy("id_1", "id_2", "day_hour", "day_hour_2")
.agg($"id_1", $"id_2", $"day_hour", $"day_hour_2",
  sum("attr1").alias("attr1"), sum("attr2").alias("attr2"))


Please advise on how to reduce the shuffle and speed this up.


~Pratik
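One hedged suggestion for inputs this small (about 1 GB total): a shuffle join moves both sides of the join across the network, while a broadcast (map-side) join ships the small side to every task and avoids shuffling the large side at all; in Spark 1.4 the planner can choose this when one side is under spark.sql.autoBroadcastJoinThreshold. The idea, sketched as a plain in-memory hash join (illustrative Python, not the Spark API):

```python
def broadcast_hash_join(small, large, key):
    """Map-side join: hash the small side once, stream the large side past it.
    No repartitioning/shuffling of the large side is needed."""
    lookup = {}
    for row in small:                       # "broadcast" side: built once
        lookup.setdefault(row[key], []).append(row)
    joined = []
    for row in large:                       # streamed side: never shuffled
        for match in lookup.get(row[key], []):
            merged = dict(match)
            merged.update(row)
            joined.append(merged)
    return joined

revenue = [{"id": 1, "metric2": 10}, {"id": 2, "metric2": 20}]
imps = [{"id": 1, "metric1": 5}, {"id": 1, "metric1": 7}, {"id": 3, "metric1": 9}]
result = broadcast_hash_join(revenue, imps, "id")
print(result)
```

Note also that a 614 GB shuffle from megabytes of input usually means the join keys are not unique, so each match multiplies rows; checking key cardinality before joining is cheap insurance.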


Re: Stream are not serializable

2015-10-23 Thread Ted Yu
Mind sharing your code, if possible?

Thanks
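One possible reason the extendedDebugInfo setting changed nothing: in client mode the driver JVM is already running by the time spark.driver.extraJavaOptions is read from the SparkConf, so the flag likely needs to go on spark-submit itself (--driver-java-options) instead. Separately, a generic way to narrow such failures down is to serialize the candidate objects one at a time and report the first that fails; the JVM analogue would be writing each suspect field to an ObjectOutputStream. Sketched here with Python's pickle purely for illustration:

```python
import pickle

def find_unpicklable(obj, path="root"):
    """Serialize obj; if it fails, recurse into attributes to name the culprit."""
    try:
        pickle.dumps(obj)
        return None                      # obj serializes fine
    except Exception:
        attrs = vars(obj) if hasattr(obj, "__dict__") else {}
        for attr, value in attrs.items():
            culprit = find_unpicklable(value, path + "." + attr)
            if culprit:
                return culprit
        return path                      # obj itself is the problem

class Connection:                        # stand-in for a non-serializable handle
    def __reduce__(self):
        raise TypeError("not serializable")

class StreamJob:
    def __init__(self):
        self.window = 60                 # a serializable field
        self.conn = Connection()         # the hidden culprit

print(find_unpicklable(StreamJob()))     # -> root.conn
```

The usual Spark-side fix once the culprit is found: mark the field @transient, or create the connection inside foreachRDD/mapPartitions rather than capturing it in the closure.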

On Fri, Oct 23, 2015 at 9:49 AM, crakjie  wrote:

> Hello.
>
> I have activated file checkpointing for a DStream to enable
> updateStateByKey.
> My unit tests passed with no problem, but when I integrated this into my
> full stream I got this exception:
>
> java.io.NotSerializableException: DStream checkpointing has been enabled
> but
> the DStreams with their functions are not serializable
> Serialization stack:
>
> at
>
> org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
> at
>
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
> at
>
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
> at com.misterbell.shiva.StreamingApp$.main(StreamingApp.scala:196)
> at com.misterbell.shiva.StreamingApp.main(StreamingApp.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> But this exception is not very clear about what part of my stream is not
> serializable.
>
> I tried adding
>
>
> .set("spark.driver.extraJavaOptions","-Dsun.io.serialization.extendedDebugInfo=true")
>
> .set("spark.executor.extraJavaOptions","-Dsun.io.serialization.extendedDebugInfo=true")
>
> to my Spark conf to get more information, but it changed nothing (it
> should have).
>
> So how can I find which function or part of my stream is not serializable?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Stream-are-not-serializable-tp25185.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Stream are not serializable

2015-10-23 Thread crakjie
Hello.

I have activated file checkpointing for a DStream to enable
updateStateByKey.
My unit tests passed with no problem, but when I integrated this into my
full stream I got this exception:

java.io.NotSerializableException: DStream checkpointing has been enabled but
the DStreams with their functions are not serializable
Serialization stack:

at
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:550)
at
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:587)
at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
at com.misterbell.shiva.StreamingApp$.main(StreamingApp.scala:196)
at com.misterbell.shiva.StreamingApp.main(StreamingApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 


But this exception is not very clear about what part of my stream is not
serializable.

I tried adding

.set("spark.driver.extraJavaOptions","-Dsun.io.serialization.extendedDebugInfo=true")
.set("spark.executor.extraJavaOptions","-Dsun.io.serialization.extendedDebugInfo=true")

to my Spark conf to get more information, but it changed nothing (it
should have).

So how can I find which function or part of my stream is not serializable? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stream-are-not-serializable-tp25185.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to get inverse Matrix / RDD or how to solve linear system of equations

2015-10-23 Thread Sujit Pal
Hi Zhiliang,

For a system of equations AX = y, Linear Regression will give you a
best-fit estimate for A (coefficient vector) for a matrix of feature
variables X and corresponding target variable y for a subset of your data.
OTOH, what you are looking for here is to solve for x a system of equations
Ax = b, where A and b are known and you want the vector x.

This Math Stackexchange page [2] explains the math in more detail, but
basically...

A * x = b can be rewritten as x = A.I * b.
You can get the pseudo-inverse of A using SVD (Spark MLLib supports SVD
[1]). So the SVD decomposition would make A a product of three other
matrices.

A = U * S * V.T

and the pseudo-inverse can be written as:

A.I = V * S.I * U.T

where S.I inverts the non-zero singular values on the diagonal of S.

Then x can be found by multiplying A.I with b.

-sujit

[1] https://spark.apache.org/docs/1.2.0/mllib-dimensionality-reduction.html
[2]
http://math.stackexchange.com/questions/458404/how-can-we-compute-pseudoinverse-for-any-matrix
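To sanity-check the algebra on a small dense example before going distributed: solving Ax = b directly gives the same x that the SVD pseudo-inverse route produces when A is invertible. A minimal pure-Python sketch (Gaussian elimination with partial pivoting; obviously only for small matrices, which is exactly why the distributed SVD route above exists):

```python
def solve(A, b):
    """Solve A x = b for square A by Gaussian elimination with partial pivoting."""
    n = len(A)
    # build the augmented matrix [A | b]
    M = [row[:] + [b[i]] for i, row in enumerate(A)]
    for col in range(n):
        # pivot: swap in the row with the largest entry in this column
        pivot = max(range(col, n), key=lambda r: abs(M[r][col]))
        M[col], M[pivot] = M[pivot], M[col]
        # eliminate entries below the pivot
        for r in range(col + 1, n):
            f = M[r][col] / M[col][col]
            for c in range(col, n + 1):
                M[r][c] -= f * M[col][c]
    # back-substitution
    x = [0.0] * n
    for r in range(n - 1, -1, -1):
        x[r] = (M[r][n] - sum(M[r][c] * x[c] for c in range(r + 1, n))) / M[r][r]
    return x

A = [[2.0, 1.0], [1.0, 3.0]]
b = [0.0, 1.0]          # b = [0, ..., 0, 1]^T as in the question
x = solve(A, b)
print(x)                # -> [-0.2, 0.4]
```

For the large sparse case in the question, this per-element arithmetic is what the distributed SVD-based pseudo-inverse replaces.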


On Fri, Oct 23, 2015 at 2:19 AM, Zhiliang Zhu  wrote:

> Hi Sujit, and All,
>
> I am currently stuck on a difficult problem, and I am eager to get some help from you.
>
> There is a big linear system of equations:
> Ax = b, where A has N rows and N columns, N is very large, and b
> = [0, 0, ..., 0, 1]T.
> I want to solve it for x = [x1, x2, ..., xn]T.
>
> The simple solution would be to get inverse(A), and then x = inverse(A) * b.
> A would be some JavaRDD> , however, while RDD/matrix has
> add/multiply/transpose APIs, there is no inverse API!
>
> Then, how could I conveniently get inverse(A), or solve the linear
> system of equations some other way?
> In Spark MLlib there is linear regression, whose training process
> solves for the coefficients of a specific linear model, i.e.
> Ax = y: train on (x, y) to get A. Could that be used to solve the
> linear system of equations? I cannot decide.
>
> My deep appreciation for all your help.
>
> Thank you very much!
> Zhiliang
>
>
>


Re: Large number of conf broadcasts

2015-10-23 Thread Koert Kuipers
https://github.com/databricks/spark-avro/pull/95
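The gist of that PR: FileInputFormat-based readers already accept a comma-separated list of paths, so one hadoopFile call (and one broadcast Hadoop conf) can cover every file, instead of one call and one broadcast per file. Sketched in miniature (pure Python, names illustrative):

```python
def load_one_per_path(paths):
    # old behavior: one hadoopFile call, and one broadcast conf, per file
    return [("broadcast_conf", p) for p in paths]

def load_comma_separated(paths):
    # PR #95 behavior: FileInputFormat accepts comma-separated paths,
    # so a single call (and a single broadcast) covers them all
    return [("broadcast_conf", ",".join(paths))]

files = ["/data/part-%05d.avro" % i for i in range(1000)]
print(len(load_one_per_path(files)), len(load_comma_separated(files)))  # -> 1000 1
```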

On Fri, Oct 23, 2015 at 5:01 AM, Koert Kuipers  wrote:

> Oh, no wonder... it undoes the glob (I was reading from /some/path/*),
> creates a HadoopRDD for every path, and then creates a union of them using
> UnionRDD.
>
> That's not what I want... there is no need for a union. AvroInputFormat
> already has the ability to handle globs (or multiple comma-separated paths)
> very efficiently. AvroRelation should just pass the paths (comma separated).
>
>
>
>
> On Thu, Oct 22, 2015 at 1:37 PM, Anders Arpteg  wrote:
>
>> Yes, seems unnecessary. I actually tried patching the
>> com.databricks.spark.avro reader to only broadcast once per dataset,
>> instead of once per file/partition. It seems to work just as well, and
>> there are significantly fewer broadcasts, and I am not seeing out-of-memory
>> issues any more. Strange that more people do not react to this, since the
>> broadcasting seems completely unnecessary...
>>
>> Best,
>> Anders
>>
>>
>> On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers  wrote:
>>
>>> I am seeing the same thing. It has gone completely crazy creating
>>> broadcasts for the last 15 mins or so. Killing it...
>>>
>>> On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg 
>>> wrote:
>>>
 Hi,

 Running Spark 1.5.0 in yarn-client mode, and I am curious why there are
 so many broadcasts being done when loading datasets with a large number of
 partitions/files. I have datasets with thousands of partitions, i.e. hdfs
 files in the avro folder, and sometimes load hundreds of these large
 datasets. I believe I have located the broadcast at line
 SparkContext.scala:1006. It seems to just broadcast the hadoop
 configuration, and I don't see why it should be necessary to broadcast that
 for EVERY file. Wouldn't it be possible to reuse the same broadcast
 configuration? It is hardly the case that the configuration would differ
 between files in a single dataset. It seems to waste lots of memory
 and needs to persist unnecessarily to disk (see below again).

 Thanks,
 Anders

 15/09/24 17:11:11 INFO BlockManager: Writing block
 broadcast_1871_piece0 to disk
  [19/49086]15/09/24 17:11:11 INFO BlockManagerInfo: Added
 broadcast_1871_piece0 on disk on 10.254.35.24:49428 (size: 23.1 KB)
 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored
 as bytes in memory (estimated size 23.1 KB, free 2.4 KB)
 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in
 memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from
 hadoopFile at AvroRelation.scala:121
 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory
 threshold of 1024.0 KB for computing block broadcast_4804 in memory
 .
 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache
 broadcast_4804 in memory! (computed 496.0 B so far)
 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) +
 0.0 B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
 limit = 530.3 MB.
 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to
 disk instead.
 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with
 curMem=556036460, maxMem=556038881
 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
 15/09/24 17:11:11 INFO BlockManager: Dropping block
 broadcast_1872_piece0 from memory
 15/09/24 17:11:11 INFO BlockManager: Writing block
 broadcast_1872_piece0 to disk


>>>
>>>
>


Re: Spark 1.5 on CDH 5.4.0

2015-10-23 Thread Deenar Toraskar
I got this working. For others trying this: it turns out that in Spark 1.3/CDH 5.4

spark.yarn.jar=local:/opt/cloudera/parcels/

I had changed this to reflect the 1.5.1 version of the Spark assembly jar

spark.yarn.jar=local:/opt/spark-1.5.1-bin/...

and this didn't work; I had to drop the "local:" prefix

spark.yarn.jar=/opt/spark-1.5.1-bin/...

Regards
Deenar

On 23 October 2015 at 17:30, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> I got this working. For others trying this: it turns out that in Spark 1.3/CDH 5.4
>
> spark.yarn.jar=local:/opt/cloudera/parcels/
>
> I had changed this to reflect the 1.5.1 version of the Spark assembly jar
>
> spark.yarn.jar=local:/opt/spark-1.5.1-bin/...
>
> and this didn't work; I had to drop the "local:" prefix
>
> spark.yarn.jar=/opt/spark-1.5.1-bin/...
>
> Regards
> Deenar
>
>
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 23 October 2015 at 13:34, Deenar Toraskar 
> wrote:
>
>> Sandy
>>
>> The assembly jar does contain org.apache.spark.deploy.yarn.ExecutorLauncher.
>> I am trying to find out how I can increase the logging level, so I know the
>> exact classpath used by YARN's ContainerLaunch.
>>
>> Deenar
>>
>> On 23 October 2015 at 03:30, Sandy Ryza  wrote:
>>
>>> Hi Deenar,
>>>
>>> The version of Spark you have may not be compiled with YARN support.  If
>>> you inspect the contents of the assembly jar, does
>>> org.apache.spark.deploy.yarn.ExecutorLauncher exist?  If not, you'll
>>> need to find a version that does have the YARN classes.  You can also build
>>> your own using the -Pyarn flag.
>>>
>>> -Sandy
>>>
>>> On Thu, Oct 22, 2015 at 9:04 AM, Deenar Toraskar <
>>> deenar.toras...@gmail.com> wrote:
>>>
 Hi, I have got the prebuilt version of Spark 1.5 for Hadoop 2.6 (
 http://www.apache.org/dyn/closer.lua/spark/spark-1.5.1/spark-1.5.1-bin-hadoop2.6.tgz)
 working with CDH 5.4.0 in local mode on a cluster with Kerberos. It works
 well, including connecting to the Hive metastore. I am facing an issue
 running Spark jobs in yarn-client/yarn-cluster mode. The executors fail to
 start because Java cannot find ExecutorLauncher:

 Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher
 client token: N/A
 diagnostics: Application application_1443531450011_13437 failed 2 times due to
 AM Container for appattempt_1443531450011_13437_02 exited with exitCode: 1
 Stack trace: ExitCodeException exitCode=1:
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
 at org.apache.hadoop.util.Shell.run(Shell.java:455)
 at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
 at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
 at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
 at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 Any ideas as to what might be going wrong? Also, how can I turn on more
 detailed logging to see what command line is being run by YARN to launch
 containers?

 Regards
 Deenar

>>>
>>>
>>
>


Re: Running 2 spark application in parallel

2015-10-23 Thread Debasish Das
You can run 2 threads in the driver, and Spark will FIFO-schedule the 2 jobs on
the same SparkContext you created (sharing its executors and cores). The same
idea is used for the Spark SQL Thrift server flow.

For streaming, I think it lets you run only one StreamingContext at a time,
even if you run them on multiple threads in the driver; I would have to
double-check.
On Oct 22, 2015 11:41 AM, "Simon Elliston Ball" 
wrote:

> If YARN has capacity to run both simultaneously, it will. You should ensure
> you are not allocating too many executors for the first app (and leave some
> space for the second).
>
> You may want to run the application on different yarn queues to control
> resource allocation. If you run as a different user within the same queue
> you should also get an even split between the applications, however you may
> need to enable preemption to ensure the first doesn't just hog the queue.
>
> Simon
>
> On 22 Oct 2015, at 19:20, Suman Somasundar 
> wrote:
>
> Hi all,
>
>
>
> Is there a way to run 2 spark applications in parallel under Yarn in the
> same cluster?
>
>
>
> Currently, if I submit 2 applications, one of them waits till the other
> one is completed.
>
>
>
> I want both of them to start and run at the same time.
>
>
>
> Thanks,
> Suman.
>
>
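Concretely, per the suggestion above, submitting each application to its own YARN queue looks roughly like this (a config sketch; the queue names are illustrative, and the queues must already exist in your scheduler configuration):

```shell
# Application 1, capped so it leaves room for the second app
./bin/spark-submit \
  --master yarn-client \
  --queue queue_a \
  --num-executors 4 \
  --executor-cores 2 \
  app1.jar

# Application 2, in a separate queue
./bin/spark-submit \
  --master yarn-client \
  --queue queue_b \
  --num-executors 4 \
  --executor-cores 2 \
  app2.jar
```

Without --num-executors (or dynamic allocation limits), the first app can still take the whole queue's capacity.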


Re: How to close connection in mapPartitions?

2015-10-23 Thread Sujit Pal
Hi Bin,

Very likely the RedisClientPool is being closed too soon, before map has
a chance to use it. One way to verify would be to comment out the .close
line and see what happens. FWIW, I saw a similar problem writing to Solr,
where I put a commit where you have a close, and noticed that the commit
was happening before the actual data insertion (in the .map line) happened
(and no data showed up in the index until the next time I ran the code
:-)).

At the time I got around it by doing a zipWithIndex on the Iterator, then
doing a partial commit every n records, and finally doing a commit from the
driver code. However, this won't work for you, and there is a better way
outlined on this page (look for Tobias Pfeiffer; it's the code block
immediately following):

http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/

where you test for hasNext on the iterator and call close if its the last
element, within the scope of the .map call.

-sujit
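The root cause is that Scala's Iterator.map is lazy: cache.redisPool.close runs while mapPartitions is still constructing the result iterator, before any line has been parsed. The essence of the safe shape, transliterated into a small pure-Python generator (Pool is a stand-in here, not the real RedisClientPool API):

```python
class Pool:
    """Stand-in for a connection pool: must stay open while rows are mapped."""
    def __init__(self):
        self.open = True
    def lookup(self, x):
        if not self.open:
            raise RuntimeError("Pool not open")  # the error from the thread
        return x * 2
    def close(self):
        self.open = False

def map_partition(lines):
    # generator = lazy iterator, like Scala's Iterator.map
    pool = Pool()
    for line in lines:
        yield pool.lookup(line)
    # only reached after the last element has been consumed downstream
    pool.close()

print(list(map_partition([1, 2, 3])))  # -> [2, 4, 6], pool closed only at the end
```

In Scala the equivalent is to call close inside the map when iter.hasNext turns false, as in the linked code block, rather than after building the lazy iterator.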

On Thu, Oct 22, 2015 at 11:32 PM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Are you sure RedisClientPool is being initialized properly in the
> constructor of RedisCache? Can you please copy paste the code that you use
> to initialize RedisClientPool inside the constructor of RedisCache?
>
> Thanks,
> Aniket
>
> On Fri, Oct 23, 2015 at 11:47 AM Bin Wang  wrote:
>
>> BTW, "lines" is a DStream.
>>
>> Bin Wang 于2015年10月23日周五 下午2:16写道:
>>
>>> I use mapPartitions to open connections to Redis, I write it like this:
>>>
>>> val seqs = lines.mapPartitions { lines =>
>>>   val cache = new RedisCache(redisUrl, redisPort)
>>>   val result = lines.map(line => Parser.parseBody(line, cache))
>>>   cache.redisPool.close
>>>   result
>>> }
>>>
>>> But it seems the pool is closed before I use it. Am I doing anything
>>> wrong? Here is the error:
>>>
>>> java.lang.IllegalStateException: Pool not open
>>> at 
>>> org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
>>> at 
>>> org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
>>> at com.redis.RedisClientPool.withClient(Pool.scala:34)
>>> at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
>>> at 
>>> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
>>> at 
>>> com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>> at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
>>> at 
>>> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>> at 
>>> com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at 
>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
>>> at 
>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>


Improve parquet write speed to HDFS and spark.sql.execution.id is already set ERROR

2015-10-23 Thread morfious902002
I have a Spark job that creates 6 million rows in RDDs. I convert the RDDs
into a DataFrame and write it to HDFS. Currently it takes 3 minutes to write
it to HDFS.
I am using spark 1.5.1 with YARN.

Here is the snippet:

RDDList.parallelStream().forEach(mapJavaRDD -> {
    if (mapJavaRDD != null) {
        JavaRDD<Row> rowRDD = mapJavaRDD.mapPartitionsWithIndex((integer, v2) -> {

            return new ArrayList<Row>(1).iterator();
        }, false);

        DataFrame dF = sqlContext.createDataFrame(rowRDD, schema).coalesce(3);
        synchronized (finalLock) {
            dF.write().mode(SaveMode.Append).parquet("hdfs location");
        }
    }
});

After looking into the logs, I know the following line is the reason for the
job taking so long:

dF.write().mode(SaveMode.Append).parquet("hdfs location");

I also get the following errors due to it:

15/10/21 21:12:30 WARN scheduler.TaskSetManager: Stage 31 contains a task of
very large size (378 KB). The maximum recommended task size is 100 KB.
(4 warnings of this kind appeared.)

java.lang.IllegalArgumentException: java.lang.IllegalArgumentException:
spark.sql.execution.id is already set



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Improve-parquet-write-speed-to-HDFS-and-spark-sql-execution-id-is-already-set-ERROR-tp25184.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Multiple Spark Streaming Jobs on Single Master

2015-10-23 Thread gaurav sharma
Hi,

I created 2 workers on the same machine, each with 4 cores and 6 GB RAM.

I submitted the first job, and it allocated 2 cores on each of the worker
processes and utilized the full 4 GB RAM for each executor process.

When I submit my second job, it always stays in the WAITING state.


Cheers!!



On Tue, Oct 20, 2015 at 10:46 AM, Tathagata Das  wrote:

> You can set the max cores for the first submitted job such that it does
> not take all the resources from the master. See
> http://spark.apache.org/docs/latest/submitting-applications.html
>
> # Run on a Spark standalone cluster in client deploy mode
> ./bin/spark-submit \
>   --class org.apache.spark.examples.SparkPi \
>   --master spark://207.184.161.138:7077 \
>   --executor-memory 20G \
>   *--total-executor-cores 100 \*
>   /path/to/examples.jar \
>   1000
>
>
> On Mon, Oct 19, 2015 at 4:26 PM, Augustus Hong 
> wrote:
>
>> Hi All,
>>
>> Would it be possible to run multiple spark streaming jobs on a single
>> master at the same time?
>>
>> I currently have one master node and several worker nodes in the
>> standalone mode, and I used spark-submit to submit multiple spark streaming
>> jobs.
>>
>> From what I observed, it seems like only the first submitted job would
>> get resources and run.  Jobs submitted afterwards will have the status
>> "Waiting", and will only run after the first one is finished or killed.
>>
>> I tried limiting each executor to only 1 core (each worker machine has 8
>> cores), but the same thing happens: only one job will run, even
>> though there are a lot of idle cores.
>>
>> Best,
>> Augustus
>>
>>
>>
>> --
>> [image: Branch Metrics mobile deep linking] * Augustus
>> Hong*
>>  Data Analytics | Branch Metrics
>>  m 650-391-3369 | e augus...@branch.io
>>
>
>
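For the standalone-mode case in this thread, the per-application cap is what frees cores for the second job: with 2 workers of 4 cores each, capping each application at 4 cores total leaves 4 for the next submission (how those 4 cores are spread across workers is still up to the standalone scheduler, as noted above). A config sketch:

```shell
# First streaming job: take at most half of the cluster's 8 cores
./bin/spark-submit --master spark://master:7077 \
  --total-executor-cores 4 --executor-memory 2G job1.jar

# Second streaming job: the remaining 4 cores are free, so it can start
./bin/spark-submit --master spark://master:7077 \
  --total-executor-cores 4 --executor-memory 2G job2.jar
```

The same cap can be set in code via spark.cores.max; --executor-memory also matters, since a job that claims all of a worker's memory blocks later executors even when cores are idle.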


Improve parquet write speed to HDFS and spark.sql.execution.id is already set ERROR

2015-10-23 Thread Anubhav Agarwal
I have a Spark job that creates 6 million rows in RDDs. I convert the RDDs
into a DataFrame and write it to HDFS. Currently it takes 3 minutes to write
it to HDFS.

Here is the snippet:

RDDList.parallelStream().forEach(mapJavaRDD -> {
    if (mapJavaRDD != null) {
        JavaRDD<Row> rowRDD = mapJavaRDD.mapPartitionsWithIndex((integer, v2) -> {

            return new ArrayList<Row>(1).iterator();
        }, false);

        DataFrame dF = sqlContext.createDataFrame(rowRDD, schema).coalesce(3);
        synchronized (finalLock) {
            dF.write().mode(SaveMode.Append).parquet("hdfs location");
        }
    }
});

After looking into the logs, I know the following line is the reason for the
job taking so long:

dF.write().mode(SaveMode.Append).parquet("hdfs location");

I also get the following errors due to it:

15/10/21 21:12:30 WARN scheduler.TaskSetManager: Stage 31 contains a task
of very large size (378 KB). The maximum recommended task size is 100 KB.
(4 warnings of this kind appeared.)

java.lang.IllegalArgumentException: java.lang.IllegalArgumentException:
spark.sql.execution.id is already set
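A hedged note on that error: in Spark 1.5, SQLContext appears to track spark.sql.execution.id in a thread-local, so issuing DataFrame writes from parallelStream()'s worker threads can trip the "already set" check; and since the writes here are serialized on finalLock anyway, the parallelism buys little. Collapsing the many small appends into a single union-then-write from one thread avoids both problems. The shape of that change, sketched with plain Python lists standing in for DataFrames (unionAll followed by one write is what this mimics):

```python
def write_each(frames, sink):
    # the pattern under discussion: one append per frame, from many threads
    for f in frames:
        sink.append(list(f))           # N small writes

def union_then_write(frames, sink):
    # alternative shape: union everything first, then a single write
    combined = [row for f in frames for row in f]
    sink.append(combined)              # 1 write

frames = [[1, 2], [3], [4, 5, 6]]
a, b = [], []
write_each(frames, a)
union_then_write(frames, b)
print(len(a), len(b), b[0])            # -> 3 1 [1, 2, 3, 4, 5, 6]
```

Fewer, larger Parquet appends also mean fewer small files on HDFS, which helps downstream reads.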


How does Spark coordinate with Tachyon wrt data locality

2015-10-23 Thread Kinsella, Shane
Hi all,

I am looking into how Spark handles data locality with respect to Tachyon. My
main concern is how this is coordinated: will Spark send a task based on a file
loaded from Tachyon to a node that it knows has that file locally, and how does
it know which nodes have what?

Kind regards,
Shane


Re: Cannot start REPL shell since 1.4.0

2015-10-23 Thread emlyn
emlyn wrote
> 
> xjlin0 wrote
>> I cannot enter REPL shell in 1.4.0/1.4.1/1.5.0/1.5.1(with pre-built with
>> or without Hadoop or home compiled with ant or maven).  There was no
>> error message in v1.4.x, system prompt nothing.  On v1.5.x, once I enter
>> $SPARK_HOME/bin/pyspark or spark-shell, I got
>> 
>> Error: Could not find or load main class org.apache.spark.launcher.Main
> I have the same problem

In case anyone else has the same problem: I found that the problem only
occurred under my login, not under a new clean user. After some
investigation, I found that I had "GREP_OPTIONS='--color=always'" in my
environment, which was messing up the output of grep with colour codes. I
changed that to "GREP_OPTIONS='--color=auto'" and now it works.
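
The failure mode is easy to demonstrate: with colour forced on, grep embeds
ANSI escape sequences even when its output is piped, which corrupts anything a
script (such as spark-class) parses from it. A quick sketch (assumes GNU grep):

```shell
# Plain output vs. colour-forced output of the same match
plain=$(printf 'org.apache.spark.launcher.Main\n' | grep --color=never Main)
coloured=$(printf 'org.apache.spark.launcher.Main\n' | grep --color=always Main)

if [ "$plain" = "$coloured" ]; then
    echo "outputs identical"
else
    echo "colour codes corrupt the piped output"
fi
```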



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-start-REPL-shell-since-1-4-0-tp24921p25182.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [SPARK STREAMING] polling based operation instead of event based operation

2015-10-23 Thread Nipun Arora
Thanks for the suggestion.

1. Heartbeat:

As a matter of fact, the heartbeat solution is what I thought of as well.
However, it needs to live outside Spark Streaming, and it cannot be
generalized to all Spark applications. For example, I perform several
filtering operations before reaching the forEachRDD code you saw in my
mail, which means the heartbeat event I push into the stream must be
crafted so that it passes those filters and can still be used in my final
function.
Is there a better design pattern in Storm that gives a cleaner heartbeat
solution?

2. Microbatching within spark:

It would be great if I could use this. I am quite positive that the
function is not called when there is no data; at the very least, the
filtering functions before it do not get called.
Each of the functions is a callback, launched in a separate thread.
I attached a Java runtime debugger to my program and ran it with
breakpoints inside the map functions (filter, flatMap, etc.). My
observation is that these functions are only triggered when data is sent to
the event stream. I could be wrong... can anyone shed more light on this?

I'll try your suggestion on the
myStream.window(Seconds(10)).foreachRDD({..}). Will share the results with
the list :)


Thanks
Nipun




On Fri, Oct 23, 2015 at 4:05 AM Lars Albertsson 
wrote:

> There is a heartbeat stream pattern that you can use: Create a service
> (perhaps a thread in your driver) that pushes a heartbeat event to a
> different stream every N seconds. Consume that stream as well in your
> streaming application, and perform an action on every heartbeat.
>
> This has worked well in Storm, which differs in implementation, but it
> might work in Spark Streaming as well.
>
> Alternatively, since Spark internally uses microbatching, you might be
> able to take advantage of the existing batching. You can try to use
> foreachRDD on the stream, and see if your function gets called even
> when the microbatch is empty. In that case, assuming your batch size
> can add up to 10 seconds, you can implement heartbeat functionality
> internally. If you are very lucky,
> myStream.window(Seconds(10)).foreachRDD({ ... }) might even do it for
> you.
>
> If you do experiments along those lines, please share the results with the
> list!
>
> Regards,
>
> Lars Albertsson
>
>
>
> On Thu, Oct 22, 2015 at 10:48 PM, Nipun Arora 
> wrote:
> > Hi,
> > In general, in Spark Streaming one can do transformations (filter, map,
> > etc.) or output operations (collect, forEach, etc.) in an event-driven
> > paradigm, i.e. the action happens only if a message is received.
> >
> > Is it possible to perform actions every few seconds in a polling-based
> > fashion, regardless of whether a new message has been received?
> >
> > In my use-case, I am filtering out a stream, and then do operations on
> this
> > filter streams. However, I would like to do operations on the data in the
> > stream every few seconds even if no message has been received in the
> stream.
> >
> > For example I have the following stream, where the violationChecking()
> > function is being called only when a micro-batch is finished. Essentially
> > this also means that I must receive a message in this stream to do
> > processing. Is there any way that I can do the same operation every 10
> > seconds or so? :
> >
> > sortedMessages.foreach(
> > new Function > Integer>>, Void>() {
> > @Override
> > public Void call(JavaRDD > Integer, Integer>> tuple5JavaRDD) throws Exception {
> > List>
> > list = tuple5JavaRDD.collect();
> > violationChecking(list);
> > return null;
> > }
> > }
> > );
> >
> >
> > Thanks,
> >
> > Nipun
>
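
Outside of Spark, the heartbeat pattern discussed above is straightforward to
sketch: a background thread injects a sentinel event into the stream every N
seconds, so the consumer fires even when no real data arrives. A minimal
pure-Python illustration (a queue stands in for the stream, and the intervals
are shortened for the example):

```python
import queue
import threading
import time

events = queue.Queue()
stop = threading.Event()

def heartbeat(interval):
    # Push a sentinel into the "stream" every `interval` seconds
    while not stop.is_set():
        events.put("HEARTBEAT")
        time.sleep(interval)

threading.Thread(target=heartbeat, args=(0.02,), daemon=True).start()

processed = []
deadline = time.time() + 0.3
while time.time() < deadline:
    try:
        event = events.get(timeout=0.05)
    except queue.Empty:
        continue
    # violationChecking() would run here for every event, real or heartbeat
    processed.append(event)
stop.set()

print(len(processed) >= 3)  # several heartbeats fired despite no real data
```

In the Spark Streaming case the sentinel would have to be shaped so it
survives the upstream filters, which is exactly the drawback Nipun raises.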


Re: Maven build failed (Spark master)

2015-10-23 Thread Kayode Odeyemi
I saw this when I tested manually (without ./make-distribution)

Detected Maven Version: 3.2.2 is not in the allowed range 3.3.3.

So I simply upgraded Maven to 3.3.3.

Resolved. Thanks
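
As an aside, the enforcer's range check is just a version comparison; a quick
sketch of it in Python, using the version strings from the message above:

```python
def parse(version):
    # Split "3.2.2" into a comparable tuple (3, 2, 2)
    return tuple(int(part) for part in version.split("."))

required = "3.3.3"   # minimum version the Spark build enforces
detected = "3.2.2"   # version make-distribution.sh found

in_range = parse(detected) >= parse(required)
print(in_range)  # False: 3.2.2 sorts below 3.3.3
```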

On Fri, Oct 23, 2015 at 3:17 PM, Sean Owen  wrote:

> This doesn't show the actual error output from Maven. I have a strong
> guess that you haven't set MAVEN_OPTS to increase the memory Maven can
> use.
>
> On Fri, Oct 23, 2015 at 6:14 AM, Kayode Odeyemi  wrote:
> > Hi,
> >
> > I can't seem to get a successful maven build. Please see command output
> > below:
> >
> > bash-3.2$ ./make-distribution.sh --name spark-latest --tgz --mvn mvn
> > -Dhadoop.version=2.7.0 -Phadoop-2.7 -Phive -Phive-thriftserver
> -DskipTests
> > clean package
> > +++ dirname ./make-distribution.sh
> > ++ cd .
> > ++ pwd
> > + SPARK_HOME=/usr/local/spark-latest
> > + DISTDIR=/usr/local/spark-latest/dist
> > + SPARK_TACHYON=false
> > + TACHYON_VERSION=0.7.1
> > + TACHYON_TGZ=tachyon-0.7.1-bin.tar.gz
> > +
> > TACHYON_URL=
> https://github.com/amplab/tachyon/releases/download/v0.7.1/tachyon-0.7.1-bin.tar.gz
> > + MAKE_TGZ=false
> > + NAME=none
> > + MVN=/usr/local/spark-latest/build/mvn
> > + ((  12  ))
> > + case $1 in
> > + NAME=spark-latest
> > + shift
> > + shift
> > + ((  10  ))
> > + case $1 in
> > + MAKE_TGZ=true
> > + shift
> > + ((  9  ))
> > + case $1 in
> > + MVN=mvn
> > + shift
> > + shift
> > + ((  7  ))
> > + case $1 in
> > + break
> > + '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home
> ']'
> > + '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home
> ']'
> > ++ command -v git
> > + '[' /usr/bin/git ']'
> > ++ git rev-parse --short HEAD
> > + GITREV=487d409
> > + '[' '!' -z 487d409 ']'
> > + GITREVSTRING=' (git revision 487d409)'
> > + unset GITREV
> > ++ command -v mvn
> > + '[' '!' /usr/bin/mvn ']'
> > ++ mvn help:evaluate -Dexpression=project.version -Dhadoop.version=2.7.0
> > -Phadoop-2.7 -Phive -Phive-thriftserver -DskipTests clean package
> > ++ grep -v INFO
> > ++ tail -n 1
> > + VERSION='[ERROR] [Help 1]
> > http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException'
> >
> > Same output error with JDK 7
> >
> > Appreciate your help.
> >
> >
>


Re: spark streaming failing to replicate blocks

2015-10-23 Thread Akhil Das
If you can reproduce it, then I think you can open a JIRA for this.

Thanks
Best Regards

On Fri, Oct 23, 2015 at 1:37 PM, Eugen Cepoi  wrote:

> When fixing the port to the same values as in the stack trace it works
> too. The network config of the slaves seems correct.
>
> Thanks,
> Eugen
>
> 2015-10-23 8:30 GMT+02:00 Akhil Das :
>
>> Mostly a network issue, you need to check your network configuration from
>> the aws console and make sure the ports are accessible within the cluster.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Oct 22, 2015 at 8:53 PM, Eugen Cepoi 
>> wrote:
>>
>>> Huh indeed this worked, thanks. Do you know why this happens, is that
>>> some known issue?
>>>
>>> Thanks,
>>> Eugen
>>>
>>> 2015-10-22 19:08 GMT+07:00 Akhil Das :
>>>
 Can you try fixing spark.blockManager.port to specific port and see if
 the issue exists?

 Thanks
 Best Regards

 On Mon, Oct 19, 2015 at 6:21 PM, Eugen Cepoi 
 wrote:

> Hi,
>
> I am running spark streaming 1.4.1 on EMR (AMI 3.9) over YARN.
> The job reads data from Kinesis and the batch interval is 30s (I
> used the same value for the Kinesis checkpointing).
> In the executor logs I can see every 5 seconds a sequence of
> stacktraces indicating that the block replication failed. I am using the
> default storage level MEMORY_AND_DISK_SER_2.
> WAL is not enabled nor checkpointing (the checkpoint dir is configured
> for the spark context but not for the streaming context).
>
> Here is an example of those logs for ip-10-63-160-18. They occur in
> every executor while trying to replicate to any other executor.
>
>
> 15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to 
> [ip-10-63-160-18.ec2.internal/10.63.160.18:50929]
> 15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing connection 
> to ip-10-63-160-18.ec2.internal/10.63.160.18:50929
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>   at 
> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>   at 
> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending 
> message.
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>   at 
> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>   at 
> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 15/10/19 03:11:55 INFO nio.ConnectionManager: Notifying 
> ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
> 15/10/19 03:11:55 INFO nio.ConnectionManager: Handling connection error 
> on connection to ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
> 15/10/19 03:11:55 WARN storage.BlockManager: Failed to replicate 
> input-1-144524231 to BlockManagerId(3, ip-10-159-151-22.ec2.internal, 
> 50929), failure #0
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>   at 
> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>   at 
> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 15/10/19 03:11:55 INFO nio.ConnectionManager: Removing SendingConnection 
> to ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
> 15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to 
> [ip-10-63-160-18.ec2.internal/10.63.160.18:39506]
> 15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing connection 
> to ip-10-63-160-18.ec2.internal/10.63.160.18:39506
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelIm

Re: Unable to build Spark 1.5, is build broken or can anyone successfully build?

2015-10-23 Thread Robineast
Both Spark 1.5 and 1.5.1 are released, so it certainly shouldn't be a problem.



-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-build-Spark-1-5-is-build-broken-or-can-anyone-successfully-build-tp24513p25181.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: I don't understand what this sentence means."7.1 GB of 7 GB physical memory used"

2015-10-23 Thread Sean Owen
Spark asked YARN to let an executor use 7GB of memory, but it used
more, so it was killed. In each case you can see that the executor memory
plus overhead equals the YARN allocation requested. What's the issue with
that?
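
The arithmetic behind those log lines can be checked directly: the container
size YARN enforces is simply spark.executor.memory plus
spark.yarn.executor.memoryOverhead. In Python, using the three configurations
quoted below:

```python
def container_mb(executor_memory_gb, overhead_mb):
    # YARN container size = executor heap + off-heap overhead
    return executor_memory_gb * 1024 + overhead_mb

# The three configurations from the quoted message:
assert container_mb(6, 1536) == 7680   # logged as "7.5 GB"
assert container_mb(4, 2048) == 6144   # logged as "6 GB"
assert container_mb(4, 3072) == 7168   # "7168 MB memory including 3072 MB overhead"
print("container sizes match the YARN log output")
```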

On Fri, Oct 23, 2015 at 6:46 AM, JoneZhang  wrote:
> Here is the spark configure and error log
>
>
> 
> spark.dynamicAllocation.enabled true
> spark.shuffle.service.enabled   true
> spark.dynamicAllocation.minExecutors10
> spark.executor.cores1
> spark.executor.memory   6G
> spark.yarn.executor.memoryOverhead 1536
> spark.driver.memory 2G
> 15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
> Container killed by YARN for exceeding memory limits. 7.5 GB of 7.5 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
> Container killed by YARN for exceeding memory limits. 7.5 GB of 7.5 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
> Container killed by YARN for exceeding memory limits. 7.6 GB of 7.5 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>
> 
> spark.dynamicAllocation.enabled true
> spark.shuffle.service.enabled   true
> spark.dynamicAllocation.minExecutors10
> spark.executor.cores2
> spark.executor.memory   4G
> spark.yarn.executor.memoryOverhead 2048
> spark.driver.memory 2G
> 15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
> Container killed by YARN for exceeding memory limits. 6.2 GB of 6 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
> Container killed by YARN for exceeding memory limits. 6.0 GB of 6 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
> Container killed by YARN for exceeding memory limits. 6.3 GB of 6 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>
>
> 
> spark.dynamicAllocation.enabled true
> spark.shuffle.service.enabled   true
> spark.dynamicAllocation.minExecutors10
> spark.executor.cores2
> spark.executor.memory   4G
> spark.yarn.executor.memoryOverhead 3072
> spark.driver.memory 2G
> 15/10/23 21:15:10 main INFO org.apache.spark.deploy.yarn.YarnAllocator>>
> Will request 10 executor containers, each with 2 cores and 7168 MB memory
> including 3072 MB overhead
> ...
> 15/10/23 21:15:15 ContainerLauncher #1 INFO
> org.apache.spark.deploy.yarn.ExecutorRunnable>> Setting up executor with
> commands: List({{JAVA_HOME}}/bin/java, -server, -XX:OnOutOfMemoryError='kill
> %p', -Xms4096m, -Xmx4096m,
> '-Dlog4j.configuration=file:///data/home/sparkWithOutHive/conf/log4j.properties',
> '-Dhive.spark.log.dir=/data/home/sparkWithOutHive/logs/',
> '-Dlog4j.configuration=file:///data/home/sparkWithOutHive/conf/log4j.properties',
> -Djava.io.tmpdir={{PWD}}/tmp, '-Dspark.driver.port=41100',
> '-Dspark.history.ui.port=8080', '-Dspark.ui.port=0',
> -Dspark.yarn.app.container.log.dir=,
> org.apache.spark.executor.CoarseGrainedExecutorBackend, --driver-url,
> akka.tcp://sparkDriver@10.196.24.32:41100/user/CoarseGrainedScheduler,
> --executor-id, 2, --hostname, 10.119.91.207, --cores, 2, --app-id,
> application_1445484223147_0470, --user-class-path, file:$PWD/__app__.jar,
> 1>, /stdout, 2>, /stderr)
> ...
> 15/10/23 21:07:54 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
> Container killed by YARN for exceeding memory limits. 7.1 GB of 7 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 15/10/23 21:07:54 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
> Container killed by YARN for exceeding memory limits. 7.0 GB of 7 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>
>
>
> I have two questions
> 1. The executor JVM is launched with -Xms4096m -Xmx4096m; why not with
> -Xmx set to 4096m plus the 3072m overhead?
> 2. Does "7.1 GB of 7 GB physical memory used" mean that 7.1 is compared
> with 7, or with something else?
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/I-don-t-understand-what-this-sentence-means-7-1-GB-of-7-GB-physical-memory-used-tp25180.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

--

I don't understand what this sentence means."7.1 GB of 7 GB physical memory used"

2015-10-23 Thread JoneZhang
Here is the spark configure and error log



spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled   true
spark.dynamicAllocation.minExecutors10
spark.executor.cores1
spark.executor.memory   6G
spark.yarn.executor.memoryOverhead 1536
spark.driver.memory 2G
15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
Container killed by YARN for exceeding memory limits. 7.5 GB of 7.5 GB
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
Container killed by YARN for exceeding memory limits. 7.5 GB of 7.5 GB
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
Container killed by YARN for exceeding memory limits. 7.6 GB of 7.5 GB
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.


spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled   true
spark.dynamicAllocation.minExecutors10
spark.executor.cores2
spark.executor.memory   4G
spark.yarn.executor.memoryOverhead 2048
spark.driver.memory 2G
15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
Container killed by YARN for exceeding memory limits. 6.2 GB of 6 GB
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
Container killed by YARN for exceeding memory limits. 6.0 GB of 6 GB
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
15/10/23 17:37:13 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
Container killed by YARN for exceeding memory limits. 6.3 GB of 6 GB
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.



spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled   true
spark.dynamicAllocation.minExecutors10
spark.executor.cores2
spark.executor.memory   4G
spark.yarn.executor.memoryOverhead 3072
spark.driver.memory 2G
15/10/23 21:15:10 main INFO org.apache.spark.deploy.yarn.YarnAllocator>>
Will request 10 executor containers, each with 2 cores and 7168 MB memory
including 3072 MB overhead
...
15/10/23 21:15:15 ContainerLauncher #1 INFO
org.apache.spark.deploy.yarn.ExecutorRunnable>> Setting up executor with
commands: List({{JAVA_HOME}}/bin/java, -server, -XX:OnOutOfMemoryError='kill
%p', -Xms4096m, -Xmx4096m,
'-Dlog4j.configuration=file:///data/home/sparkWithOutHive/conf/log4j.properties',
'-Dhive.spark.log.dir=/data/home/sparkWithOutHive/logs/',
'-Dlog4j.configuration=file:///data/home/sparkWithOutHive/conf/log4j.properties',
-Djava.io.tmpdir={{PWD}}/tmp, '-Dspark.driver.port=41100',
'-Dspark.history.ui.port=8080', '-Dspark.ui.port=0',
-Dspark.yarn.app.container.log.dir=,
org.apache.spark.executor.CoarseGrainedExecutorBackend, --driver-url,
akka.tcp://sparkDriver@10.196.24.32:41100/user/CoarseGrainedScheduler,
--executor-id, 2, --hostname, 10.119.91.207, --cores, 2, --app-id,
application_1445484223147_0470, --user-class-path, file:$PWD/__app__.jar,
1>, /stdout, 2>, /stderr)
...
15/10/23 21:07:54 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
Container killed by YARN for exceeding memory limits. 7.1 GB of 7 GB
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
15/10/23 21:07:54 Reporter WARN org.apache.spark.deploy.yarn.YarnAllocator>>
Container killed by YARN for exceeding memory limits. 7.0 GB of 7 GB
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.



I have two questions:
1. The executor JVM is launched with -Xms4096m -Xmx4096m; why not with
-Xmx set to 4096m plus the 3072m overhead?
2. Does "7.1 GB of 7 GB physical memory used" mean that 7.1 is compared
with 7, or with something else?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/I-don-t-understand-what-this-sentence-means-7-1-GB-of-7-GB-physical-memory-used-tp25180.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Cannot start REPL shell since 1.4.0

2015-10-23 Thread Emlyn Corrin
JAVA_HOME is unset.
I've also tried setting it with:
export JAVA_HOME=$(/usr/libexec/java_home)
which sets it to
"/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home" and I
still get the same problem.

On 23 October 2015 at 14:37, Jonathan Coveney  wrote:

> do you have JAVA_HOME set to a java 7 jdk?
>
> 2015-10-23 7:12 GMT-04:00 emlyn :
>
>> xjlin0 wrote
>> > I cannot enter REPL shell in 1.4.0/1.4.1/1.5.0/1.5.1(with pre-built with
>> > or without Hadoop or home compiled with ant or maven).  There was no
>> error
>> > message in v1.4.x, system prompt nothing.  On v1.5.x, once I enter
>> > $SPARK_HOME/bin/pyspark or spark-shell, I got
>> >
>> > Error: Could not find or load main class org.apache.spark.launcher.Main
>>
>> I have the same problem (on MacOS X Yosemite, all spark versions since
>> 1.4,
>> installed both with homebrew and downloaded manually). I've been trying to
>> start the pyspark shell, but it also fails in the same way for spark-shell
>> and spark-sql and spark-submit. I've narrowed it down to the following
>> line
>> in the spark-class script:
>>
>> done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main
>> "$@")
>>
>> (where $RUNNER is "java" and $LAUNCH_CLASSPATH is
>>
>> "/usr/local/Cellar/apache-spark/1.5.1/libexec/lib/spark-assembly-1.5.1-hadoop2.6.0.jar",
>> which does exist and does contain the org.apache.spark.launcher.Main
>> class,
>> despite the message that it can't be found)
>>
>> If I run it manually, using:
>>
>> SPARK_HOME=/usr/local/Cellar/apache-spark/1.5.1/libexec java -cp
>>
>> /usr/local/Cellar/apache-spark/1.5.1/libexec/lib/spark-assembly-1.5.1-hadoop2.6.0.jar
>> org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit
>> pyspark-shell-main --name PySparkShell
>>
>> It runs without that error, and instead prints out (where "\0" is a nul
>> character):
>>
>> env\0PYSPARK_SUBMIT_ARGS="--name" "PySparkShell" "pyspark-shell"\0python\0
>>
>> I'm not really sure what to try next, maybe with this extra information
>> someone has an idea what's going wrong, and how to fix it.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-start-REPL-shell-since-1-4-0-tp24921p25176.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
*Emlyn Corrin*

Software Engineer | SwiftKey |
em...@swiftkey.com | www.swiftkey.com | @swiftkey
 | fb.com/swiftkey






Re: Cannot start REPL shell since 1.4.0

2015-10-23 Thread Jonathan Coveney
Do you have JAVA_HOME set to a Java 7 JDK?

2015-10-23 7:12 GMT-04:00 emlyn :

> xjlin0 wrote
> > I cannot enter REPL shell in 1.4.0/1.4.1/1.5.0/1.5.1(with pre-built with
> > or without Hadoop or home compiled with ant or maven).  There was no
> error
> > message in v1.4.x, system prompt nothing.  On v1.5.x, once I enter
> > $SPARK_HOME/bin/pyspark or spark-shell, I got
> >
> > Error: Could not find or load main class org.apache.spark.launcher.Main
>
> I have the same problem (on MacOS X Yosemite, all spark versions since 1.4,
> installed both with homebrew and downloaded manually). I've been trying to
> start the pyspark shell, but it also fails in the same way for spark-shell
> and spark-sql and spark-submit. I've narrowed it down to the following line
> in the spark-class script:
>
> done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main
> "$@")
>
> (where $RUNNER is "java" and $LAUNCH_CLASSPATH is
>
> "/usr/local/Cellar/apache-spark/1.5.1/libexec/lib/spark-assembly-1.5.1-hadoop2.6.0.jar",
> which does exist and does contain the org.apache.spark.launcher.Main class,
> despite the message that it can't be found)
>
> If I run it manually, using:
>
> SPARK_HOME=/usr/local/Cellar/apache-spark/1.5.1/libexec java -cp
>
> /usr/local/Cellar/apache-spark/1.5.1/libexec/lib/spark-assembly-1.5.1-hadoop2.6.0.jar
> org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit
> pyspark-shell-main --name PySparkShell
>
> It runs without that error, and instead prints out (where "\0" is a nul
> character):
>
> env\0PYSPARK_SUBMIT_ARGS="--name" "PySparkShell" "pyspark-shell"\0python\0
>
> I'm not really sure what to try next, maybe with this extra information
> someone has an idea what's going wrong, and how to fix it.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-start-REPL-shell-since-1-4-0-tp24921p25176.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Maven build failed (Spark master)

2015-10-23 Thread Sean Owen
This doesn't show the actual error output from Maven. I have a strong
guess that you haven't set MAVEN_OPTS to increase the memory Maven can
use.
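
For reference, the Spark 1.x building docs suggest raising Maven's memory
along these lines before building (adjust the values to taste; MaxPermSize
applies to JDK 7 and is ignored with a warning on JDK 8):

```shell
# Give Maven more heap and code-cache room before building Spark
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
echo "$MAVEN_OPTS"
```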

On Fri, Oct 23, 2015 at 6:14 AM, Kayode Odeyemi  wrote:
> Hi,
>
> I can't seem to get a successful maven build. Please see command output
> below:
>
> bash-3.2$ ./make-distribution.sh --name spark-latest --tgz --mvn mvn
> -Dhadoop.version=2.7.0 -Phadoop-2.7 -Phive -Phive-thriftserver -DskipTests
> clean package
> +++ dirname ./make-distribution.sh
> ++ cd .
> ++ pwd
> + SPARK_HOME=/usr/local/spark-latest
> + DISTDIR=/usr/local/spark-latest/dist
> + SPARK_TACHYON=false
> + TACHYON_VERSION=0.7.1
> + TACHYON_TGZ=tachyon-0.7.1-bin.tar.gz
> +
> TACHYON_URL=https://github.com/amplab/tachyon/releases/download/v0.7.1/tachyon-0.7.1-bin.tar.gz
> + MAKE_TGZ=false
> + NAME=none
> + MVN=/usr/local/spark-latest/build/mvn
> + ((  12  ))
> + case $1 in
> + NAME=spark-latest
> + shift
> + shift
> + ((  10  ))
> + case $1 in
> + MAKE_TGZ=true
> + shift
> + ((  9  ))
> + case $1 in
> + MVN=mvn
> + shift
> + shift
> + ((  7  ))
> + case $1 in
> + break
> + '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
> + '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
> ++ command -v git
> + '[' /usr/bin/git ']'
> ++ git rev-parse --short HEAD
> + GITREV=487d409
> + '[' '!' -z 487d409 ']'
> + GITREVSTRING=' (git revision 487d409)'
> + unset GITREV
> ++ command -v mvn
> + '[' '!' /usr/bin/mvn ']'
> ++ mvn help:evaluate -Dexpression=project.version -Dhadoop.version=2.7.0
> -Phadoop-2.7 -Phive -Phive-thriftserver -DskipTests clean package
> ++ grep -v INFO
> ++ tail -n 1
> + VERSION='[ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException'
>
> Same output error with JDK 7
>
> Appreciate your help.
>
>




Maven build failed (Spark master)

2015-10-23 Thread Kayode Odeyemi
Hi,

I can't seem to get a successful maven build. Please see command output
below:

bash-3.2$ ./make-distribution.sh --name spark-latest --tgz --mvn mvn
-Dhadoop.version=2.7.0 -Phadoop-2.7 -Phive -Phive-thriftserver -DskipTests
clean package
+++ dirname ./make-distribution.sh
++ cd .
++ pwd
+ SPARK_HOME=/usr/local/spark-latest
+ DISTDIR=/usr/local/spark-latest/dist
+ SPARK_TACHYON=false
+ TACHYON_VERSION=0.7.1
+ TACHYON_TGZ=tachyon-0.7.1-bin.tar.gz
+ TACHYON_URL=
https://github.com/amplab/tachyon/releases/download/v0.7.1/tachyon-0.7.1-bin.tar.gz
+ MAKE_TGZ=false
+ NAME=none
+ MVN=/usr/local/spark-latest/build/mvn
+ ((  12  ))
+ case $1 in
+ NAME=spark-latest
+ shift
+ shift
+ ((  10  ))
+ case $1 in
+ MAKE_TGZ=true
+ shift
+ ((  9  ))
+ case $1 in
+ MVN=mvn
+ shift
+ shift
+ ((  7  ))
+ case $1 in
+ break
+ '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
+ '[' -z /Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home ']'
++ command -v git
+ '[' /usr/bin/git ']'
++ git rev-parse --short HEAD
+ GITREV=487d409
+ '[' '!' -z 487d409 ']'
+ GITREVSTRING=' (git revision 487d409)'
+ unset GITREV
++ command -v mvn
+ '[' '!' /usr/bin/mvn ']'
++ mvn help:evaluate -Dexpression=project.version -Dhadoop.version=2.7.0
-Phadoop-2.7 -Phive -Phive-thriftserver -DskipTests clean package
++ grep -v INFO
++ tail -n 1
+ VERSION='[ERROR] [Help 1]
http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException'

Same output error with JDK 7

Appreciate your help.


Re: Spark issue running jar on Linux vs Windows

2015-10-23 Thread Michael Lewis
Thanks for the advice. In my case it turned out to be two issues:

- I had to use Java rather than Scala to launch the process, putting the core
Scala libs on the classpath.

- I needed a merge strategy of Concat for the reference.conf files in my
build.sbt.

Regards,
Mike
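
For anyone needing the same fix, the standard sbt-assembly recipe for
concatenating reference.conf files across dependencies looks roughly like this
(sbt-assembly 0.13/0.14-era syntax; check the setting names against your
plugin version):

```scala
// build.sbt: Akka's reference.conf files must be concatenated, not
// overwritten, or the merged jar loses the remoting configuration.
assemblyMergeStrategy in assembly := {
  case "reference.conf" => MergeStrategy.concat
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
```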


> On 23 Oct 2015, at 01:00, Ted Yu  wrote:
> 
> RemoteActorRefProvider is in akka-remote_2.10-2.3.11.jar
> 
> jar tvf 
> ~/.m2/repository/com/typesafe/akka/akka-remote_2.10/2.3.11/akka-remote_2.10-2.3.11.jar
>  | grep RemoteActorRefProvi
>   1761 Fri May 08 16:13:02 PDT 2015 
> akka/remote/RemoteActorRefProvider$$anonfun$5.class
>   1416 Fri May 08 16:13:02 PDT 2015 
> akka/remote/RemoteActorRefProvider$$anonfun$6.class
> 
> Is the above jar on your classpath ?
> 
> Cheers
> 
>> On Thu, Oct 22, 2015 at 4:39 PM, Michael Lewis  wrote:
>> Hi,
>> 
>> I have a Spark driver process that I have built into a single ‘fat jar’. This
>> runs fine in Cygwin on my development machine, where I can run:
>>  
>> scala -cp my-fat-jar-1.0.0.jar com.foo.MyMainClass
>> 
>> This works fine: it submits Spark jobs, they process, all good.
>> 
>> 
>> However, on Linux (all Jars Spark(1.4) and Scala version (2.10.5) being the 
>> same), I get this error:
>> 
>> 18:59:14.358 [Curator-QueueBuilder-2] ERROR o.a.c.f.r.queue.DistributedQueue 
>> - Exception processing queue item: queue-00
>> java.lang.NoSuchMethodException: 
>> akka.remote.RemoteActorRefProvider.(java.lang.String, 
>> akka.actor.ActorSystem$Settings, akka.event.EventStream, 
>> akka.actor.Scheduler, akka.act
>> at java.lang.Class.getConstructor0(Class.java:3082) ~[na:1.8.0_60]
>> at java.lang.Class.getDeclaredConstructor(Class.java:2178) 
>> ~[na:1.8.0_60]
>>  
>> i.e. No such method exception.  Can anyone suggest how I can fix this?
>> 
>> I’ve tried changing scala to java and putting scala-lang on the class
>> path, but this just generates new errors about missing akka configuration.
>> 
>> Given the identical jars and scala version - I’m not sure why I’m getting 
>> this error running driver on Linux.
>> 
>> Appreciate any help/pointers.
>> 
>> 
>> Thanks,
>> Mike Lewis
> 


Re: Spark 1.5 on CDH 5.4.0

2015-10-23 Thread Deenar Toraskar
Sandy

The assembly jar does contain org.apache.spark.deploy.yarn.ExecutorLauncher.
I am trying to find out how I can increase the logging level, so I can see the
exact classpath used by the Yarn ContainerLaunch.

Deenar

On 23 October 2015 at 03:30, Sandy Ryza  wrote:

> Hi Deenar,
>
> The version of Spark you have may not be compiled with YARN support.  If
> you inspect the contents of the assembly jar, does
> org.apache.spark.deploy.yarn.ExecutorLauncher exist?  If not, you'll need
> to find a version that does have the YARN classes.  You can also build your
> own using the -Pyarn flag.
>
> -Sandy
>
> On Thu, Oct 22, 2015 at 9:04 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> Hi I have got the prebuilt version of Spark 1.5 for Hadoop 2.6 (
>> http://www.apache.org/dyn/closer.lua/spark/spark-1.5.1/spark-1.5.1-bin-hadoop2.6.tgz)
>> working with CDH 5.4.0 in local mode on a cluster with Kerberos. It works
>> well including connecting to the Hive metastore. I am facing an issue
>> running spark jobs in yarn-client/yarn-cluster mode. The executors fail to
>> start as java cannot find ExecutorLauncher.
>>
>> Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher
>> client token: N/A
>> diagnostics: Application application_1443531450011_13437 failed 2 times due
>> to AM Container for appattempt_1443531450011_13437_02 exited with exitCode: 1
>> Stack trace: ExitCodeException exitCode=1:
>> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
>> at org.apache.hadoop.util.Shell.run(Shell.java:455)
>> at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
>> at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
>> at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>> at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Any ideas as to what might be going wrong? Also how can I turn on more
>> detailed logging to see what command line is being run by Yarn to launch
>> containers?
>>
>> Regards
>> Deenar
>>
>
>


Re: NoSuchMethodException : com.google.common.io.ByteStreams.limit

2015-10-23 Thread Steve Loughran

Just try dropping in that jar. Hadoop core ships with an out-of-date Guava JAR 
to avoid breaking old code downstream, but 2.7.x is designed to work with later 
versions too (i.e. it has moved off all of the now-removed methods). See 
https://issues.apache.org/jira/browse/HADOOP-10101 for the specifics.
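A submit-time sketch of that approach (untested; the jar path is an assumption): ship a newer Guava with the job and tell Spark to prefer user jars over the cluster's Hadoop/HBase classpath. Note these are spark-submit --conf settings, not values to set inside the job's HBaseConfiguration.

```shell
# Config fragment: prefer a user-supplied Guava over the cluster's copy.
./bin/spark-submit \
  --jars /path/to/guava-14.0.1.jar \
  --conf spark.executor.userClassPathFirst=true \
  --conf spark.driver.userClassPathFirst=true \
  ...
```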

On 23 Oct 2015, at 12:10, jinhong lu  wrote:

Hi, I run spark to write data to hbase, but found NoSuchMethodException:

15/10/23 18:45:21 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 
dn18-formal.i.nease.net): 
java.lang.NoSuchMethodError: 
com.google.common.io.ByteStreams.limit(Ljava/io/InputStream;J)Ljava/io/InputStream;

I found guava.jar in the hadoop/hbase dirs and the version is 12.0, but 
com.google.common.io.ByteStreams.limit only exists since 14.0, so the 
NoSuchMethodError occurs.

I tried to run spark-submit with --jars, but got the same error. I also tried adding
  configuration.set("spark.executor.extraClassPath", "/home/ljh")
  configuration.set("spark.driver.userClassPathFirst","true");
to my code; still the same.

How can I solve this? How do I remove the guava.jar in hadoop/hbase from the 
class path? Why doesn't it use the guava.jar in the Spark dir?

Here is my code:

rdd.foreach({ res =>
  val configuration = HBaseConfiguration.create();

  configuration.set("hbase.zookeeper.property.clientPort", "2181");
  configuration.set("hbase.zookeeper.quorum", "ip.66");
  configuration.set("hbase.master", "ip:6");
  configuration.set("spark.executor.extraClassPath", "/home/ljh")
  configuration.set("spark.driver.userClassPathFirst","true");
  val hadmin = new HBaseAdmin(configuration);
  configuration.clear();
  configuration.addResource("/home/hadoop/conf/core-default.xml")
  configuration.addResource("/home/hadoop/conf/core-site.xml")
  configuration.addResource("/home/hadoop/conf/mapred-default.xml")
  configuration.addResource("/home/hadoop/conf/mapred-site.xml")
  configuration.addResource("/home/hadoop/conf/yarn-default.xml")
  configuration.addResource("/home/hadoop/conf/yarn-site.xml")
  configuration.addResource("/home/hadoop/conf/hdfs-default.xml")
  configuration.addResource("/home/hadoop/conf/hdfs-site.xml")
  configuration.addResource("/home/hadoop/conf/hbase-default.xml")
  configuration.addResource("/home/ljhn1829/hbase-site.xml")
  val table = new HTable(configuration, "ljh_test2");
  var put = new Put(Bytes.toBytes(res.toKey()));
  put.add(Bytes.toBytes("basic"), Bytes.toBytes("name"), 
Bytes.toBytes(res.totalCount + "\t" + res.positiveCount));
  table.put(put);
  table.flushCommits()
})

and the error message:


15/10/23 19:06:42 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 
gdc-dn126-formal.i.nease.net): 
java.lang.NoSuchMethodError: 
com.google.common.io.ByteStreams.limit(Ljava/io/InputStream;J)Ljava/io/InputStream;
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.nextBatchStream(ExternalAppendOnlyMap.scala:420)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.(ExternalAppendOnlyMap.scala:392)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:207)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:63)
at 
org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:83)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:63)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

15/10/23 19:06:42 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 2, 
gdc-dn166-formal.i.nease.net, 
PROCESS_LOCAL, 1277 bytes)
15/10/23 19:06:42 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
gdc-dn166-forma

Re: How to set memory for SparkR with master="local[*]"

2015-10-23 Thread Dirceu Semighini Filho
Hi Matej,
I'm also using this and I'm seeing the same behavior here: my driver has
only 530 MB, which is the default value.

Maybe this is a bug.
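One possible workaround (an untested sketch, not confirmed in this thread): driver memory must be fixed before the JVM starts, so a setting in conf/spark-defaults.conf, which the launcher reads at startup, may take effect where sparkEnvir does not. The path assumes a standard binary distribution layout.

```shell
# Config fragment: set driver memory in spark-defaults.conf so the
# launcher applies it at JVM start, instead of passing it via sparkEnvir.
echo "spark.driver.memory 5g" >> spark-1.5.1-bin-hadoop2.6/conf/spark-defaults.conf
```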

2015-10-23 9:43 GMT-02:00 Matej Holec :

> Hello!
>
> How to adjust the memory settings properly for SparkR with
> master="local[*]"
> in R?
>
>
> *When running from  R -- SparkR doesn't accept memory settings :(*
>
> I use the following commands:
>
> R>  library(SparkR)
> R>  sc <- sparkR.init(master = "local[*]", sparkEnvir =
> list(spark.driver.memory = "5g"))
>
> Despite the variable spark.driver.memory is correctly set (checked in
> http://node:4040/environment/), the driver has only the default amount of
> memory allocated (Storage Memory 530.3 MB).
>
> *But when running from  spark-1.5.1-bin-hadoop2.6/bin/sparkR -- OK*
>
> The following command:
>
> ]$ spark-1.5.1-bin-hadoop2.6/bin/sparkR --driver-memory 5g
>
> creates a SparkR session with properly adjusted driver memory (Storage
> Memory 2.6 GB).
>
>
> Any suggestion?
>
> Thanks
> Matej
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-memory-for-SparkR-with-master-local-tp25178.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


How to set memory for SparkR with master="local[*]"

2015-10-23 Thread Matej Holec
Hello!

How to adjust the memory settings properly for SparkR with master="local[*]"
in R?


*When running from  R -- SparkR doesn't accept memory settings :(*

I use the following commands:

R>  library(SparkR)
R>  sc <- sparkR.init(master = "local[*]", sparkEnvir =
list(spark.driver.memory = "5g"))
 
Despite the variable spark.driver.memory is correctly set (checked in
http://node:4040/environment/), the driver has only the default amount of
memory allocated (Storage Memory 530.3 MB).

*But when running from  spark-1.5.1-bin-hadoop2.6/bin/sparkR -- OK*

The following command:

]$ spark-1.5.1-bin-hadoop2.6/bin/sparkR --driver-memory 5g

creates a SparkR session with properly adjusted driver memory (Storage Memory
2.6 GB).
 

Any suggestion?

Thanks
Matej



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-memory-for-SparkR-with-master-local-tp25178.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Cannot start REPL shell since 1.4.0

2015-10-23 Thread emlyn
xjlin0 wrote
> I cannot enter REPL shell in 1.4.0/1.4.1/1.5.0/1.5.1(with pre-built with
> or without Hadoop or home compiled with ant or maven).  There was no error
> message in v1.4.x, system prompt nothing.  On v1.5.x, once I enter
> $SPARK_HOME/bin/pyspark or spark-shell, I got
> 
> Error: Could not find or load main class org.apache.spark.launcher.Main

I have the same problem (on MacOS X Yosemite, all spark versions since 1.4,
installed both with homebrew and downloaded manually). I've been trying to
start the pyspark shell, but it also fails in the same way for spark-shell
and spark-sql and spark-submit. I've narrowed it down to the following line
in the spark-class script:

done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main
"$@")

(where $RUNNER is "java" and $LAUNCH_CLASSPATH is
"/usr/local/Cellar/apache-spark/1.5.1/libexec/lib/spark-assembly-1.5.1-hadoop2.6.0.jar",
which does exist and does contain the org.apache.spark.launcher.Main class,
despite the message that it can't be found)

If I run it manually, using:

SPARK_HOME=/usr/local/Cellar/apache-spark/1.5.1/libexec java -cp
/usr/local/Cellar/apache-spark/1.5.1/libexec/lib/spark-assembly-1.5.1-hadoop2.6.0.jar
org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit
pyspark-shell-main --name PySparkShell

It runs without that error, and instead prints out (where "\0" is a nul
character):

env\0PYSPARK_SUBMIT_ARGS="--name" "PySparkShell" "pyspark-shell"\0python\0

I'm not really sure what to try next, maybe with this extra information
someone has an idea what's going wrong, and how to fix it.
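For reference, the failing line consumes the launcher's NUL-delimited output roughly like the standalone sketch below (build_command here is a stand-in that just echoes a fixed command, not Spark's real launcher invocation):

```shell
# Minimal bash sketch of the spark-class parsing step: read each
# NUL-separated token from the launcher into an array, one token per
# command-line argument.
build_command() {
  printf 'env\0PYSPARK_SUBMIT_ARGS="--name" "PySparkShell" "pyspark-shell"\0python\0'
}

CMD=()
while IFS= read -r -d '' ARG; do
  CMD+=("$ARG")
done < <(build_command)

# Each NUL-separated token becomes one element of the command array.
printf '%s\n' "${CMD[@]}"
```

If the real `java -cp … org.apache.spark.launcher.Main …` works when run by hand, the problem is likely in how this loop is reached (environment, shell, or quoting) rather than in the jar itself.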



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-start-REPL-shell-since-1-4-0-tp24921p25176.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




NoSuchMethodException : com.google.common.io.ByteStreams.limit

2015-10-23 Thread jinhong lu
Hi, I run spark to write data to hbase, but found NoSuchMethodException:

15/10/23 18:45:21 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 
dn18-formal.i.nease.net): java.lang.NoSuchMethodError: 
com.google.common.io.ByteStreams.limit(Ljava/io/InputStream;J)Ljava/io/InputStream;

I found guava.jar in the hadoop/hbase dirs and the version is 12.0, but 
com.google.common.io.ByteStreams.limit only exists since 14.0, so the 
NoSuchMethodError occurs.

I tried to run spark-submit with --jars, but got the same error. I also tried adding
  configuration.set("spark.executor.extraClassPath", "/home/ljh")
  configuration.set("spark.driver.userClassPathFirst","true");
to my code; still the same.

How can I solve this? How do I remove the guava.jar in hadoop/hbase from the 
class path? Why doesn't it use the guava.jar in the Spark dir?

Here is my code:

rdd.foreach({ res =>
  val configuration = HBaseConfiguration.create();

  configuration.set("hbase.zookeeper.property.clientPort", "2181");
  configuration.set("hbase.zookeeper.quorum", "ip.66");
  configuration.set("hbase.master", "ip:6");
  configuration.set("spark.executor.extraClassPath", "/home/ljh")
  configuration.set("spark.driver.userClassPathFirst","true");
  val hadmin = new HBaseAdmin(configuration);
  configuration.clear();
  configuration.addResource("/home/hadoop/conf/core-default.xml")
  configuration.addResource("/home/hadoop/conf/core-site.xml")
  configuration.addResource("/home/hadoop/conf/mapred-default.xml")
  configuration.addResource("/home/hadoop/conf/mapred-site.xml")
  configuration.addResource("/home/hadoop/conf/yarn-default.xml")
  configuration.addResource("/home/hadoop/conf/yarn-site.xml")
  configuration.addResource("/home/hadoop/conf/hdfs-default.xml")
  configuration.addResource("/home/hadoop/conf/hdfs-site.xml")
  configuration.addResource("/home/hadoop/conf/hbase-default.xml")
  configuration.addResource("/home/ljhn1829/hbase-site.xml")
  val table = new HTable(configuration, "ljh_test2");
  var put = new Put(Bytes.toBytes(res.toKey()));
  put.add(Bytes.toBytes("basic"), Bytes.toBytes("name"), 
Bytes.toBytes(res.totalCount + "\t" + res.positiveCount));
  table.put(put);
  table.flushCommits()
})

and the error message:


15/10/23 19:06:42 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 
gdc-dn126-formal.i.nease.net): java.lang.NoSuchMethodError: 
com.google.common.io.ByteStreams.limit(Ljava/io/InputStream;J)Ljava/io/InputStream;
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.nextBatchStream(ExternalAppendOnlyMap.scala:420)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$DiskMapIterator.(ExternalAppendOnlyMap.scala:392)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:207)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:63)
at 
org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:83)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:63)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

15/10/23 19:06:42 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 2, 
gdc-dn166-formal.i.nease.net, PROCESS_LOCAL, 1277 bytes)
15/10/23 19:06:42 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
gdc-dn166-formal.i.nease.net:3838 (size: 3.2 KB, free: 1060.3 MB)
15/10/23 19:06:42 ERROR YarnScheduler: Lost executor 1 on 
gdc-dn126-formal.i.nease.net: remote Rpc client disassociated
15/10/23 19:06:42 WARN ReliableDeliverySupervisor: Association with remote 
system [akka.tcp://sparkexecu...@gdc-dn126-formal.i.nease.net:1656] has failed, 
address is now gated for [5000] ms. Reason is: [Disassociated].
15/10/23 19:06:42 INFO TaskSetManager: Re-queueing tasks for 1 from TaskSet 1.0
15/10/23 1

Re: [Spark Streaming] How do we reset the updateStateByKey values.

2015-10-23 Thread Uthayan Suthakar
Hi Sander,

Thank you for your very informative email. From your email, I've learned
quite a bit.

>>>Is the condition determined somehow from the data coming through
streamLogs, and is newData streamLogs again (rather than a whole data
source?)

No, they are two different Streams. I have two stream receivers, one of
which sends event regularly and the other is not so regular (this data is
computed by another application and stored into HDFS). What I'm trying to
do is pick up the data from HDFS and overwrite the Stream's state. Hence
the overwriting should only take place if there were new files in HDFS.

So we have two different RDDs. If no file is found in HDFS, it will simply
read the regular stream, compute and update the state(1) and output the
result. If there is a file found in HDFS, then it should overwrite the
state (1) with the data found from HDFS so the new events from the regular
stream will carry on with the new overwritten state.

I managed to get most of it done, but only having the issue with
overwriting the state.



On 22 October 2015 at 19:35, Sander van Dijk  wrote:

> I don't think it is possible in the way you try to do it. It is important
> to remember that the statements you mention only set up the stream stages,
> before the stream is actually running. Once it's running, you cannot
> change, remove or add stages.
>
> I am not sure how you determine your condition and what the actual change
> should be when that condition is met: you say you want a different update
> function but then give a statement with the same update function but a
> different source stream). Is the condition determined somehow from the data
> coming through streamLogs, and is newData basically streamLogs again
> (rather than a whole data source?). In that case I can think of 3 things to
> try:
>
> - if the condition you switch on can be determined independently from
> every item in streamLogs, you can simply do an if/else inside
> updateResultsStream to change the method that you determine your state
> - if this is not the case, but you can determine when to switch your
> condition for each key independently, you can extend your state type to
> also keep track of your condition: rather than using
> JavaPairDStream you make updatedResultsState a
> JavaPairDStream> (assuming you have some
> class Pair), and you make updateResultsStream update and check the state of
> the boolean.
> - finally, you can have a separate state stream that keeps track of your
> condition globally, then join that with you main stream and use that to
> update state. Something like:
>
> // determineCondition should result in a reduction to a single item that
> signals whether the condition is met in the current batch,
> updateContitionState should remember that
> conditionStateStream =
> streamLogs.reduce(determineCondition).updateStateByKey(updateConditionState)
>
>
> // addCondition gets RDDs from streamLogs and  single-item RDDs with the
> condition state and should add that state to each item in the streamLogs RDD
> joinedStream = streamLogs.transformWith(conditionStateStream,
> addCondition)
>
> // This is similar to the extend state type of the previous idea, but now
> your condition state is determined globally rather than per log entry
> updatedResultsState = joinedStream.updateStateByKey(updateResultsStream)
>
> I hope this applies to your case and that it makes sense, my Java is a bit
> rusty :) and perhaps others can suggest better spark streaming methods that
> can be used, but hopefully the idea is clear.
>
> Sander
>
> On Thu, Oct 22, 2015 at 4:06 PM Uthayan Suthakar <
> uthayan.sutha...@gmail.com> wrote:
>
>> Hello guys,
>>
>> I have a stream job that will carryout computations and update the state
>> (SUM the value). At some point, I would like to reset the state. I could
>> drop the state by setting 'None' but I don't want to drop it. I would like
>> to keep the state but update the state.
>>
>>
>> For example:
>>
>> JavaPairDStream updatedResultsState =
>> streamLogs.updateStateByKey(updateResultsStream);
>>
>> At some condition, I would like to update the state by key but with the
>> different values, hence different update function.
>>
>>
>> e.g.
>>
>>  updatedResultsState = newData.updateStateByKey(resetResultsStream);
>>
>> But the newData.updateStateByKey values cannot be replaced with the values
>> in streamLogs.updateStateByKey. Do you know how I could replace the state
>> value in streamLogs with newData?
>>
>> Is this possible?
>>
>>
>>
>>
>>
>>


Re: java.lang.NegativeArraySizeException? as iterating a big RDD

2015-10-23 Thread Todd Nist
Hi Yifan,

You could also try increasing spark.kryoserializer.buffer.max.mb (64 MB by
default): useful if your buffer size needs to go beyond 64 MB.

Per the docs:
Maximum allowable size of Kryo serialization buffer. This must be larger
than any object you attempt to serialize. Increase this if you get a
"buffer limit exceeded" exception inside Kryo.
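As a submit-time fragment (the 256 MB value is illustrative, not a recommendation):

```shell
# Config fragment: raise Kryo's maximum serialization buffer when submitting.
./bin/spark-submit \
  --conf spark.kryoserializer.buffer.max.mb=256 \
  ...
```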

-Todd

On Fri, Oct 23, 2015 at 6:51 AM, Yifan LI  wrote:

> Thanks for your advice, Jem. :)
>
> I will increase the partitioning and see if it helps.
>
> Best,
> Yifan LI
>
>
>
>
>
> On 23 Oct 2015, at 12:48, Jem Tucker  wrote:
>
> Hi Yifan,
>
> I think this is a result of Kryo trying to serialize something too large.
> Have you tried to increase your partitioning?
>
> Cheers,
>
> Jem
>
> On Fri, Oct 23, 2015 at 11:24 AM Yifan LI  wrote:
>
>> Hi,
>>
>> I have a big sorted RDD sRdd(~962million elements), and need to scan its
>> elements in order(using sRdd.toLocalIterator).
>>
>> But the process failed when the scanning was done after around 893million
>> elements, returned with following exception:
>>
>> Anyone has idea? Thanks!
>>
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 421752.0 failed 128 times, most
>> recent failure: Lost task 0.127 in stage 421752.0 (TID 17304,
>> small15-tap1.common.lip6.fr): java.lang.NegativeArraySizeException
>> at
>> com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
>> at
>> com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
>> at
>> com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>> at
>> com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>> at
>> com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
>> at
>> com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>> at
>> com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>> at
>> com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>> at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
>> at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
>> at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>> at
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at
>> org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:250)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org
>> 
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>> at scala.Option.foreach(Option.scala:236)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> Best,
>> Yifan LI
>>
>>
>>
>>
>>
>>
>


Re: java.lang.NegativeArraySizeException? as iterating a big RDD

2015-10-23 Thread Yifan LI
Thanks for your advice, Jem. :)

I will increase the partitioning and see if it helps. 

Best,
Yifan LI





> On 23 Oct 2015, at 12:48, Jem Tucker  wrote:
> 
> Hi Yifan, 
> 
> I think this is a result of Kryo trying to serialize something too large. 
> Have you tried to increase your partitioning? 
> 
> Cheers,
> 
> Jem
> 
> On Fri, Oct 23, 2015 at 11:24 AM Yifan LI  > wrote:
> Hi,
> 
> I have a big sorted RDD sRdd(~962million elements), and need to scan its 
> elements in order(using sRdd.toLocalIterator).
> 
> But the process failed when the scanning was done after around 893million 
> elements, returned with following exception:
> 
> Anyone has idea? Thanks!
> 
> 
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 0 in stage 421752.0 failed 128 times, most recent 
> failure: Lost task 0.127 in stage 421752.0 (TID 17304, 
> small15-tap1.common.lip6.fr ): 
> java.lang.NegativeArraySizeException
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>   at 
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>   at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
>   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
>   at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>   at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>   at 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>   at 
> org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:250)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org 
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 
> Best,
> Yifan LI
> 
> 
> 
> 
> 



Re: java.lang.NegativeArraySizeException? as iterating a big RDD

2015-10-23 Thread Jem Tucker
Hi Yifan,

I think this is a result of Kryo trying to serialize something too large.
Have you tried increasing your partitioning?
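For context, the overflow mechanics behind that exception can be shown in plain Java: array sizes are 32-bit ints, so a capacity computation for a very large serialized object can wrap negative, and allocating with that size throws exactly this exception. A minimal sketch (the growth factor is illustrative, not Kryo's actual resize policy):

```java
public class OverflowDemo {
    // Compute a tripled buffer capacity the naive way; for large inputs the
    // 32-bit multiplication wraps negative, and "new byte[grown]" then throws
    // NegativeArraySizeException -- the same failure mode the Kryo stack
    // trace shows when a single serialized object grows past ~2 GB.
    static int grow(int capacity) {
        return capacity * 3;
    }

    public static void main(String[] args) {
        int grown = grow(1 << 30);          // 3 GiB does not fit in an int
        System.out.println(grown);          // negative after overflow
        try {
            byte[] buf = new byte[grown];
        } catch (NegativeArraySizeException e) {
            System.out.println("NegativeArraySizeException, as in the trace");
        }
    }
}
```

More partitions mean smaller per-task serialized chunks, which keeps each allocation under the 32-bit limit.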

Cheers,

Jem

On Fri, Oct 23, 2015 at 11:24 AM Yifan LI  wrote:

> Hi,
>
> I have a big sorted RDD, sRdd (~962 million elements), and need to scan its
> elements in order (using sRdd.toLocalIterator).
>
> But the process failed after around 893 million elements had been scanned,
> returning the following exception:
>
> Does anyone have an idea? Thanks!
>
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 421752.0 failed 128 times, most
> recent failure: Lost task 0.127 in stage 421752.0 (TID 17304,
> small15-tap1.common.lip6.fr): java.lang.NegativeArraySizeException
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
> at
> com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
> at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
> at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
> at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
> at
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at
> org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:250)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Best,
> Yifan LI
>
>
>
>
>
>


java.lang.NegativeArraySizeException? as iterating a big RDD

2015-10-23 Thread Yifan LI
Hi,

I have a big sorted RDD, sRdd (~962 million elements), and need to scan its 
elements in order (using sRdd.toLocalIterator).

But the process failed after around 893 million elements had been scanned, 
returning the following exception:

Does anyone have an idea? Thanks!


Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 0 in stage 421752.0 failed 128 times, most recent failure: 
Lost task 0.127 in stage 421752.0 (TID 17304, small15-tap1.common.lip6.fr): 
java.lang.NegativeArraySizeException
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
at 
com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at 
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:250)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Best,
Yifan LI







Re: Best way to use Spark UDFs via Hive (Spark Thrift Server)

2015-10-23 Thread Deenar Toraskar
You can do the following. Start the spark-shell. Register the UDFs in the
shell using sqlContext, then start the Thrift Server using startWithContext
from the spark shell:
https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala#L56


Regards
Deenar

On 23 October 2015 at 01:15, Dave Moyers  wrote:

> Hi,
>
> We have several udf's written in Scala that we use within jobs submitted
> into Spark. They work perfectly with the sqlContext after being registered.
> We also allow access to saved tables via the Hive Thrift server bundled
> with Spark. However, we would like to allow Hive connections to use the
> udf's in their queries against the saved tables. Is there a way to register
> udf's such that they can be used within both a Spark job and in a Hive
> connection?
>
> Thanks!
> Dave
>
> Sent from my iPad
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Running many small Spark jobs repeatedly

2015-10-23 Thread Stephan Kepser
Hi there,

we have a set of relatively lightweight jobs that we would like to run
repeatedly on our Spark cluster.

The situation is as follows: we have a reliable source of data, a Cassandra
database. One table contains time series data, which we would like to
analyse. To do so we read a window of records defined by a start time stamp
and an end time stamp from the table and process all records of this
window. Since data is continuously added to the Cassandra table, we would
like to run the analysis job repeatedly, in fact every other second. Would
we have to keep re-submitting this Spark job every other second? How about
start-up times, if the expected job run time is just 2 seconds?

Or is there a way to define a long-running job by surrounding the Java job
we have with a while (true) { run job } loop?
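A single long-running driver that schedules the analysis internally would avoid per-submission startup cost entirely. A minimal sketch of that loop, with the Cassandra read and the Spark actions replaced by a hypothetical `analyseWindow` stub (all names here are illustrative):

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class RepeatedAnalysis {
    // Stand-in for one analysis pass over the window [startMs, endMs):
    // in the real job this would read the Cassandra rows for the window
    // and run the Spark actions on them.
    static int analyseWindow(long startMs, long endMs) {
        return (int) ((endMs - startMs) / 1000);   // dummy "result"
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger passes = new AtomicInteger();
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        // One long-running driver re-running the analysis every 2 seconds,
        // instead of re-submitting a 2-second job via spark-submit each time.
        scheduler.scheduleAtFixedRate(() -> {
            long end = System.currentTimeMillis();
            analyseWindow(end - 2_000, end);
            passes.incrementAndGet();
        }, 0, 2, TimeUnit.SECONDS);
        Thread.sleep(5_000);                       // let a few passes run
        scheduler.shutdown();
        System.out.println(passes.get() + " passes");
    }
}
```

One such loop per tenant/source combination (or one loop iterating over all combinations) keeps everything inside a single SparkContext.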

What's more, the table contains data for several tenants (about 25) and from
several different sources. We would want to analyse those (tenants and
sources) using a job for each combination of tenant and source. This leads
to quite a few jobs. We expect something like 50 jobs.

We see that this is similar to Spark streaming. But we're not sure whether
we should use Spark streaming. If we have about 50 jobs running in
parallel, we would need 50 receivers. And each receiver would require 1
core. The jobs would also require quite a few cores, though we probably
don't need a core per job. But it easily adds up to 75 cores, which seems
quite a lot for the little processing we do.

I expect retrieving data repeatedly from a database table and analysing the
data with several jobs is a pretty standard situation in Spark
applications. But couldn't find anything about this in the docs or on the
internet.

Any ideas or hints would be very welcome.

Thanks a lot,

Stephan

-- 
Dr. Stephan Kepser | Senior IT-Consultant

codecentric AG | Merscheider Straße 1 | 42699 Solingen | Deutschland
tel: +49 (0) 212.23362845 | fax: +49 (0) 212.23362879 | mobil: +49 (0)
151.52883635
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel, Mirko Novakovic, Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz



Strange problem of SparkLauncher

2015-10-23 Thread ??????
I need to run Spark jobs as a service in my project, so there is a 
"ServiceManager" in it, and it uses 
SparkLauncher (org.apache.spark.launcher.SparkLauncher) to submit Spark jobs.


First, I tried to write a demo, putting only the SparkLauncher code in the 
main method and running it with java -jar; it worked fine.


Then I tried starting my ServiceManager and then running the demo; it was 
still OK, which means there should be no conflicts in ports or files between 
the ServiceManager and the Spark job.


After that, I copied the demo code into my ServiceManager as a method, then 
started the ServiceManager and called it. However, the SparkSubmit process got 
stuck in/after the "addJars" procedure.


And here comes the strangest thing: when I killed the ServiceManager process, 
the SparkSubmit process immediately continued to run. It seems that when 
SparkLauncher is called from within my ServiceManager, it gets blocked.


This is confusing me. Do you have any idea why this happens?


Thanks a lot

How to get inverse Matrix / RDD or how to solve linear system of equations

2015-10-23 Thread Zhiliang Zhu
Hi Sujit, and all,

Currently I am stuck on a big difficulty, and I am eager to get some help from 
you.

There is a big linear system of equations, Ax = b, where A has N rows and N 
columns, N is very large, and b = [0, 0, ..., 0, 1]^T. I need to solve it to 
get x = [x1, x2, ..., xn]^T.

The simple solution would be to compute inverse(A) and then x = inverse(A) * b.
A would be some JavaRDD; however, RDDs/matrices have add/multiply/transpose 
APIs but no inverse API!

So, how could I conveniently get inverse(A), or solve the linear system of 
equations some other way? In Spark MLlib there is linear regression, whose 
training process solves for the coefficients of a specific linear model, that 
is, Ax = y: train on (x, y) to get A. Could that be used to solve the linear 
system of equations? I could not decide.

My deep appreciation for all your help.

Thank you very much!
Zhiliang
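For reference, for a modest N the system can be solved directly on the driver without ever forming inverse(A); for the huge distributed case, an iterative method (e.g. conjugate gradient built from repeated RDD matrix-vector multiplies) is the usual route, since it also never needs an explicit inverse. A stdlib-only sketch of the direct solve (Gaussian elimination with partial pivoting):

```java
public class LinearSolve {
    // Solve A x = b in place by Gaussian elimination with partial pivoting.
    // Note: this is O(n^3) on a single machine -- a sketch of the math, not
    // a replacement for a distributed iterative solver when N is huge.
    static double[] solve(double[][] A, double[] b) {
        int n = b.length;
        for (int col = 0; col < n; col++) {
            // Pivot: swap in the row with the largest entry in this column.
            int pivot = col;
            for (int r = col + 1; r < n; r++)
                if (Math.abs(A[r][col]) > Math.abs(A[pivot][col])) pivot = r;
            double[] tmpRow = A[col]; A[col] = A[pivot]; A[pivot] = tmpRow;
            double tmpB = b[col]; b[col] = b[pivot]; b[pivot] = tmpB;
            // Eliminate entries below the pivot.
            for (int r = col + 1; r < n; r++) {
                double f = A[r][col] / A[col][col];
                for (int c = col; c < n; c++) A[r][c] -= f * A[col][c];
                b[r] -= f * b[col];
            }
        }
        // Back substitution.
        double[] x = new double[n];
        for (int r = n - 1; r >= 0; r--) {
            double s = b[r];
            for (int c = r + 1; c < n; c++) s -= A[r][c] * x[c];
            x[r] = s / A[r][r];
        }
        return x;
    }

    public static void main(String[] args) {
        double[][] A = {{2, 1}, {1, 3}};
        double[] b = {3, 5};
        double[] x = solve(A, b);
        System.out.println(x[0] + " " + x[1]);  // 0.8 1.4
    }
}
```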



Re: Large number of conf broadcasts

2015-10-23 Thread Koert Kuipers
oh no wonder... it undoes the glob (i was reading from /some/path/*),
creates a hadoopRdd for every path, and then creates a union of them using
UnionRDD.

that's not what i want... no need to do a union. AvroInputFormat already has
the ability to handle globs (or multiple comma-separated paths) very
efficiently. AvroRelation should just pass the paths (comma separated).




On Thu, Oct 22, 2015 at 1:37 PM, Anders Arpteg  wrote:

> Yes, seems unnecessary. I actually tried patching the
> com.databricks.spark.avro reader to only broadcast once per dataset,
> instead of every single file/partition. It seems to work just as well, and
> there are significantly fewer broadcasts, and I am no longer seeing
> out-of-memory issues. Strange that more people do not react to this, since
> the broadcasting seems completely unnecessary...
>
> Best,
> Anders
>
>
> On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers  wrote:
>
>> i am seeing the same thing. its gona completely crazy creating broadcasts
>> for the last 15 mins or so. killing it...
>>
>> On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg 
>> wrote:
>>
>>> Hi,
>>>
>>> Running spark 1.5.0 in yarn-client mode, and am curious about why there are
>>> so many broadcast being done when loading datasets with large number of
>>> partitions/files. Have datasets with thousands of partitions, i.e. hdfs
>>> files in the avro folder, and sometime loading hundreds of these large
>>> datasets. Believe I have located the broadcast to line
>>> SparkContext.scala:1006. It seems to just broadcast the hadoop
>>> configuration, and I don't see why it should be necessary to broadcast that
>>> for EVERY file? Wouldn't it be possible to reuse the same broadcast
>>> configuration? It is hardly the case that the configuration would be different
>>> between each file in a single dataset. Seems to be wasting lots of memory
>>> and needs to persist unnecessarily to disk (see below again).
>>>
>>> Thanks,
>>> Anders
>>>
>>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0
>>> to disk
>>> 15/09/24
>>> 17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on
>>> 10.254.35.24:49428 (size: 23.1 KB)
>>> 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored
>>> as bytes in memory (estimated size 23.1 KB, free 2.4 KB)
>>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in
>>> memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
>>> 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from
>>> hadoopFile at AvroRelation.scala:121
>>> 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory
>>> threshold of 1024.0 KB for computing block broadcast_4804 in memory
>>> .
>>> 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache
>>> broadcast_4804 in memory! (computed 496.0 B so far)
>>> 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0
>>> B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
>>> limit = 530.3 MB.
>>> 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to
>>> disk instead.
>>> 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with
>>> curMem=556036460, maxMem=556038881
>>> 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
>>> 15/09/24 17:11:11 INFO BlockManager: Dropping block
>>> broadcast_1872_piece0 from memory
>>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0
>>> to disk
>>>
>>>
>>
>>


Re: spark streaming failing to replicate blocks

2015-10-23 Thread Eugen Cepoi
When I fix the port to the same value as in the stack trace, it works too.
The network config of the slaves seems correct.

Thanks,
Eugen

2015-10-23 8:30 GMT+02:00 Akhil Das :

> Mostly a network issue, you need to check your network configuration from
> the aws console and make sure the ports are accessible within the cluster.
>
> Thanks
> Best Regards
>
> On Thu, Oct 22, 2015 at 8:53 PM, Eugen Cepoi 
> wrote:
>
>> Huh, indeed this worked, thanks. Do you know why this happens? Is that
>> some known issue?
>>
>> Thanks,
>> Eugen
>>
>> 2015-10-22 19:08 GMT+07:00 Akhil Das :
>>
>>> Can you try fixing spark.blockManager.port to specific port and see if
>>> the issue exists?
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Mon, Oct 19, 2015 at 6:21 PM, Eugen Cepoi 
>>> wrote:
>>>
 Hi,

 I am running spark streaming 1.4.1 on EMR (AMI 3.9) over YARN.
 The job is reading data from Kinesis and the batch size is of 30s (I
 used the same value for the kinesis checkpointing).
 In the executor logs I can see every 5 seconds a sequence of
 stacktraces indicating that the block replication failed. I am using the
 default storage level MEMORY_AND_DISK_SER_2.
 WAL is not enabled nor checkpointing (the checkpoint dir is configured
 for the spark context but not for the streaming context).

 Here is an example of those logs for ip-10-63-160-18. They occur in
 every executor while trying to replicate to any other executor.


 15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to 
 [ip-10-63-160-18.ec2.internal/10.63.160.18:50929]
 15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing connection 
 to ip-10-63-160-18.ec2.internal/10.63.160.18:50929
 java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
 sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
 org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
 org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
 15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending 
 message.
 java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
 sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
 org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
 org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
 15/10/19 03:11:55 INFO nio.ConnectionManager: Notifying 
 ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
 15/10/19 03:11:55 INFO nio.ConnectionManager: Handling connection error on 
 connection to ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
 15/10/19 03:11:55 WARN storage.BlockManager: Failed to replicate 
 input-1-144524231 to BlockManagerId(3, ip-10-159-151-22.ec2.internal, 
 50929), failure #0
 java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
 sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
 org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
 org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
 15/10/19 03:11:55 INFO nio.ConnectionManager: Removing SendingConnection 
 to ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
 15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to 
 [ip-10-63-160-18.ec2.internal/10.63.160.18:39506]
 15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing connection 
 to ip-10-63-160-18.ec2.internal/10.63.160.18:39506
 java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
 sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
 org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
at 
 

Re: [SPARK STREAMING] polling based operation instead of event based operation

2015-10-23 Thread Lars Albertsson
There is a heartbeat stream pattern that you can use: Create a service
(perhaps a thread in your driver) that pushes a heartbeat event to a
different stream every N seconds. Consume that stream as well in your
streaming application, and perform an action on every heartbeat.

This has worked well in Storm, which differs in implementation, but it
might work in Spark Streaming as well.

Alternatively, since Spark internally uses microbatching, you might be
able to take advantage of the existing batching. You can try to use
foreachRDD on the stream, and see if your function gets called even
when the microbatch is empty. In that case, assuming your batch size
can add up to 10 seconds, you can implement heartbeat functionality
internally. If you are very lucky,
myStream.window(Seconds(10)).foreachRDD({ ... }) might even do it for
you.
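The heartbeat pattern above can be sketched outside any particular streaming framework. Here the second stream is simulated by an in-process queue, and the scheduled producer stands in for the tiny service that would write to a real heartbeat topic (all names are illustrative):

```java
import java.util.concurrent.*;

public class Heartbeat {
    // Tag check so the consumer can tell heartbeats from real data events.
    static boolean isHeartbeat(String event) {
        return "heartbeat".equals(event);
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        // Producer side: push a heartbeat event every second. In the real
        // setup this would be a small service writing to a second topic
        // that the streaming app consumes alongside its data stream.
        ScheduledExecutorService beat =
                Executors.newSingleThreadScheduledExecutor();
        beat.scheduleAtFixedRate(() -> stream.offer("heartbeat"),
                0, 1, TimeUnit.SECONDS);

        // Consumer side: the periodic action fires on every heartbeat,
        // whether or not any data events arrived in between.
        int fired = 0;
        long deadline = System.currentTimeMillis() + 3_500;
        while (System.currentTimeMillis() < deadline) {
            String event = stream.poll(200, TimeUnit.MILLISECONDS);
            if (isHeartbeat(event)) fired++;   // run the scheduled work here
        }
        beat.shutdown();
        System.out.println(fired + " heartbeats");
    }
}
```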

If you do experiments along those lines, please share the results with the list!

Regards,

Lars Albertsson



On Thu, Oct 22, 2015 at 10:48 PM, Nipun Arora  wrote:
> Hi,
> In general, in Spark Streaming one can do transformations (filter, map, etc.) or
> output operations (collect, forEach, etc.) in an event-driven paradigm... i.e.
> the action happens only if a message is received.
>
> Is it possible to do actions every few seconds in a polling based fashion,
> regardless if a new message has been received.
>
> In my use-case, I am filtering out a stream, and then do operations on this
> filter streams. However, I would like to do operations on the data in the
> stream every few seconds even if no message has been received in the stream.
>
> For example I have the following stream, where the violationChecking()
> function is being called only when a micro-batch is finished. Essentially
> this also means that I must receive a message in this stream to do
> processing. Is there any way that I can do the same operation every 10
> seconds or so? :
>
> sortedMessages.foreach(
> new Function Integer>>, Void>() {
> @Override
> public Void call(JavaRDD Integer, Integer>> tuple5JavaRDD) throws Exception {
> List>
> list = tuple5JavaRDD.collect();
> violationChecking(list);
> return null;
> }
> }
> );
>
>
> Thanks,
>
> Nipun

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Saving offset while reading from kafka

2015-10-23 Thread Erwan ALLAIN
Have a look at this: https://github.com/koeninger/kafka-exactly-once

especially:
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala
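The point of those examples is to commit offsets atomically together with the output. A minimal, framework-free sketch of the external offset store itself, where a flat properties file stands in for the transactional DB so the code stays self-contained (class and method names are hypothetical):

```java
import java.io.*;
import java.nio.file.*;
import java.util.Properties;

public class OffsetStore {
    private final Path file;
    OffsetStore(Path file) { this.file = file; }

    static OffsetStore temp() {
        try {
            return new OffsetStore(Files.createTempFile("offsets", ".properties"));
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // Record the last offset successfully written to HDFS for this
    // topic/partition. A real deployment would commit this in the same
    // transaction as the output, as the kafka-exactly-once examples do.
    void save(String topic, int partition, long offset) {
        try {
            Properties p = loadAll();
            p.setProperty(topic + "-" + partition, Long.toString(offset));
            try (OutputStream out = Files.newOutputStream(file)) {
                p.store(out, null);
            }
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // On (re)start, resume the stream from the stored offset (0 if none).
    long load(String topic, int partition) {
        return Long.parseLong(
                loadAll().getProperty(topic + "-" + partition, "0"));
    }

    private Properties loadAll() {
        Properties p = new Properties();
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                p.load(in);
            } catch (IOException e) { throw new UncheckedIOException(e); }
        }
        return p;
    }

    public static void main(String[] args) {
        OffsetStore store = OffsetStore.temp();
        store.save("events", 0, 42L);            // after a successful HDFS write
        System.out.println(store.load("events", 0));
    }
}
```

On restart, the loaded offsets would be passed to the direct Kafka stream's fromOffsets parameter so consumption resumes exactly where the last successful batch ended.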

On Fri, Oct 23, 2015 at 5:07 AM, Ramkumar V  wrote:

> Hi,
>
> I have written a Spark Streaming application using a Kafka stream, and it
> writes to HDFS every hour (the batch time). I would like to know how to get
> or commit the offset of the Kafka stream while writing to HDFS, so that if
> there is any issue or a redeployment, I'll start from the point of the
> previous successful offset commit. I want to store the offset in an external
> DB or something like that, not in ZooKeeper. If I want to resume the Kafka
> stream from a particular offset, how do I resume from that offset in Spark?
>
> *Thanks*,
> 
>
>


Re: sqlContext load by offset

2015-10-23 Thread Kayode Odeyemi
When I use that I get a "Caused by: org.postgresql.util.PSQLException:
ERROR: column "none" does not exist"

On Thu, Oct 22, 2015 at 9:31 PM, Kayode Odeyemi  wrote:

> Hi,
>
> I've been trying to load a postgres table using the following expression:
>
> val cachedIndex = cache.get("latest_legacy_group_index")
> val mappingsDF = sqlContext.load("jdbc", Map(
>   "url" -> Config.dataSourceUrl(mode, Some("mappings")),
>   "dbtable" -> s"(select userid, yid, username from legacyusers offset
> $cachedIndex ) as legacyusers")
> )
>
> I'd like to know if this expression is correct:
>
> "dbtable" -> s"(select userid, yid, username from legacyusers offset
> $cachedIndex ) as legacyusers")
>
> As you can see. I'm trying to load the table records by offset
>
> I appreciate your help.
>
>


-- 
Odeyemi 'Kayode O.
http://ng.linkedin.com/in/kayodeodeyemi. t: @charyorde


Re: Save RandomForest Model from ML package

2015-10-23 Thread amarouni

It's an open issue : https://issues.apache.org/jira/browse/SPARK-4587

That being said, you can work around the issue by serializing the model
(simple Java serialization) and then restoring it before calling the
prediction job.
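A minimal sketch of that workaround, with a hypothetical `Model` class standing in for the fitted RandomForest model (the real model object must itself be Serializable for this to apply):

```java
import java.io.*;

public class ModelIO {
    // Hypothetical fitted model; stands in for the ML RandomForest model.
    static class Model implements Serializable {
        final double threshold;
        Model(double threshold) { this.threshold = threshold; }
        boolean predict(double x) { return x > threshold; }
    }

    // Write the model with plain Java serialization after the training job.
    static void save(Model m, File f) {
        try (ObjectOutputStream out =
                     new ObjectOutputStream(new FileOutputStream(f))) {
            out.writeObject(m);
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // Restore it before the prediction job, so no re-fitting is needed.
    static Model load(File f) {
        try (ObjectInputStream in =
                     new ObjectInputStream(new FileInputStream(f))) {
            return (Model) in.readObject();
        } catch (IOException e) { throw new UncheckedIOException(e); }
        catch (ClassNotFoundException e) { throw new IllegalStateException(e); }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("model", ".ser");
        save(new Model(0.5), f);                    // end of training job
        Model restored = load(f);                   // start of prediction job
        System.out.println(restored.predict(0.7));  // true
    }
}
```

Note that deserialization requires the same model class (and a compatible serialVersionUID) on the classpath of the prediction job.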

Best Regards,

On 22/10/2015 14:33, Sebastian Kuepers wrote:
> Hey,
>
> I am trying to figure out the best practice for saving and loading models
> which have been fitted with the ML package - i.e. with the RandomForest
> classifier.
>
> There is PMML support in the MLib package afaik but not in ML - is
> that correct?
>
> How do you approach this, so that you do not have to fit your model
> before every prediction job?
>
> Thanks,
> Sebastian
>
>
> Sebastian Küpers
> Account Director
>
> Publicis Pixelpark
> Leibnizstrasse 65, 10629 Berlin
> T +49 30 5058 1838
> M +49 172 389 28 52
> sebastian.kuep...@publicispixelpark.de
> Web: publicispixelpark.de, Twitter: @pubpxp
> Facebook: publicispixelpark.de/facebook
> Publicis Pixelpark - eine Marke der Pixelpark AG
> Vorstand: Horst Wagner (Vorsitzender), Dirk Kedrowitsch
> Aufsichtsratsvorsitzender: Pedro Simko
> Amtsgericht Charlottenburg: HRB 72163
>
>
>
>
>
> 