Re: Many Receiver vs. Many threads per Receiver

2015-02-23 Thread Akhil Das
I believe when you go with option 1, it will distribute the consumers across
your cluster (possibly on 6 machines), but I still don't see a way to control
which partition each one consumes from. If you want a consumer where you can
specify the partition details, then you are better off with the low-level
consumer.
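For reference, a minimal sketch in Scala of the two options being compared; the
topic name, ZooKeeper quorum, and group id are placeholders, and union-ing the
streams from option 1 is one common way to combine them:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReceiverOptions {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("ReceiverOptions"), Seconds(10))
    val (zkQuorum, group) = ("zk-host:2181", "my-group") // placeholders

    // Option 1: six separate receivers; the scheduler can place them on
    // different worker machines, each occupying one core.
    val streams = (1 to 6).map(_ =>
      KafkaUtils.createStream(ssc, zkQuorum, group, Map("myTopic" -> 1)).map(_._2))
    val unified = ssc.union(streams)

    // Option 2: one receiver (one core) running six consumer threads.
    val single = KafkaUtils.createStream(ssc, zkQuorum, group,
      Map("myTopic" -> 6)).map(_._2)

    unified.print()
    ssc.start()
    ssc.awaitTermination()
  }
}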




Thanks
Best Regards

On Tue, Feb 24, 2015 at 9:36 AM, bit1...@163.com  wrote:

> Hi,
> I  am experimenting Spark Streaming and Kafka Integration, To read
> messages from Kafka in parallel, basically there are two ways
> 1. Create many Receivers, like (1 to 6).map(_ => KafkaUtils.createStream).
> 2. Specify many threads when calling KafkaUtils.createStream, like val
> topicMap = Map("myTopic" -> 6); this will create one receiver with 6 reading
> threads.
>
> My question is which option is better. Option 2 sounds better to me
> because it saves a lot of cores (one Receiver occupies one core), but I
> learned from somewhere else that option 1 is better, so I would like to hear
> how you guys see this. Thanks.
>
> --
> bit1...@163.com
>


Re: spark streaming window operations on a large window size

2015-02-23 Thread Tathagata Das
Yes.

On Mon, Feb 23, 2015 at 11:16 PM, Avi Levi  wrote:

> @Tathagata Das so basically you are saying it is supported out of the
> box, but we should expect a significant performance hit - is that right?
>
>
>
> On Tue, Feb 24, 2015 at 5:37 AM, Tathagata Das 
> wrote:
>
>> The default persistence level is MEMORY_AND_DISK, so the LRU policy would
>> discard the blocks to disk and the streaming app will not fail. However,
>> since things will constantly get read in and out of disk as windows are
>> processed, the performance won't be great. So it is best to have sufficient
>> memory to keep all the window data in memory.
>>
>> TD
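To illustrate the point about persistence levels, a hedged sketch: the storage
level of a windowed stream can be set explicitly so blocks are kept serialized
and spill to disk under memory pressure; `lines` stands in for any input DStream:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, Seconds}

// lines: DStream[String], e.g. from a Kafka or socket source (assumed).
// Keep the large window's data serialized, spilling to disk when needed.
val windowed = lines.window(Minutes(60), Seconds(30))
windowed.persist(StorageLevel.MEMORY_AND_DISK_SER)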
>>
>> On Mon, Feb 23, 2015 at 8:26 AM, Shao, Saisai 
>> wrote:
>>
>>> I don't think current Spark Streaming supports window operations that go
>>> beyond its available memory. Internally, Spark Streaming keeps all the data
>>> belonging to the effective window in memory; if memory is not enough,
>>> BlockManager will discard blocks according to the LRU policy, so something
>>> unexpected may occur.
>>>
>>> Thanks
>>> Jerry
>>>
>>> -Original Message-
>>> From: avilevi3 [mailto:avile...@gmail.com]
>>> Sent: Monday, February 23, 2015 12:57 AM
>>> To: user@spark.apache.org
>>> Subject: spark streaming window operations on a large window size
>>>
>>> Hi guys,
>>>
>>> does spark streaming support window operations on a sliding window whose
>>> data is larger than the available memory?
>>> currently we are using kafka as input, but we could change that if
>>> needed.
>>>
>>> thanks
>>> Avi
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-window-operations-on-a-large-window-size-tp21764.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>>
>>
>


Re: Write ahead Logs and checkpoint

2015-02-23 Thread Tathagata Das
I think it will not be affected. We ignore offsets stored anywhere outside
Spark Streaming. It was the fact that progress information was being stored
in two different places (Spark Streaming and Kafka/ZK) that was causing
inconsistencies and duplicates.

TD
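For reference, a minimal sketch of the Spark 1.3 direct stream API described
above; the broker list and topic name are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(
  new SparkConf().setAppName("DirectStream"), Seconds(10))

// No receivers and no ZooKeeper offset tracking: offsets are maintained
// inside Spark Streaming itself.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("myTopic"))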

On Mon, Feb 23, 2015 at 11:27 PM, Felix C  wrote:

>  Kafka 0.8.2 has built-in offset management, how would that affect direct
> stream in spark?
> Please see KAFKA-1012
>
> --- Original Message ---
>
> From: "Tathagata Das" 
> Sent: February 23, 2015 9:53 PM
> To: "V Dineshkumar" 
> Cc: "user" 
> Subject: Re: Write ahead Logs and checkpoint
>
>  Exactly, that is the reason.
>
>  To avoid that, in Spark 1.3 to-be-released, we have added a new Kafka
> API (called direct stream) which does not use Zookeeper at all to keep
> track of progress, and maintains offsets within Spark Streaming. That can
> guarantee all records being received exactly once. It's experimental for
> now, but we will make it stable. Please try it out.
>
>  TD
>
> On Mon, Feb 23, 2015 at 9:41 PM, V Dineshkumar <
> developer.dines...@gmail.com> wrote:
>
> Hi,
>
>  My spark streaming application is pulling data from Kafka. To prevent
> data loss I have implemented the WAL and enabled checkpointing. On killing my
> application and restarting it, I am able to prevent data loss now, but
> I am getting duplicate messages.
>
>  Is it because the application got killed before it was able to checkpoint
> the current processing state?
> If yes, how do I tackle the duplicate messages?
>
>  Thanks,
> Dinesh
>
>
>


Re: How to start spark-shell with YARN?

2015-02-23 Thread Arush Kharbanda
Hi

Are you sure that you built Spark for YARN? Standalone working does not
guarantee that it was built for YARN.

Thanks
Arush

On Tue, Feb 24, 2015 at 12:06 PM, Xi Shen  wrote:

> Hi,
>
> I followed this guide,
> http://spark.apache.org/docs/1.2.1/running-on-yarn.html, and tried to
> start spark-shell with yarn-client
>
> ./bin/spark-shell --master yarn-client
>
>
> But I got
>
> WARN ReliableDeliverySupervisor: Association with remote system 
> [akka.tcp://sparkYarnAM@10.0.2.15:38171] has failed, address is now gated for 
> [5000] ms. Reason is: [Disassociated].
>
> In the spark-shell, and other exceptions in the YARN log. Please see
> http://stackoverflow.com/questions/28671171/spark-shell-cannot-connect-to-yarn
> for more detail.
>
>
> However, submitting to this cluster works. Also, spark-shell in
> standalone mode works.
>
>
> My system:
>
> - ubuntu amd64
> - spark 1.2.1
> - yarn from hadoop 2.6 stable
>
>
> Thanks,
>
> Xi Shen
> about.me/davidshen
>
>


-- 

Sigmoid Analytics

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Write ahead Logs and checkpoint

2015-02-23 Thread Felix C
Kafka 0.8.2 has built-in offset management, how would that affect direct stream 
in spark?
Please see KAFKA-1012

--- Original Message ---

From: "Tathagata Das" 
Sent: February 23, 2015 9:53 PM
To: "V Dineshkumar" 
Cc: "user" 
Subject: Re: Write ahead Logs and checkpoint

Exactly, that is the reason.

To avoid that, in Spark 1.3 to-be-released, we have added a new Kafka API
(called direct stream) which does not use Zookeeper at all to keep track of
progress, and maintains offsets within Spark Streaming. That can guarantee
all records being received exactly once. It's experimental for now, but we
will make it stable. Please try it out.

TD

On Mon, Feb 23, 2015 at 9:41 PM, V Dineshkumar  wrote:

> Hi,
>
> My spark streaming application is pulling data from Kafka. To prevent data
> loss I have implemented the WAL and enabled checkpointing. On killing my
> application and restarting it, I am able to prevent data loss now, but
> I am getting duplicate messages.
>
> Is it because the application got killed before it was able to checkpoint the
> current processing state?
> If yes, how do I tackle the duplicate messages?
>
> Thanks,
> Dinesh
>


Spark sql issue

2015-02-23 Thread Udit Mehta
Hi,

I am using Spark SQL to create/alter Hive tables. I have a highly nested
JSON and I am using the SchemaRDD to infer the schema. The JSON has 6
columns, and 1 of the columns (which is a struct) has around 60 fields (key
value pairs).
When I run the Spark SQL query for the above table, it just hangs
without any DEBUG/ERROR logs. Also, if I remove a few columns or fields from
the column having 60 fields, it works fine.

Is there a limit on the size of a query which is preventing me from running
the Spark SQL query?
The original query works perfectly if I use the Hive client.

Any help is appreciated.

Thanks,
Udit


Re: spark streaming window operations on a large window size

2015-02-23 Thread Avi Levi
@Tathagata Das so basically you are saying it is supported out of the box,
but we should expect a significant performance hit - is that right?



On Tue, Feb 24, 2015 at 5:37 AM, Tathagata Das  wrote:

> The default persistence level is MEMORY_AND_DISK, so the LRU policy would
> discard the blocks to disk and the streaming app will not fail. However,
> since things will constantly get read in and out of disk as windows are
> processed, the performance won't be great. So it is best to have sufficient
> memory to keep all the window data in memory.
>
> TD
>
> On Mon, Feb 23, 2015 at 8:26 AM, Shao, Saisai 
> wrote:
>
>> I don't think current Spark Streaming supports window operations that go
>> beyond its available memory. Internally, Spark Streaming keeps all the data
>> belonging to the effective window in memory; if memory is not enough,
>> BlockManager will discard blocks according to the LRU policy, so something
>> unexpected may occur.
>>
>> Thanks
>> Jerry
>>
>> -Original Message-
>> From: avilevi3 [mailto:avile...@gmail.com]
>> Sent: Monday, February 23, 2015 12:57 AM
>> To: user@spark.apache.org
>> Subject: spark streaming window operations on a large window size
>>
>> Hi guys,
>>
>> does spark streaming support window operations on a sliding window whose
>> data is larger than the available memory?
>> currently we are using kafka as input, but we could change that if needed.
>>
>> thanks
>> Avi
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-window-operations-on-a-large-window-size-tp21764.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag

2015-02-23 Thread fanooos
I have resolved this issue. Actually there were two problems.

The first problem in the application was the port of HDFS. It was
configured (in core-site.xml) as 9000, but in the application I was using
50070, which (I think) is the default web UI port.

The second problem: I forgot to put the file into HDFS :( .
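In other words, the read must go through the NameNode RPC port from
core-site.xml (9000 here), not the 50070 web UI port; a sketch of the
corrected call, assuming `sc` is the SparkContext:

// fs.defaultFS is hdfs://10.62.57.141:9000 in core-site.xml, so:
val lines = sc.textFile("hdfs://10.62.57.141:9000/tmp/lines.txt")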





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/InvalidProtocolBufferException-Protocol-message-end-group-tag-did-not-match-expected-tag-tp21777p21781.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: issue Running Spark Job on Yarn Cluster

2015-02-23 Thread sachin Singh
I am using CDH5.3.1



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21779p21780.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




issue Running Spark Job on Yarn Cluster

2015-02-23 Thread sachin Singh
Hi,
I want to run my Spark job in Hadoop YARN cluster mode.
I am using the command below:
spark-submit --master yarn-cluster --driver-memory 1g --executor-memory 1g
--executor-cores 1 --class com.dc.analysis.jobs.AggregationJob
sparkanalitic.jar param1 param2 param3
I am getting the error below. Kindly suggest what's going wrong and whether
the command is proper or not. Thanks in advance.

 diagnostics: Application application_1424284032717_0066 failed 2 times due
to AM Container for appattempt_1424284032717_0066_02 exited with 
exitCode: 15 due to: Exception from container-launch.
Container id: container_1424284032717_0066_02_01
Exit code: 15
Stack trace: ExitCodeException exitCode=15: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:197)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 15
.Failing this attempt.. Failing the application.
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: root.hdfs
 start time: 1424699723648
 final status: FAILED
 tracking URL:
http://myhostname:8088/cluster/app/application_1424284032717_0066
 user: hdfs
2015-02-23 19:26:04 DEBUG Client - stopping client from cache:
org.apache.hadoop.ipc.Client@4085f1ac
2015-02-23 19:26:04 DEBUG Utils - Shutdown hook called
2015-02-23 19:26:05 DEBUG Utils - Shutdown hook called



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21779.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Fwd: How to start spark-shell with YARN?

2015-02-23 Thread Xi Shen
Hi,

I followed this guide,
http://spark.apache.org/docs/1.2.1/running-on-yarn.html, and tried to start
spark-shell with yarn-client

./bin/spark-shell --master yarn-client


But I got

WARN ReliableDeliverySupervisor: Association with remote system
[akka.tcp://sparkYarnAM@10.0.2.15:38171] has failed, address is now
gated for [5000] ms. Reason is: [Disassociated].

In the spark-shell, and other exceptions in the YARN log. Please see
http://stackoverflow.com/questions/28671171/spark-shell-cannot-connect-to-yarn
for more detail.


However, submitting to this cluster works. Also, spark-shell in
standalone mode works.


My system:

- ubuntu amd64
- spark 1.2.1
- yarn from hadoop 2.6 stable


Thanks,

Xi Shen
about.me/davidshen


Re: Movie Recommendation tutorial

2015-02-23 Thread Xiangrui Meng
Try to set lambda to 0.1. -Xiangrui
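For context, a sketch of retraining with the suggested lambda in Scala MLlib;
the rank and iteration count follow the numbers in the thread, and the parsing
assumes MovieLens' ::-delimited rating format:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// MovieLens format: UserID::MovieID::Rating::Timestamp
val ratings = sc.textFile("ratings.dat").map { line =>
  val f = line.split("::")
  Rating(f(0).toInt, f(1).toInt, f(2).toDouble)
}

// rank = 8, iterations = 20, lambda = 0.1 (the suggested regularization)
val model = ALS.train(ratings, 8, 20, 0.1)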

On Mon, Feb 23, 2015 at 3:06 PM, Krishna Sankar  wrote:
> The RMSE varies a little bit between the versions.
> Partitioned the training,validation,test set like so:
>
> training = ratings_rdd_01.filter(lambda x: (x[3] % 10) < 6)
> validation = ratings_rdd_01.filter(lambda x: (x[3] % 10) >= 6 and (x[3] %
> 10) < 8)
> test = ratings_rdd_01.filter(lambda x: (x[3] % 10) >= 8)
> Validation MSE :
>
> # 1.3.0 Mean Squared Error = 0.871456869392
> # 1.2.1 Mean Squared Error = 0.877305629074
>
> Itertools results:
>
> 1.3.0 - RMSE = 1.354839 (rank = 8, lambda = 1.0, numIter = 20)
> 1.1.1 - RMSE = 1.335831 (rank = 8, lambda = 1.0, numIter = 10)
>
> Cheers
> 
>
> On Mon, Feb 23, 2015 at 12:37 PM, Xiangrui Meng  wrote:
>>
>> Which Spark version did you use? Btw, there are three datasets from
>> MovieLens. The tutorial used the medium one (1 million). -Xiangrui
>>
>> On Mon, Feb 23, 2015 at 8:36 AM, poiuytrez 
>> wrote:
>> > What do you mean?
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Movie-Recommendation-tutorial-tp21769p21771.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> >
>>
>>
>




Re: Pyspark save Decison Tree Module with joblib/pickle

2015-02-23 Thread Xiangrui Meng
FYI, in 1.3 we support save/load tree models in Scala and Java. We will add
save/load support to Python soon. -Xiangrui

On Mon, Feb 23, 2015 at 2:57 PM, Sebastián Ramírez <
sebastian.rami...@senseta.com> wrote:

> In your log it says:
>
> pickle.PicklingError: Can't pickle <type 'thread.lock'>: it's not found as
> thread.lock
>
> As far as I know, you can't pickle Spark models. If you go to the
> documentation for Pickle you can see that you can pickle only simple Python
> structures and code (written in Python), at least as I understand:
> https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled
>
> To save a model you can call: model.toDebugString()
>
> That gives you a string in pseudo-code that you can save to a file. Then,
> you can parse that pseudo code to write a proper script that runs the
> Decision Tree. Actually, that's what I did for a Random Forest (an ensemble
> of Decision Trees).
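A sketch of that save step; the thread is about PySpark, but the same
toDebugString exists on the Scala model, and the output path is a placeholder:

import java.io.PrintWriter
import org.apache.spark.mllib.tree.model.DecisionTreeModel

// Write the tree's human-readable pseudo-code description to a local file.
def dumpTree(model: DecisionTreeModel, path: String): Unit = {
  val out = new PrintWriter(path)
  try out.write(model.toDebugString) finally out.close()
}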
>
> Hope that helps,
>
>
> *Sebastián Ramírez*
> Diseñador de Algoritmos
>
>  Tel: (+571) 795 7950 ext: 1012
>  Cel: (+57) 300 370 77 10
>  Calle 73 No 7 - 06  Piso 4
>  Linkedin: co.linkedin.com/in/tiangolo/
>  Twitter: @tiangolo 
>  Email: sebastian.rami...@senseta.com
>  www.senseta.com
>
> On Mon, Feb 23, 2015 at 4:55 AM, Jaggu  wrote:
>
>> Hi Team,
>> I was trying to save a DecisionTree model from Pyspark using joblib.
>> It is giving me the following error http://pastebin.com/82CFhPNn . Any
>> clue
>> how to resolve the same or save a model.
>>
>> Best regards
>>
>> Jagan
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-save-Decison-Tree-Module-with-joblib-pickle-tp21765.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: Re: About FlumeUtils.createStream

2015-02-23 Thread bit1...@163.com
The behavior is exactly what I expected. Thanks Akhil and Tathagata!



bit1...@163.com
 
From: Akhil Das
Date: 2015-02-24 13:32
To: bit1129
CC: Tathagata Das; user
Subject: Re: Re: About FlumeUtils.createStream
That depends on how many machines you have in your cluster. Say you have 6
workers; then most likely the receivers will be distributed across all workers
(assuming your topic has 6 partitions). Now when you have more than 6
partitions, say 12, these 6 receivers will each consume from 2 partitions at a
time. And when you have fewer partitions, say 3, then 3 of the receivers will
be idle.
On 24 Feb 2015 10:16, "bit1...@163.com"  wrote:
Hi, Akhil,Tathagata,

This leads me to another question. For the Spark Streaming and Kafka
integration, if there is more than one Receiver in the cluster, such as
  val streams = (1 to 6).map ( _ => KafkaUtils.createStream(ssc, zkQuorum,
group, topicMap).map(_._2) ),
will these Receivers stay on one cluster node, or will they be distributed
among the cluster nodes?



bit1...@163.com
 
From: Akhil Das
Date: 2015-02-24 12:58
To: Tathagata Das
CC: user; bit1129
Subject: Re: About FlumeUtils.createStream
I see, thanks for the clarification TD.
On 24 Feb 2015 09:56, "Tathagata Das"  wrote:
Akhil, that is incorrect.

Spark will listen on the given port for Flume to push data into it.
When in local mode, it will listen on localhost:<port>.
When in some kind of cluster, instead of localhost you will have to give the
hostname of the cluster node where you want Flume to forward the data. Spark
will launch the Flume receiver on that node (assuming the hostname matching is
correct) and listen on the given port for receiving data from Flume. So only
the configured machine will listen on that port.

I suggest trying the other stream, FlumeUtils.createPollingStream. More details
here:
http://spark.apache.org/docs/latest/streaming-flume-integration.html
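A minimal sketch of the polling variant; the host and port are placeholders,
and the named machine must be running Flume's Spark sink:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val ssc = new StreamingContext(
  new SparkConf().setAppName("FlumePolling"), Seconds(10))

// Spark pulls events from Flume's custom sink instead of Flume pushing them.
val events = FlumeUtils.createPollingStream(ssc, "sink-host", 9988)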



On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das  wrote:
Spark won't listen on that port, mate. It basically means you have a Flume
source running at that port on your localhost. And when you submit your
application in standalone mode, workers will consume data from that port.

Thanks
Best Regards

On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com  wrote:

Hi,
In the spark streaming application, I write the code
FlumeUtils.createStream(ssc, "localhost", <port>), which means Spark will
listen on that port and wait for the Flume sink to write to it.
My question is: when I submit the application to the Spark Standalone cluster,
will the port be opened only on the driver machine, or will all the workers
also open the port and wait for the Flume data?








Re: Task not serializable exception

2015-02-23 Thread Kartheek.R
I could trace where the problem is. If I run without any threads, it works
fine. When I allocate threads, I run into the "Task not serializable" problem.
But I need to have threads in my code.

Any help please!!!

This is my code:
import breeze.linalg.{DenseVector, Vector, squaredDistance}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object SparkKart {
  def parseVector(line: String): Vector[Double] = {
    DenseVector(line.split(' ').map(_.toDouble))
  }

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("SparkKart")
    val sc = new SparkContext(sparkConf)
    val lines = sc.textFile(args(0))
    val data = lines.map(parseVector _).persist(StorageLevel.MEMORY_ONLY_SER)
    val kPoints = data.takeSample(withReplacement = false, 4, 42).toArray

    // The map closure below is defined inside an anonymous Runnable.
    val thread1 = new Thread(new Runnable {
      def run() {
        val dist1 = data.map(x => squaredDistance(x, kPoints(0)))
        dist1.saveAsTextFile("hdfs:/kart")
      }
    })
    thread1.start()
  }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Task-not-serializable-exception-tp21776p21778.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag

2015-02-23 Thread أنس الليثي
Hadoop version : 2.6.0
Spark Version : 1.2.1

here is also the pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>TestSpark</groupId>
  <artifactId>TestSpark</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.1</version>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>


Best regards

On 24 February 2015 at 08:43, Ted Yu  wrote:

> bq. have installed hadoop on a local virtual machine
>
> Can you tell us the release of hadoop you installed ?
>
> What Spark release are you using ? Or be more specific, what hadoop
> release was the Spark built against ?
>
> Cheers
>
> On Mon, Feb 23, 2015 at 9:37 PM, fanooos  wrote:
>
>> Hi
>>
>> I have installed hadoop on a local virtual machine using the steps from
>> this
>> URL
>>
>>
>> https://www.digitalocean.com/community/tutorials/how-to-install-hadoop-on-ubuntu-13-10
>>
>> In the local machine I write a little Spark application in java to read a
>> file from the hadoop instance installed in the virtual machine.
>>
>> The code is below
>>
>> public static void main(String[] args) {
>>
>>     JavaSparkContext sc = new JavaSparkContext(new
>> SparkConf().setAppName("Spark Count").setMaster("local"));
>>
>>     JavaRDD<String> lines =
>> sc.textFile("hdfs://10.62.57.141:50070/tmp/lines.txt");
>>     JavaRDD<Integer> lengths = lines.flatMap(new FlatMapFunction<String,
>> Integer>() {
>>         @Override
>>         public Iterable<Integer> call(String t) throws Exception {
>>             return Arrays.asList(t.length());
>>         }
>>     });
>>     List<Integer> collect = lengths.collect();
>>     int totalLength = lengths.reduce(new Function2<Integer, Integer,
>> Integer>() {
>>         @Override
>>         public Integer call(Integer v1, Integer v2) throws Exception {
>>             return v1 + v2;
>>         }
>>     });
>>     System.out.println(totalLength);
>> }
>>
>>
>> The application throws this exception
>>
>> Exception in thread "main" java.io.IOException: Failed on local
>> exception: com.google.protobuf.InvalidProtocolBufferException: Protocol
>> message end-group tag did not match expected tag.; Host Details : local
>> host
>> is: "TOSHIBA-PC/192.168.56.1"; destination host is: "10.62.57.141":50070;
>> at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1351)
>> at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>> at
>>
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>> at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>> at java.lang.reflect.Method.invoke(Unknown Source)
>> at
>>
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>> at
>>
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>> at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
>> at
>>
>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:651)
>> at
>> org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1679)
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
>> at
>>
>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>> at
>>
>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
>> at
>> org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1701)
>> at
>> org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1647)
>> at
>>
>> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:222)
>> at
>>
>> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
>> at
>> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
>> at
>> org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
>> at
>> org.apache.s

Re: Write ahead Logs and checkpoint

2015-02-23 Thread Tathagata Das
Exactly, that is the reason.

To avoid that, in Spark 1.3 to-be-released, we have added a new Kafka API
(called direct stream) which does not use Zookeeper at all to keep track of
progress, and maintains offsets within Spark Streaming. That can guarantee
all records being received exactly once. It's experimental for now, but we
will make it stable. Please try it out.

TD

On Mon, Feb 23, 2015 at 9:41 PM, V Dineshkumar  wrote:

> Hi,
>
> My spark streaming application is pulling data from Kafka. To prevent data
> loss I have implemented the WAL and enabled checkpointing. On killing my
> application and restarting it, I am able to prevent data loss now, but
> I am getting duplicate messages.
>
> Is it because the application got killed before it was able to checkpoint the
> current processing state?
> If yes, how do I tackle the duplicate messages?
>
> Thanks,
> Dinesh
>


Re: Getting to proto buff classes in Spark Context

2015-02-23 Thread necro351 .
Sorry Ted, that was me clumsily trying to redact my organization's name
from the computer output (in my e-mail editor). I can assure you that
basically defend7 and rick are the same thing in this case so the class is
present in the jar.

On Mon Feb 23 2015 at 9:39:09 PM Ted Yu  wrote:

> The classname given in stack trace was com.rick.reports.Reports
>
> In the output from jar command the class is com.defend7.reports.Reports.
>
> FYI
>
> On Mon, Feb 23, 2015 at 9:33 PM, necro351 .  wrote:
>
>> Hi Ted,
>>
>> Yes it appears to be:
>> rick@ubuntu:~/go/src/rick/sparksprint/containers/tests/StreamingReports$
>> jar tvf
>> ../../../analyzer/spark/target/scala-2.10/rick-processors-assembly-1.0.jar|grep
>> SensorReports
>>   1128 Mon Feb 23 17:34:46 PST 2015
>> com/defend7/reports/Reports$SensorReports$1.class
>>  13507 Mon Feb 23 17:34:46 PST 2015
>> com/defend7/reports/Reports$SensorReports$Builder.class
>>  10640 Mon Feb 23 17:34:46 PST 2015
>> com/defend7/reports/Reports$SensorReports.class
>>815 Mon Feb 23 17:34:46 PST 2015
>> com/defend7/reports/Reports$SensorReportsOrBuilder.class
>>
>>
>> On Mon Feb 23 2015 at 8:57:18 PM Ted Yu  wrote:
>>
>>> bq. Caused by: java.lang.ClassNotFoundException: com.rick.reports.
>>> Reports$SensorReports
>>>
>>> Is Reports$SensorReports class in rick-processors-assembly-1.0.jar ?
>>>
>>> Thanks
>>>
>>> On Mon, Feb 23, 2015 at 8:43 PM, necro351 .  wrote:
>>>
 Hello,

 I am trying to deserialize some data encoded using proto buff from
 within Spark and am getting class-not-found exceptions. I have narrowed the
 program down to something very simple that shows the problem exactly (see
 'The Program' below) and hopefully someone can tell me the easy fix :)

 So the situation is I have some proto buff reports in /tmp/reports. I
 also have a Spark project with the below Scala code (under The Program) as
 well as a Java file defining SensorReports all in the same src sub-tree in
 my Spark project. Its built using sbt in the standard way. The Spark job
 reads in the reports from /tmp/reports and then prints them to the console.
 When I build and run my spark job with spark-submit everything works as
 expected and the reports are printed out. When I uncomment the 'XXX'
 variant in the Scala spark program and try to print the reports from within
 a Spark Context I get the class-not-found exceptions. I don't understand
 why. If I get this working then I will want to do more than just print the
 reports from within the Spark Context.

 My read of the documentation tells me that my spark job should have
 access to everything in the submitted jar and that jar includes the Java
 code generated by the proto buff library which defines SensorReports. This
 is the spark-submit invocation I use after building my job as an assembly
 with the sbt-assembly plugin:

 spark-submit --class com.rick.processors.NewReportProcessor --master
 local[*]
 ../../../analyzer/spark/target/scala-2.10/rick-processors-assembly-1.0.jar

 I have also tried adding the jar programmatically using sc.addJar but
 that does not help. I found a bug from July (
 https://github.com/apache/spark/pull/181) that seems related but it
 went into Spark 1.2.0 (which is what I am currently using) so I don't think
 that's it.

 Any ideas? Thanks!

 The Program:
 ==
 package com.rick.processors



 import java.io.File

 import java.nio.file.{Path, Files, FileSystems}

 import org.apache.spark.{SparkContext, SparkConf}

 import com.rick.reports.Reports.SensorReports



 object NewReportProcessor {

   private val sparkConf = new SparkConf().setAppName("ReportProcessor")

   private val sc = new SparkContext(sparkConf)



   def main(args: Array[String]) = {

 val protoBuffsBinary = localFileReports()

 val sensorReportsBundles = protoBuffsBinary.map(bundle =>
 SensorReports.parseFrom(bundle))
 // XXX: Printing from within the SparkContext throws
 class-not-found
 // exceptions, why?

 // sc.makeRDD(sensorReportsBundles).foreach((x: SensorReports) =>
 println(x.toString))
 sensorReportsBundles.foreach((x: SensorReports) =>
 println(x.toString))
   }



   private def localFileReports() = {

 val reportDir = new File("/tmp/reports")

 val reportFiles =
 reportDir.listFiles.filter(_.getName.endsWith(".report"))

 reportFiles.map(file => {

   val path = FileSystems.getDefault().getPath("/tmp/reports",
 file.getName())
   Files.readAllBytes(path)

 })

   }

 }

 The Class-not-found exceptions:
 =
 Spark assembly has been built with Hive, including Datanucleus ja

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-23 Thread Tathagata Das
In case this mystery has not been solved: DStream.print() essentially does
an RDD.take(10) on each RDD, which computes only a subset of the partitions
in the RDD, but collect() forces the evaluation of all the RDDs. Since you
are writing to JSON in the map() function, this could be the reason.

TD
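A sketch of the difference, assuming `json` is the DStream from the gist:
print() materializes only enough partitions for a take(10), while an action
that touches every partition forces the full pipeline:

// Partial: take(10) per batch may leave partitions (and files) unprocessed.
json.print()

// Full: count() visits every partition of every batch RDD.
json.foreachRDD { rdd =>
  println(s"processed ${rdd.count()} records in this batch")
}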

On Wed, Feb 18, 2015 at 7:15 AM, Imran Rashid  wrote:

> so if you only change this line:
>
>
> https://gist.github.com/emres/0fb6de128baea099e741#file-mymoduledriver-java-L137
>
> to
>
> json.print()
>
> it processes 16 files instead?  I am totally perplexed.  My only
> suggestions to help debug are
> (1) see what happens when you get rid of MyModuleWorker completely --
> change MyModuleDriver#process to just
> inStream.print()
> and see what happens
>
> (2) stick a bunch of printlns into MyModuleWorker#call
>
> (3) turn on DEBUG logging
> for org.apache.spark.streaming.dstream.FileInputDStream
>
> my gut instinct is that something else is flaky about the file input
> stream (e.g., it makes some assumptions about the file system which may not
> be valid in your case; it has a bunch of caveats), and that it has just
> happened to work sometimes with your foreachRDD and failed sometimes with
> print.
>
> Sorry I am not a lot of help in this case, hope this leads you down the
> right track or somebody else can help out.
>
> Imran
>
>
> On Wed, Feb 18, 2015 at 2:28 AM, Emre Sevinc 
> wrote:
>
>> Hello Imran,
>>
>> (a) I know that all 20 files are processed when I use foreachRDD, because
>> I can see the processed files in the output directory. (My application
>> logic writes them to an output directory after they are processed, *but*
>> that writing operation does not happen in foreachRDD, below you can see the
>> URL that includes my code and clarifies this).
>>
>> (b) I know only 16 files are processed because in the output directory I
>> see only 16 files processed. I wait for minutes and minutes and no more
>> files appear in the output directory. When I see only 16 files are
>> processed and Spark Streaming went to the mode of idly watching the input
>> directory, and then if I copy a few more files, they are also processed.
>>
>> (c) Sure, you can see part of my code in the following gist:
>> https://gist.github.com/emres/0fb6de128baea099e741
>>  It might seem a little convoluted at first, because my application
>> is divided into two classes, a Driver class (setting up things and
>> initializing them), and a Worker class (that implements the core
>> functionality). I've also put the relevant methods from the my utility
>> classes for completeness.
>>
>> I am as perplexed as you are as to why forcing the output via foreachRDD
>> ended up in different behaviour compared to simply using print() method.
>>
>> Kind regards,
>> Emre
>>
>>
>>
>> On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid 
>> wrote:
>>
>>> Hi Emre,
>>>
>>> there shouldn't be any difference in which files get processed w/
>>> print() vs. foreachRDD().  In fact, if you look at the definition of
>>> print(), it is just calling foreachRDD() underneath.  So there is something
>>> else going on here.
>>>
>>> We need a little more information to figure out exactly what is going
>>> on.  (I think Sean was getting at the same thing ...)
>>>
>>> (a) how do you know that when you use foreachRDD, all 20 files get
>>> processed?
>>>
>>> (b) How do you know that only 16 files get processed when you print()?
>>> Do you know the other files are being skipped, or maybe they are just
>>> "stuck" somewhere?  eg., suppose you start w/ 20 files, and you see 16 get
>>> processed ... what happens after you add a few more files to the
>>> directory?  Are they processed immediately, or are they never processed
>>> either?
>>>
>>> (c) Can you share any more code of what you are doing to the dstreams
>>> *before* the print() / foreachRDD()?  That might give us more details about
>>> what the difference is.
>>>
>>> I can't see how .count.println() would be different than just println(),
>>> but maybe I am missing something also.
>>>
>>> Imran
>>>
>>> On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc 
>>> wrote:
>>>
 Sean,

 In this case, I've been testing the code on my local machine and using
 Spark locally, so I all the log output was available on my terminal. And
 I've used the .print() method to have an output operation, just to force
 Spark execute.

 And I was not using foreachRDD, I was only using print() method on a
 JavaDStream object, and it was working fine for a few files, up to 16 (and
 without print() it did not do anything because there were no output
 operations).

 To sum it up, in my case:

  - Initially, use .print() and no foreachRDD: processes up to 16 files
 and does not do anything for the remaining 4.
  - Remove .print() and use foreachRDD: processes all of the 20 files.

 Maybe, as in Akhil Das's suggestion, using .count.print() might also
 have fixed my problem, but I'm s

Re: InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag

2015-02-23 Thread Ted Yu
bq. have installed hadoop on a local virtual machine

Can you tell us the release of hadoop you installed ?

What Spark release are you using ? Or be more specific, what hadoop release
was the Spark built against ?

Cheers

On Mon, Feb 23, 2015 at 9:37 PM, fanooos  wrote:

> Hi
>
> I have installed hadoop on a local virtual machine using the steps from
> this
> URL
>
>
> https://www.digitalocean.com/community/tutorials/how-to-install-hadoop-on-ubuntu-13-10
>
> In the local machine I write a little Spark application in java to read a
> file from the hadoop instance installed in the virtual machine.
>
> The code is below
>
> public static void main(String[] args) {
>
>     JavaSparkContext sc = new JavaSparkContext(new
> SparkConf().setAppName("Spark Count").setMaster("local"));
>
>     JavaRDD<String> lines =
> sc.textFile("hdfs://10.62.57.141:50070/tmp/lines.txt");
>     JavaRDD<Integer> lengths = lines.flatMap(new FlatMapFunction<String,
> Integer>() {
>         @Override
>         public Iterable<Integer> call(String t) throws Exception {
>             return Arrays.asList(t.length());
>         }
>     });
>     List<Integer> collect = lengths.collect();
>     int totalLength = lengths.reduce(new Function2<Integer, Integer,
> Integer>() {
>         @Override
>         public Integer call(Integer v1, Integer v2) throws Exception {
>             return v1 + v2;
>         }
>     });
>     System.out.println(totalLength);
>
>   }
>
>
> The application throws this exception
>
> Exception in thread "main" java.io.IOException: Failed on local
> exception: com.google.protobuf.InvalidProtocolBufferException: Protocol
> message end-group tag did not match expected tag.; Host Details : local
> host
> is: "TOSHIBA-PC/192.168.56.1"; destination host is: "10.62.57.141":50070;
> at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
> at org.apache.hadoop.ipc.Client.call(Client.java:1351)
> at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> at
>
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> at java.lang.reflect.Method.invoke(Unknown Source)
> at
>
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> at
>
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
> at
>
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:651)
> at
> org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1679)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
> at
>
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
> at
> org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1701)
> at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1647)
> at
>
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:222)
> at
>
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
> at
> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
> at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
> at
> org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:797)
> 

Write ahead Logs and checkpoint

2015-02-23 Thread V Dineshkumar
Hi,

My spark streaming application is pulling data from Kafka. To prevent data
loss I have implemented the WAL and enabled checkpointing. On killing my
application and restarting it, I am able to prevent data loss now, but
I am getting duplicate messages.

Is it because the application got killed before it was able to checkpoint the
current processing state?
If yes, how do I tackle the duplicate messages?

Thanks,
Dinesh


Re: Getting to proto buff classes in Spark Context

2015-02-23 Thread Ted Yu
The classname given in stack trace was com.rick.reports.Reports

In the output from jar command the class is com.defend7.reports.Reports.

FYI

On Mon, Feb 23, 2015 at 9:33 PM, necro351 .  wrote:

> Hi Ted,
>
> Yes it appears to be:
> rick@ubuntu:~/go/src/rick/sparksprint/containers/tests/StreamingReports$
> jar tvf
> ../../../analyzer/spark/target/scala-2.10/rick-processors-assembly-1.0.jar|grep
> SensorReports
>   1128 Mon Feb 23 17:34:46 PST 2015
> com/defend7/reports/Reports$SensorReports$1.class
>  13507 Mon Feb 23 17:34:46 PST 2015
> com/defend7/reports/Reports$SensorReports$Builder.class
>  10640 Mon Feb 23 17:34:46 PST 2015
> com/defend7/reports/Reports$SensorReports.class
>815 Mon Feb 23 17:34:46 PST 2015
> com/defend7/reports/Reports$SensorReportsOrBuilder.class
>
>
> On Mon Feb 23 2015 at 8:57:18 PM Ted Yu  wrote:
>
>> bq. Caused by: java.lang.ClassNotFoundException: com.rick.reports.
>> Reports$SensorReports
>>
>> Is Reports$SensorReports class in rick-processors-assembly-1.0.jar ?
>>
>> Thanks
>>
>> On Mon, Feb 23, 2015 at 8:43 PM, necro351 .  wrote:
>>
>>> Hello,
>>>
>>> I am trying to deserialize some data encoded using proto buff from
>>> within Spark and am getting class-not-found exceptions. I have narrowed the
>>> program down to something very simple that shows the problem exactly (see
>>> 'The Program' below) and hopefully someone can tell me the easy fix :)
>>>
>>> So the situation is I have some proto buff reports in /tmp/reports. I
>>> also have a Spark project with the below Scala code (under The Program) as
>>> well as a Java file defining SensorReports all in the same src sub-tree in
>>> my Spark project. Its built using sbt in the standard way. The Spark job
>>> reads in the reports from /tmp/reports and then prints them to the console.
>>> When I build and run my spark job with spark-submit everything works as
>>> expected and the reports are printed out. When I uncomment the 'XXX'
>>> variant in the Scala spark program and try to print the reports from within
>>> a Spark Context I get the class-not-found exceptions. I don't understand
>>> why. If I get this working then I will want to do more than just print the
>>> reports from within the Spark Context.
>>>
>>> My read of the documentation tells me that my spark job should have
>>> access to everything in the submitted jar and that jar includes the Java
>>> code generated by the proto buff library which defines SensorReports. This
>>> is the spark-submit invocation I use after building my job as an assembly
>>> with the sbt-assembly plugin:
>>>
>>> spark-submit --class com.rick.processors.NewReportProcessor --master
>>> local[*]
>>> ../../../analyzer/spark/target/scala-2.10/rick-processors-assembly-1.0.jar
>>>
>>> I have also tried adding the jar programmatically using sc.addJar but
>>> that does not help. I found a bug from July (
>>> https://github.com/apache/spark/pull/181) that seems related but it
>>> went into Spark 1.2.0 (which is what I am currently using) so I don't think
>>> that's it.
>>>
>>> Any ideas? Thanks!
>>>
>>> The Program:
>>> ==
>>> package com.rick.processors
>>>
>>>
>>>
>>> import java.io.File
>>>
>>> import java.nio.file.{Path, Files, FileSystems}
>>>
>>> import org.apache.spark.{SparkContext, SparkConf}
>>>
>>> import com.rick.reports.Reports.SensorReports
>>>
>>>
>>>
>>> object NewReportProcessor {
>>>
>>>   private val sparkConf = new SparkConf().setAppName("ReportProcessor")
>>>
>>>   private val sc = new SparkContext(sparkConf)
>>>
>>>
>>>
>>>   def main(args: Array[String]) = {
>>>
>>> val protoBuffsBinary = localFileReports()
>>>
>>> val sensorReportsBundles = protoBuffsBinary.map(bundle =>
>>> SensorReports.parseFrom(bundle))
>>> // XXX: Printing from within the SparkContext throws class-not-found
>>>
>>> // exceptions, why?
>>>
>>> // sc.makeRDD(sensorReportsBundles).foreach((x: SensorReports) =>
>>> println(x.toString))
>>> sensorReportsBundles.foreach((x: SensorReports) =>
>>> println(x.toString))
>>>   }
>>>
>>>
>>>
>>>   private def localFileReports() = {
>>>
>>> val reportDir = new File("/tmp/reports")
>>>
>>> val reportFiles =
>>> reportDir.listFiles.filter(_.getName.endsWith(".report"))
>>>
>>> reportFiles.map(file => {
>>>
>>>   val path = FileSystems.getDefault().getPath("/tmp/reports",
>>> file.getName())
>>>   Files.readAllBytes(path)
>>>
>>> })
>>>
>>>   }
>>>
>>> }
>>>
>>> The Class-not-found exceptions:
>>> =
>>> Spark assembly has been built with Hive, including Datanucleus jars on
>>> classpath
>>> Using Spark's default log4j profile:
>>> org/apache/spark/log4j-defaults.properties
>>> 15/02/23 17:35:03 WARN Utils: Your hostname, ubuntu resolves to a
>>> loopback address: 127.0.1.1; using 192.168.241.128 instead (on interface
>>> eth0)
>>> 15/02/23 17:35:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
>>> another address
>>> 15/02/23 17:35:04 INFO SecurityManager: Changing view acls to: 

InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag

2015-02-23 Thread fanooos
Hi 

I have installed hadoop on a local virtual machine using the steps from this
URL

https://www.digitalocean.com/community/tutorials/how-to-install-hadoop-on-ubuntu-13-10

In the local machine I write a little Spark application in java to read a
file from the hadoop instance installed in the virtual machine. 

The code is below

public static void main(String[] args) {

    JavaSparkContext sc = new JavaSparkContext(new
SparkConf().setAppName("Spark Count").setMaster("local"));

    JavaRDD<String> lines =
sc.textFile("hdfs://10.62.57.141:50070/tmp/lines.txt");
    JavaRDD<Integer> lengths = lines.flatMap(new FlatMapFunction<String, Integer>() {
        @Override
        public Iterable<Integer> call(String t) throws Exception {
            return Arrays.asList(t.length());
        }
    });
    List<Integer> collect = lengths.collect();
    int totalLength = lengths.reduce(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });
    System.out.println(totalLength);

  }


The application throws this exception

Exception in thread "main" java.io.IOException: Failed on local
exception: com.google.protobuf.InvalidProtocolBufferException: Protocol
message end-group tag did not match expected tag.; Host Details : local host
is: "TOSHIBA-PC/192.168.56.1"; destination host is: "10.62.57.141":50070; 
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
at org.apache.hadoop.ipc.Client.call(Client.java:1351)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:651)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1679)
at
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
at
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
at 
org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1701)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1647)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:222)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
at 
org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)
at org.apache.spark.rdd.RDD.collect(RDD.scala:797)
at
org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:309)
at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32)
at org.css.RaiSpark.RaiSparkApp.main(RaiSparkApp.java:25)
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol
message end-group tag did not match expected tag.
at
com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94)
at
com.google.protobuf.CodedInputStre

Re: Getting to proto buff classes in Spark Context

2015-02-23 Thread necro351 .
Hi Ted,

Yes it appears to be:
rick@ubuntu:~/go/src/rick/sparksprint/containers/tests/StreamingReports$
jar tvf
../../../analyzer/spark/target/scala-2.10/rick-processors-assembly-1.0.jar|grep
SensorReports
  1128 Mon Feb 23 17:34:46 PST 2015
com/defend7/reports/Reports$SensorReports$1.class
 13507 Mon Feb 23 17:34:46 PST 2015
com/defend7/reports/Reports$SensorReports$Builder.class
 10640 Mon Feb 23 17:34:46 PST 2015
com/defend7/reports/Reports$SensorReports.class
   815 Mon Feb 23 17:34:46 PST 2015
com/defend7/reports/Reports$SensorReportsOrBuilder.class


On Mon Feb 23 2015 at 8:57:18 PM Ted Yu  wrote:

> bq. Caused by: java.lang.ClassNotFoundException: com.rick.reports.Reports$
> SensorReports
>
> Is Reports$SensorReports class in rick-processors-assembly-1.0.jar ?
>
> Thanks
>
> On Mon, Feb 23, 2015 at 8:43 PM, necro351 .  wrote:
>
>> Hello,
>>
>> I am trying to deserialize some data encoded using proto buff from within
>> Spark and am getting class-not-found exceptions. I have narrowed the
>> program down to something very simple that shows the problem exactly (see
>> 'The Program' below) and hopefully someone can tell me the easy fix :)
>>
>> So the situation is I have some proto buff reports in /tmp/reports. I
>> also have a Spark project with the below Scala code (under The Program) as
>> well as a Java file defining SensorReports all in the same src sub-tree in
>> my Spark project. Its built using sbt in the standard way. The Spark job
>> reads in the reports from /tmp/reports and then prints them to the console.
>> When I build and run my spark job with spark-submit everything works as
>> expected and the reports are printed out. When I uncomment the 'XXX'
>> variant in the Scala spark program and try to print the reports from within
>> a Spark Context I get the class-not-found exceptions. I don't understand
>> why. If I get this working then I will want to do more than just print the
>> reports from within the Spark Context.
>>
>> My read of the documentation tells me that my spark job should have
>> access to everything in the submitted jar and that jar includes the Java
>> code generated by the proto buff library which defines SensorReports. This
>> is the spark-submit invocation I use after building my job as an assembly
>> with the sbt-assembly plugin:
>>
>> spark-submit --class com.rick.processors.NewReportProcessor --master
>> local[*]
>> ../../../analyzer/spark/target/scala-2.10/rick-processors-assembly-1.0.jar
>>
>> I have also tried adding the jar programmatically using sc.addJar but
>> that does not help. I found a bug from July (
>> https://github.com/apache/spark/pull/181) that seems related but it went
>> into Spark 1.2.0 (which is what I am currently using) so I don't think
>> that's it.
>>
>> Any ideas? Thanks!
>>
>> The Program:
>> ==
>> package com.rick.processors
>>
>>
>>
>> import java.io.File
>>
>> import java.nio.file.{Path, Files, FileSystems}
>>
>> import org.apache.spark.{SparkContext, SparkConf}
>>
>> import com.rick.reports.Reports.SensorReports
>>
>>
>>
>> object NewReportProcessor {
>>
>>   private val sparkConf = new SparkConf().setAppName("ReportProcessor")
>>
>>   private val sc = new SparkContext(sparkConf)
>>
>>
>>
>>   def main(args: Array[String]) = {
>>
>> val protoBuffsBinary = localFileReports()
>>
>> val sensorReportsBundles = protoBuffsBinary.map(bundle =>
>> SensorReports.parseFrom(bundle))
>> // XXX: Printing from within the SparkContext throws class-not-found
>>
>> // exceptions, why?
>>
>> // sc.makeRDD(sensorReportsBundles).foreach((x: SensorReports) =>
>> println(x.toString))
>> sensorReportsBundles.foreach((x: SensorReports) =>
>> println(x.toString))
>>   }
>>
>>
>>
>>   private def localFileReports() = {
>>
>> val reportDir = new File("/tmp/reports")
>>
>> val reportFiles =
>> reportDir.listFiles.filter(_.getName.endsWith(".report"))
>>
>> reportFiles.map(file => {
>>
>>   val path = FileSystems.getDefault().getPath("/tmp/reports",
>> file.getName())
>>   Files.readAllBytes(path)
>>
>> })
>>
>>   }
>>
>> }
>>
>> The Class-not-found exceptions:
>> =
>> Spark assembly has been built with Hive, including Datanucleus jars on
>> classpath
>> Using Spark's default log4j profile:
>> org/apache/spark/log4j-defaults.properties
>> 15/02/23 17:35:03 WARN Utils: Your hostname, ubuntu resolves to a
>> loopback address: 127.0.1.1; using 192.168.241.128 instead (on interface
>> eth0)
>> 15/02/23 17:35:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
>> another address
>> 15/02/23 17:35:04 INFO SecurityManager: Changing view acls to: rick
>> 15/02/23 17:35:04 INFO SecurityManager: Changing modify acls to: rick
>> 15/02/23 17:35:04 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(rick); users
>> with modify permissions: Set(rick)
>> 15/02/23 17:35:04 INFO Slf4jLogger: Slf4jLogger started
>> 15/02/23 17:35:04 IN

How to get yarn logs to display in the spark or yarn history-server?

2015-02-23 Thread Colin Kincaid Williams
Hi,

I have been trying to get my yarn logs to display in the spark
history-server or yarn history-server. I can see the log information with:


yarn logs -applicationId application_1424740955620_0009
15/02/23 22:15:14 INFO client.ConfiguredRMFailoverProxyProvider: Failing
over to us3sm2hbqa04r07-comp-prod-local


Container: container_1424740955620_0009_01_02 on
us3sm2hbqa07r07.comp.prod.local_8041
===
LogType: stderr
LogLength: 0
Log Contents:

LogType: stdout
LogLength: 897
Log Contents:
[GC [PSYoungGen: 262656K->23808K(306176K)] 262656K->23880K(1005568K),
0.0283450 secs] [Times: user=0.14 sys=0.03, real=0.03 secs]
Heap
 PSYoungGen  total 306176K, used 111279K [0xeaa8,
0x0001, 0x0001)
  eden space 262656K, 33% used
[0xeaa8,0xeffebbe0,0xfab0)
  from space 43520K, 54% used
[0xfab0,0xfc240320,0xfd58)
  to   space 43520K, 0% used
[0xfd58,0xfd58,0x0001)
 ParOldGen   total 699392K, used 72K [0xbff8,
0xeaa8, 0xeaa8)
  object space 699392K, 0% used
[0xbff8,0xbff92010,0xeaa8)
 PSPermGen   total 35328K, used 34892K [0xbad8,
0xbd00, 0xbff8)
  object space 35328K, 98% used
[0xbad8,0xbcf93088,0xbd00)



Container: container_1424740955620_0009_01_03 on
us3sm2hbqa09r09.comp.prod.local_8041
===
LogType: stderr
LogLength: 0
Log Contents:

LogType: stdout
LogLength: 896
Log Contents:
[GC [PSYoungGen: 262656K->23725K(306176K)] 262656K->23797K(1005568K),
0.0358650 secs] [Times: user=0.28 sys=0.04, real=0.04 secs]
Heap
 PSYoungGen  total 306176K, used 65712K [0xeaa8,
0x0001, 0x0001)
  eden space 262656K, 15% used
[0xeaa8,0xed380bf8,0xfab0)
  from space 43520K, 54% used
[0xfab0,0xfc22b4f8,0xfd58)
  to   space 43520K, 0% used
[0xfd58,0xfd58,0x0001)
 ParOldGen   total 699392K, used 72K [0xbff8,
0xeaa8, 0xeaa8)
  object space 699392K, 0% used
[0xbff8,0xbff92010,0xeaa8)
 PSPermGen   total 29696K, used 29486K [0xbad8,
0xbca8, 0xbff8)
  object space 29696K, 99% used
[0xbad8,0xbca4b838,0xbca8)



Container: container_1424740955620_0009_01_01 on
us3sm2hbqa09r09.comp.prod.local_8041
===
LogType: stderr
LogLength: 0
Log Contents:

LogType: stdout
LogLength: 21
Log Contents:
Pi is roughly 3.1416

I can see some details for the application in the spark history-server at
this url
http://us3sm2hbqa04r07.comp.prod.local:18080/history/application_1424740955620_0009/jobs/
. When running in spark-master mode, I can see the stdout and stderr
somewhere in the spark history-server. Then how do I get the information
which I see above into the Spark history-server ?
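A hedged aside: as far as I know, the container stdout/stderr that "yarn logs"
prints are collected by YARN log aggregation and served by the YARN/MapReduce
JobHistory server, not by the Spark history server, which only replays Spark
event logs. A sketch of the Spark-side settings (property names are standard;
the values are placeholders for your cluster):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")                               // feeds the Spark history server
  .set("spark.eventLog.dir", "hdfs:///user/spark/applicationHistory")  // placeholder path
  .set("spark.yarn.historyServer.address", "historyserver-host:18080") // placeholder host:port

On the YARN side, yarn.log-aggregation-enable must be true in yarn-site.xml
for the output shown by "yarn logs" to be aggregated at all.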


Re: Re: About FlumeUtils.createStream

2015-02-23 Thread Akhil Das
That depends on how many machines you have in your cluster. Say you have 6
workers; most likely the receivers will be distributed across all of them
(assuming your topic has 6 partitions). Now when you have more than 6
partitions, say 12, these 6 receivers will each consume from 2
partitions at a time. And when you have fewer partitions, say 3, then 3 of
the receivers will be idle.
On 24 Feb 2015 10:16, "bit1...@163.com"  wrote:

> Hi, Akhil,Tathagata,
>
> This leads me to another question ,For the Spark Streaming and Kafka
> Integration, If there are more than one Receiver in the cluster, such as
>   val streams = (1 to 6).map ( _ => KafkaUtils.createStream(ssc,
> zkQuorum, group, topicMap).map(_._2) ),
> then will these Receivers stay on one cluster node, or will they be
> distributed among the cluster nodes?
>
> --
> bit1...@163.com
>
>
> *From:* Akhil Das 
> *Date:* 2015-02-24 12:58
> *To:* Tathagata Das 
> *CC:* user ; bit1129 
> *Subject:* Re: About FlumeUtils.createStream
>
> I see, thanks for the clarification TD.
> On 24 Feb 2015 09:56, "Tathagata Das"  wrote:
>
>> Akhil, that is incorrect.
>>
>> Spark will listen on the given port for Flume to push data into it.
>> When in local mode, it will listen on localhost:
>> When in some kind of cluster, instead of localhost you will have to give
>> the hostname of the cluster node where you want Flume to forward the data.
>> Spark will launch the Flume receiver on that node (assuming the hostname
>> matching is correct), and listen on port , for receiving data from Flume.
>> So only the configured machine will listen on port .
>>
>> I suggest trying the other stream. FlumeUtils.createPollingStream. More
>> details here.
>> http://spark.apache.org/docs/latest/streaming-flume-integration.html
>>
>>
>>
>> On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das 
>> wrote:
>>
>>> Spark won't listen on  mate. It basically means you have a flume
>>> source running at port  of your localhost. And when you submit your
>>> application in standalone mode, workers will consume data from that port.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com 
>>> wrote:
>>>

 Hi,
 In the spark streaming application, I write the code, 
 FlumeUtils.createStream(ssc,"localhost",),which
 means spark will listen on the  port, and wait for Flume Sink to write
 to it.
 My question is:  when I submit the application to the Spark Standalone
 cluster, will  be opened only on the Driver Machine or all the workers
 will also open the  port and wait for the Flume data?

 --


>>>
>>


Re: Re: About FlumeUtils.createStream

2015-02-23 Thread Tathagata Das
Distributed among cluster nodes.

On Mon, Feb 23, 2015 at 8:45 PM, bit1...@163.com  wrote:

> Hi, Akhil,Tathagata,
>
> This leads me to another question ,For the Spark Streaming and Kafka
> Integration, If there are more than one Receiver in the cluster, such as
>   val streams = (1 to 6).map ( _ => KafkaUtils.createStream(ssc,
> zkQuorum, group, topicMap).map(_._2) ),
> then will these Receivers stay on one cluster node, or will they be
> distributed among the cluster nodes?
>
> --
> bit1...@163.com
>
>
> *From:* Akhil Das 
> *Date:* 2015-02-24 12:58
> *To:* Tathagata Das 
> *CC:* user ; bit1129 
> *Subject:* Re: About FlumeUtils.createStream
>
> I see, thanks for the clarification TD.
> On 24 Feb 2015 09:56, "Tathagata Das"  wrote:
>
>> Akhil, that is incorrect.
>>
>> Spark will listen on the given port for Flume to push data into it.
>> When in local mode, it will listen on localhost:
>> When in some kind of cluster, instead of localhost you will have to give
>> the hostname of the cluster node where you want Flume to forward the data.
>> Spark will launch the Flume receiver on that node (assuming the hostname
>> matching is correct), and listen on port , for receiving data from Flume.
>> So only the configured machine will listen on port .
>>
>> I suggest trying the other stream. FlumeUtils.createPollingStream. More
>> details here.
>> http://spark.apache.org/docs/latest/streaming-flume-integration.html
>>
>>
>>
>> On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das 
>> wrote:
>>
>>> Spark won't listen on  mate. It basically means you have a flume
>>> source running at port  of your localhost. And when you submit your
>>> application in standalone mode, workers will consume data from that port.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com 
>>> wrote:
>>>

 Hi,
 In the spark streaming application, I write the code, 
 FlumeUtils.createStream(ssc,"localhost",),which
 means spark will listen on the  port, and wait for Flume Sink to write
 to it.
 My question is:  when I submit the application to the Spark Standalone
 cluster, will  be opened only on the Driver Machine or all the workers
 will also open the  port and wait for the Flume data?

 --


>>>
>>


Task not serializable exception

2015-02-23 Thread Kartheek.R
Hi,

I have a file containing data in the following way:
0.0 0.0 0.0
0.1 0.1 0.1
0.2 0.2 0.2
9.0 9.0 9.0
9.1 9.1 9.1
9.2 9.2 9.2
Now I do the following:

val kPoints = data.takeSample(withReplacement = false, 4, 42).toArray
val thread1 = new Thread(new Runnable {
  def run() {
    val dist1 = data.map(x => squaredDistance(x, kPoints(0)))
  }
})
thread1.start

I am facing Task not serializable exception:
Exception in thread "Thread-32" org.apache.spark.SparkException: Task not
serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.rdd.RDD.map(RDD.scala:271)
at org.apache.spark.examples.SparkKart$$anon$1.run(SparkKart.scala:67)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException:
org.apache.spark.examples.SparkKart$$anon$1
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 5 more

Any help, please!
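One common cause, given that the trace names the anonymous Runnable
(SparkKart$$anon$1) as the unserializable object: the map closure is dragging
the enclosing Runnable into the task. A sketch of a usual workaround (not a
confirmed fix for this exact code) is to copy what the closure needs into a
local val inside run() first:

val kPoints = data.takeSample(withReplacement = false, 4, 42).toArray
val thread1 = new Thread(new Runnable {
  def run() {
    val centroid = kPoints(0)  // local copy: the closure below captures only this value
    val dist1 = data.map(x => squaredDistance(x, centroid))
    dist1.count()              // an action, so the map actually runs
  }
})
thread1.start()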



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Task-not-serializable-exception-tp21776.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Getting to proto buff classes in Spark Context

2015-02-23 Thread Ted Yu
bq. Caused by: java.lang.ClassNotFoundException: com.rick.reports.Reports$SensorReports

Is the Reports$SensorReports class in rick-processors-assembly-1.0.jar?

Thanks
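One quick way to check that (jar name taken from the spark-submit command in
the quoted message below):

jar tf rick-processors-assembly-1.0.jar | grep SensorReports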

On Mon, Feb 23, 2015 at 8:43 PM, necro351 .  wrote:

> Hello,
>
> I am trying to deserialize some data encoded using proto buff from within
> Spark and am getting class-not-found exceptions. I have narrowed the
> program down to something very simple that shows the problem exactly (see
> 'The Program' below) and hopefully someone can tell me the easy fix :)
>
> So the situation is I have some proto buff reports in /tmp/reports. I also
> have a Spark project with the below Scala code (under The Program) as well
> as a Java file defining SensorReports all in the same src sub-tree in my
> Spark project. It's built using sbt in the standard way. The Spark job reads
> in the reports from /tmp/reports and then prints them to the console. When
> I build and run my spark job with spark-submit everything works as expected
> and the reports are printed out. When I uncomment the 'XXX' variant in the
> Scala spark program and try to print the reports from within a Spark
> Context I get the class-not-found exceptions. I don't understand why. If I
> get this working then I will want to do more than just print the reports
> from within the Spark Context.
>
> My read of the documentation tells me that my spark job should have access
> to everything in the submitted jar and that jar includes the Java code
> generated by the proto buff library which defines SensorReports. This is
> the spark-submit invocation I use after building my job as an assembly with
> the sbt-assembly plugin:
>
> spark-submit --class com.rick.processors.NewReportProcessor --master
> local[*]
> ../../../analyzer/spark/target/scala-2.10/rick-processors-assembly-1.0.jar
>
> I have also tried adding the jar programmatically using sc.addJar but that
> does not help. I found a bug from July (
> https://github.com/apache/spark/pull/181) that seems related but it went
> into Spark 1.2.0 (which is what I am currently using) so I don't think
> that's it.
>
> Any ideas? Thanks!
>
> The Program:
> ==
> package com.rick.processors
>
>
>
> import java.io.File
>
> import java.nio.file.{Path, Files, FileSystems}
>
> import org.apache.spark.{SparkContext, SparkConf}
>
> import com.rick.reports.Reports.SensorReports
>
>
>
> object NewReportProcessor {
>
>   private val sparkConf = new SparkConf().setAppName("ReportProcessor")
>
>   private val sc = new SparkContext(sparkConf)
>
>
>
>   def main(args: Array[String]) = {
>
> val protoBuffsBinary = localFileReports()
>
> val sensorReportsBundles = protoBuffsBinary.map(bundle =>
> SensorReports.parseFrom(bundle))
> // XXX: Printing from within the SparkContext throws class-not-found
>
> // exceptions, why?
>
> // sc.makeRDD(sensorReportsBundles).foreach((x: SensorReports) =>
> println(x.toString))
> sensorReportsBundles.foreach((x: SensorReports) =>
> println(x.toString))
>   }
>
>
>
>   private def localFileReports() = {
>
> val reportDir = new File("/tmp/reports")
>
> val reportFiles =
> reportDir.listFiles.filter(_.getName.endsWith(".report"))
>
> reportFiles.map(file => {
>
>   val path = FileSystems.getDefault().getPath("/tmp/reports",
> file.getName())
>   Files.readAllBytes(path)
>
> })
>
>   }
>
> }
>
> The Class-not-found exceptions:
> =
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 15/02/23 17:35:03 WARN Utils: Your hostname, ubuntu resolves to a loopback
> address: 127.0.1.1; using 192.168.241.128 instead (on interface eth0)
> 15/02/23 17:35:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> another address
> 15/02/23 17:35:04 INFO SecurityManager: Changing view acls to: rick
> 15/02/23 17:35:04 INFO SecurityManager: Changing modify acls to: rick
> 15/02/23 17:35:04 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(rick); users
> with modify permissions: Set(rick)
> 15/02/23 17:35:04 INFO Slf4jLogger: Slf4jLogger started
> 15/02/23 17:35:04 INFO Remoting: Starting remoting
> 15/02/23 17:35:04 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@192.168.241.128:38110]
> 15/02/23 17:35:04 INFO Utils: Successfully started service 'sparkDriver'
> on port 38110.
> 15/02/23 17:35:04 INFO SparkEnv: Registering MapOutputTracker
> 15/02/23 17:35:04 INFO SparkEnv: Registering BlockManagerMaster
> 15/02/23 17:35:04 INFO DiskBlockManager: Created local directory at
> /tmp/spark-local-20150223173504-b26c
> 15/02/23 17:35:04 INFO MemoryStore: MemoryStore started with capacity
> 267.3 MB
> 15/02/23 17:35:05 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/02/23 17:35:05 INFO H

Re: Re: About FlumeUtils.createStream

2015-02-23 Thread bit1...@163.com
Hi, Akhil,Tathagata,

This leads me to another question ,For the Spark Streaming and Kafka 
Integration, If there are more than one Receiver in the cluster, such as 
  val streams = (1 to 6).map ( _ => KafkaUtils.createStream(ssc, zkQuorum, 
group, topicMap).map(_._2) ), 
then will these Receivers stay on one cluster node, or will they be distributed 
among the cluster nodes?



bit1...@163.com
 
From: Akhil Das
Date: 2015-02-24 12:58
To: Tathagata Das
CC: user; bit1129
Subject: Re: About FlumeUtils.createStream
I see, thanks for the clarification TD.
On 24 Feb 2015 09:56, "Tathagata Das"  wrote:
Akhil, that is incorrect. 

Spark will listen on the given port for Flume to push data into it. 
When in local mode, it will listen on localhost:
When in some kind of cluster, instead of localhost you will have to give the 
hostname of the cluster node where you want Flume to forward the data. Spark 
will launch the Flume receiver on that node (assuming the hostname matching is 
correct), and listen on port , for receiving data from Flume. So only the 
configured machine will listen on port . 

I suggest trying the other stream. FlumeUtils.createPollingStream. More details 
here. 
http://spark.apache.org/docs/latest/streaming-flume-integration.html



On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das  wrote:
Spark won't listen on  mate. It basically means you have a flume source 
running at port  of your localhost. And when you submit your application in 
standalone mode, workers will consume data from that port.

Thanks
Best Regards

On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com  wrote:

Hi,
In the spark streaming application, I write the code, 
FlumeUtils.createStream(ssc,"localhost",),which means spark will listen on 
the  port, and wait for Flume Sink to write to it.
My question is:  when I submit the application to the Spark Standalone cluster, 
will  be opened only on the Driver Machine or all the workers will also 
open the  port and wait for the Flume data? 








Getting to proto buff classes in Spark Context

2015-02-23 Thread necro351 .
Hello,

I am trying to deserialize some data encoded using proto buff from within
Spark and am getting class-not-found exceptions. I have narrowed the
program down to something very simple that shows the problem exactly (see
'The Program' below) and hopefully someone can tell me the easy fix :)

So the situation is I have some proto buff reports in /tmp/reports. I also
have a Spark project with the below Scala code (under The Program) as well
as a Java file defining SensorReports all in the same src sub-tree in my
Spark project. It's built using sbt in the standard way. The Spark job reads
in the reports from /tmp/reports and then prints them to the console. When
I build and run my spark job with spark-submit everything works as expected
and the reports are printed out. When I uncomment the 'XXX' variant in the
Scala spark program and try to print the reports from within a Spark
Context I get the class-not-found exceptions. I don't understand why. If I
get this working then I will want to do more than just print the reports
from within the Spark Context.

My read of the documentation tells me that my spark job should have access
to everything in the submitted jar and that jar includes the Java code
generated by the proto buff library which defines SensorReports. This is
the spark-submit invocation I use after building my job as an assembly with
the sbt-assembly plugin:

spark-submit --class com.rick.processors.NewReportProcessor --master
local[*]
../../../analyzer/spark/target/scala-2.10/rick-processors-assembly-1.0.jar

I have also tried adding the jar programmatically using sc.addJar but that
does not help. I found a bug from July (
https://github.com/apache/spark/pull/181) that seems related but it went
into Spark 1.2.0 (which is what I am currently using) so I don't think
that's it.

Any ideas? Thanks!

The Program:
==
package com.rick.processors



import java.io.File

import java.nio.file.{Path, Files, FileSystems}

import org.apache.spark.{SparkContext, SparkConf}

import com.rick.reports.Reports.SensorReports



object NewReportProcessor {

  private val sparkConf = new SparkConf().setAppName("ReportProcessor")

  private val sc = new SparkContext(sparkConf)



  def main(args: Array[String]) = {

val protoBuffsBinary = localFileReports()

val sensorReportsBundles = protoBuffsBinary.map(bundle =>
SensorReports.parseFrom(bundle))
// XXX: Printing from within the SparkContext throws class-not-found

// exceptions, why?

// sc.makeRDD(sensorReportsBundles).foreach((x: SensorReports) =>
println(x.toString))
sensorReportsBundles.foreach((x: SensorReports) => println(x.toString))

  }



  private def localFileReports() = {

val reportDir = new File("/tmp/reports")

val reportFiles =
reportDir.listFiles.filter(_.getName.endsWith(".report"))

reportFiles.map(file => {

  val path = FileSystems.getDefault().getPath("/tmp/reports",
file.getName())
  Files.readAllBytes(path)

})

  }

}

The Class-not-found exceptions:
=
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/02/23 17:35:03 WARN Utils: Your hostname, ubuntu resolves to a loopback
address: 127.0.1.1; using 192.168.241.128 instead (on interface eth0)
15/02/23 17:35:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
15/02/23 17:35:04 INFO SecurityManager: Changing view acls to: rick
15/02/23 17:35:04 INFO SecurityManager: Changing modify acls to: rick
15/02/23 17:35:04 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(rick); users
with modify permissions: Set(rick)
15/02/23 17:35:04 INFO Slf4jLogger: Slf4jLogger started
15/02/23 17:35:04 INFO Remoting: Starting remoting
15/02/23 17:35:04 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@192.168.241.128:38110]
15/02/23 17:35:04 INFO Utils: Successfully started service 'sparkDriver' on
port 38110.
15/02/23 17:35:04 INFO SparkEnv: Registering MapOutputTracker
15/02/23 17:35:04 INFO SparkEnv: Registering BlockManagerMaster
15/02/23 17:35:04 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20150223173504-b26c
15/02/23 17:35:04 INFO MemoryStore: MemoryStore started with capacity 267.3
MB
15/02/23 17:35:05 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/02/23 17:35:05 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-c77dbc9a-d626-4991-a9b7-f593acafbe64
15/02/23 17:35:05 INFO HttpServer: Starting HTTP Server
15/02/23 17:35:05 INFO Utils: Successfully started service 'HTTP file
server' on port 50950.
15/02/23 17:35:05 WARN Utils: Service 'SparkUI' could not bind on port
4040. Attempting port 4041.
15/02/23 17:35:05 WARN Utils: Service 'SparkUI' could not bind on port
4041. Attempting port 4042.

Re: Re: About FlumeUtils.createStream

2015-02-23 Thread bit1...@163.com
Thanks to both of you guys on this!



bit1...@163.com
 
From: Akhil Das
Date: 2015-02-24 12:58
To: Tathagata Das
CC: user; bit1129
Subject: Re: About FlumeUtils.createStream
I see, thanks for the clarification TD.
On 24 Feb 2015 09:56, "Tathagata Das"  wrote:
Akhil, that is incorrect. 

Spark will listen on the given port for Flume to push data into it. 
When in local mode, it will listen on localhost:
When in some kind of cluster, instead of localhost you will have to give the 
hostname of the cluster node where you want Flume to forward the data. Spark 
will launch the Flume receiver on that node (assuming the hostname matching is 
correct), and listen on port , for receiving data from Flume. So only the 
configured machine will listen on port . 

I suggest trying the other stream. FlumeUtils.createPollingStream. More details 
here. 
http://spark.apache.org/docs/latest/streaming-flume-integration.html



On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das  wrote:
Spark won't listen on  mate. It basically means you have a flume source 
running at port  of your localhost. And when you submit your application in 
standalone mode, workers will consume data from that port.

Thanks
Best Regards

On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com  wrote:

Hi,
In the spark streaming application, I write the code, 
FlumeUtils.createStream(ssc,"localhost",),which means spark will listen on 
the  port, and wait for Flume Sink to write to it.
My question is:  when I submit the application to the Spark Standalone cluster, 
will  be opened only on the Driver Machine or all the workers will also 
open the  port and wait for the Flume data? 








Re: About FlumeUtils.createStream

2015-02-23 Thread Akhil Das
I see, thanks for the clarification TD.
On 24 Feb 2015 09:56, "Tathagata Das"  wrote:

> Akhil, that is incorrect.
>
> Spark will listen on the given port for Flume to push data into it.
> When in local mode, it will listen on localhost:
> When in some kind of cluster, instead of localhost you will have to give
> the hostname of the cluster node where you want Flume to forward the data.
> Spark will launch the Flume receiver on that node (assuming the hostname
> matching is correct), and listen on port , for receiving data from Flume.
> So only the configured machine will listen on port .
>
> I suggest trying the other stream. FlumeUtils.createPollingStream. More
> details here.
> http://spark.apache.org/docs/latest/streaming-flume-integration.html
>
>
>
> On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das 
> wrote:
>
>> Spark won't listen on  mate. It basically means you have a flume
>> source running at port  of your localhost. And when you submit your
>> application in standalone mode, workers will consume data from that port.
>>
>> Thanks
>> Best Regards
>>
>> On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com  wrote:
>>
>>>
>>> Hi,
>>> In the spark streaming application, I write the code, 
>>> FlumeUtils.createStream(ssc,"localhost",),which
>>> means spark will listen on the  port, and wait for Flume Sink to write
>>> to it.
>>> My question is:  when I submit the application to the Spark Standalone
>>> cluster, will  be opened only on the Driver Machine or all the workers
>>> will also open the  port and wait for the Flume data?
>>>
>>> --
>>>
>>>
>>
>


Re: About FlumeUtils.createStream

2015-02-23 Thread Tathagata Das
Akhil, that is incorrect.

Spark will listen on the given port for Flume to push data into it.
When in local mode, it will listen on localhost:
When in some kind of cluster, instead of localhost you will have to give
the hostname of the cluster node where you want Flume to forward the data.
Spark will launch the Flume receiver on that node (assuming the hostname
matching is correct), and listen on port , for receiving data from Flume.
So only the configured machine will listen on port .

I suggest trying the other stream. FlumeUtils.createPollingStream. More
details here.
http://spark.apache.org/docs/latest/streaming-flume-integration.html
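A minimal sketch of the polling variant (host and port are illustrative; the
Flume agent must run the Spark sink described in the linked guide):

import org.apache.spark.streaming.flume.FlumeUtils

val flumeStream = FlumeUtils.createPollingStream(ssc, "flume-sink-host", 9988)
flumeStream.count().print()  // e.g. print the number of events per batch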



On Sat, Feb 21, 2015 at 12:17 AM, Akhil Das 
wrote:

> Spark won't listen on  mate. It basically means you have a flume
> source running at port  of your localhost. And when you submit your
> application in standalone mode, workers will consume data from that port.
>
> Thanks
> Best Regards
>
> On Sat, Feb 21, 2015 at 9:22 AM, bit1...@163.com  wrote:
>
>>
>> Hi,
>> In the spark streaming application, I write the code, 
>> FlumeUtils.createStream(ssc,"localhost",),which
>> means spark will listen on the  port, and wait for Flume Sink to write
>> to it.
>> My question is:  when I submit the application to the Spark Standalone
>> cluster, will  be opened only on the Driver Machine or all the workers
>> will also open the  port and wait for the Flume data?
>>
>> --
>>
>>
>


Re: Re: Re_ Re_ Does Spark Streaming depend on Hadoop_(4)

2015-02-23 Thread bit1...@163.com
Thanks Tathagata! You are right, I have packaged the contents of the spark 
shipped example jar into my jar, which contains several HDFS configuration 
files like hdfs-default.xml etc. Thanks!
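For reference, a build.sbt sketch that keeps such config files out of the fat
jar (the setting key and syntax vary across sbt-assembly versions, so treat
this as a starting point rather than a drop-in):

assemblyMergeStrategy in assembly := {
  case "core-site.xml" | "hdfs-site.xml" | "hdfs-default.xml" =>
    MergeStrategy.discard  // don't ship Hadoop config files inside the assembly
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)         // everything else keeps the plugin's default strategy
}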



bit1...@163.com
 
From: Tathagata Das
Date: 2015-02-24 12:04
To: bit1...@163.com
CC: yuzhihong; silvio.fiorito; user
Subject: Re: Re_ Re_ Does Spark Streaming depend on Hadoop_(4)
You could have HDFS configuration files in the classpath of the program. The HDFS 
libraries that Spark uses automatically pick those up and start using them.

TD

On Mon, Feb 23, 2015 at 7:47 PM, bit1...@163.com  wrote:
Frequent mail rejections are driving me crazy, so I created a new thread. SMTP error, 
DOT: 552 spam score (5.7) exceeded threshold 
(FREEMAIL_ENVFROM_END_DIGIT,FREEMAIL_REPLY,HTML_FONT_FACE_BAD,HTML_MESSAGE,RCVD_IN_BL_SPAMCOP_NET,SPF_PASS


Hi Silvio and Ted
I know there is a configuration parameter that controls writing logs to HDFS, but 
I didn't enable it.
From the stack trace, it looks like HDFS access is triggered in my code, but I 
don't use HDFS. The following is my code:

object MyKafkaWordCount {
  def main(args: Array[String]) {
    println("Start to run MyKafkaWordCount")
    val conf = new SparkConf().setAppName("MyKafkaWordCount").setMaster("local[20]")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topicMap = Map("topic-p6-r2" -> 1)
    val zkQuorum = "localhost:2181"
    val group = "topic-p6-r2-consumer-group"

    // Kafka has 6 partitions, so create 6 Receivers
    val streams = (1 to 6).map(_ =>
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    )
    // repartition to 18, 3 times the number of receivers
    val partitions = ssc.union(streams).repartition(18).map("DataReceived: " + _)

    partitions.print()
    ssc.start()
    ssc.awaitTermination()
  }
}



bit1...@163.com



Re: Accumulator in SparkUI for streaming

2015-02-23 Thread Tathagata Das
Unless I am unaware of some recent changes, the Spark UI shows stages and
jobs, not accumulator results. And the UI is not designed to be pluggable for
showing user-defined stuff.

TD

On Fri, Feb 20, 2015 at 12:25 AM, Tim Smith  wrote:

> On Spark 1.2:
>
> I am trying to capture # records read from a kafka topic:
>
> val inRecords = ssc.sparkContext.accumulator(0, "InRecords")
>
> ..
>
> kInStreams.foreach( k =>
> {
>
>  k.foreachRDD ( rdd =>  inRecords += rdd.count().toInt  )
>  inRecords.value
>
>
> Question is how do I get the accumulator to show up in the UI? I tried
> "inRecords.value" but that didn't help. Pretty sure it isn't showing up in
> Stage metrics.
>
> What's the trick here? collect?
>
> Thanks,
>
> Tim
>
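A common workaround, sketched with the names from the quoted code (this
surfaces the count in the driver's logs rather than in the UI):

kInStreams.foreach { k =>
  k.foreachRDD { rdd =>
    inRecords += rdd.count().toInt                    // count() is an action, runs each batch
    println(s"InRecords so far: ${inRecords.value}")  // accumulator value is read on the driver
  }
}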
>


Re: Executor size and checkpoints

2015-02-23 Thread Tathagata Das
Hey Yana,

I think you posted screenshots, but they are not visible in the email.
Probably better to upload them and post links.

Are you using StreamingContext.getOrCreate? If that is being used, then it
will recreate the SparkContext with a SparkConf holding whatever configuration
is present in the existing checkpoint files. It may so happen that the
existing checkpoint files were from an old run which had 512 configured. So
the SparkConf in the restarted SparkContext/StreamingContext is accidentally
picking up the old configuration. Deleting the checkpoint files avoided that
recovery, and the new config took effect. Maybe. :)

TD

On Sat, Feb 21, 2015 at 7:30 PM, Yana Kadiyska 
wrote:

> Hi all,
>
> I had a streaming application and midway through things decided to up the
> executor memory. I spent a long time launching like this:
>
> ~/spark-1.2.0-bin-cdh4/bin/spark-submit --class StreamingTest
> --executor-memory 2G --master...
>
> and observing the executor memory is still at old 512 setting
>
> I was about to ask if this is a bug when I decided to delete the
> checkpoints. Sure enough the setting took after that.
>
> So my question is -- why is it required to remove checkpoints to increase
> memory allowed on an executor? This seems pretty un-intuitive to me.
>
> Thanks for any insights.
>
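A minimal sketch of the getOrCreate pattern described above (checkpoint path,
app name and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("StreamingTest")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... DStream setup goes here ...
  ssc
}

// If checkpointDir already holds a checkpoint, the context (including the
// SparkConf with the old 512 MB executor setting) is restored from it and the
// conf built above is ignored. Deleting the checkpoint forces a fresh start.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)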


Many Receiver vs. Many threads per Receiver

2015-02-23 Thread bit1...@163.com
Hi,
I am experimenting with Spark Streaming and Kafka integration. To read messages 
from Kafka in parallel, there are basically two ways:
1. Create many Receivers, like (1 to 6).map(_ => KafkaUtils.createStream). 
2. Specify many threads when calling KafkaUtils.createStream, like val 
topicMap = Map("myTopic" -> 6); this will create one receiver with 6 reading threads.

My question is which option is better. Option 2 sounds better to me 
because it saves a lot of cores (one Receiver occupies one core), but I learned 
from somewhere else that option 1 is better, so I would like to hear how you 
guys elaborate on this. Thanks
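For concreteness, the two options side by side (a sketch; the topic name is
illustrative, ssc/zkQuorum/group as elsewhere in this thread):

// Option 1: six receivers, one consumer thread each (occupies six cores)
val streams = (1 to 6).map(_ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, Map("myTopic" -> 1)).map(_._2))
val unioned = ssc.union(streams)

// Option 2: one receiver with six consumer threads (occupies one core)
val single = KafkaUtils.createStream(ssc, zkQuorum, group, Map("myTopic" -> 6)).map(_._2)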



bit1...@163.com


Re: Re_ Re_ Does Spark Streaming depend on Hadoop_(4)

2015-02-23 Thread Tathagata Das
You could have HDFS configuration files in the classpath of the program.
The HDFS libraries that Spark uses automatically pick those up and start
using them.

TD

On Mon, Feb 23, 2015 at 7:47 PM, bit1...@163.com  wrote:

> Frequent mail rejections are driving me crazy, so I created a new thread.
> SMTP error, DOT: 552 spam score (5.7) exceeded threshold 
> (FREEMAIL_ENVFROM_END_DIGIT,FREEMAIL_REPLY,HTML_FONT_FACE_BAD,HTML_MESSAGE,RCVD_IN_BL_SPAMCOP_NET,SPF_PASS
>
>
> Hi Silvio and Ted
> I know there is a configuration parameter that controls writing logs to HDFS,
> but I didn't enable it.
> From the stack trace, it looks like HDFS access is triggered in my code,
> but I don't use HDFS. The following is my code:
>
> object MyKafkaWordCount {
>   def main(args: Array[String]) {
>     println("Start to run MyKafkaWordCount")
>     val conf = new SparkConf().setAppName("MyKafkaWordCount").setMaster("local[20]")
>     val ssc = new StreamingContext(conf, Seconds(3))
>     val topicMap = Map("topic-p6-r2" -> 1)
>     val zkQuorum = "localhost:2181"
>     val group = "topic-p6-r2-consumer-group"
>
>     // Kafka has 6 partitions, so create 6 Receivers
>     val streams = (1 to 6).map(_ =>
>       KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
>     )
>     // repartition to 18, 3 times the number of receivers
>     val partitions = ssc.union(streams).repartition(18).map("DataReceived: " + _)
>
>     partitions.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> --
> bit1...@163.com
>


Re_ Re_ Does Spark Streaming depend on Hadoop_(4)

2015-02-23 Thread bit1...@163.com
Frequent mail rejections are driving me crazy, so I created a new thread. SMTP error, 
DOT: 552 spam score (5.7) exceeded threshold 
(FREEMAIL_ENVFROM_END_DIGIT,FREEMAIL_REPLY,HTML_FONT_FACE_BAD,HTML_MESSAGE,RCVD_IN_BL_SPAMCOP_NET,SPF_PASS


Hi Silvio and Ted
I know there is a configuration parameter that controls writing logs to HDFS, but 
I didn't enable it.
From the stack trace, it looks like HDFS access is triggered in my code, but I 
don't use HDFS. The following is my code:

object MyKafkaWordCount {
  def main(args: Array[String]) {
    println("Start to run MyKafkaWordCount")
    val conf = new SparkConf().setAppName("MyKafkaWordCount").setMaster("local[20]")
    val ssc = new StreamingContext(conf, Seconds(3))
    val topicMap = Map("topic-p6-r2" -> 1)
    val zkQuorum = "localhost:2181"
    val group = "topic-p6-r2-consumer-group"

    // Kafka has 6 partitions, so create 6 Receivers
    val streams = (1 to 6).map(_ =>
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    )
    // repartition to 18, 3 times the number of receivers
    val partitions = ssc.union(streams).repartition(18).map("DataReceived: " + _)

    partitions.print()
    ssc.start()
    ssc.awaitTermination()
  }
}



bit1...@163.com


Re: FW: Submitting jobs to Spark EC2 cluster remotely

2015-02-23 Thread Franc Carter
Is your laptop behind a NAT ?

I got bitten by a similar issue, and (I think) it was because I was behind a
NAT that did not forward the public IP back to my private IP unless the
connection originated from my private IP.

cheers
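If NAT is indeed the problem, one mitigation to try (a sketch; spark.driver.host
and spark.driver.port are standard Spark settings, the address is a placeholder)
is pinning the endpoint the driver advertises so executors can connect back:

./bin/spark-submit --class SparkPi \
  --master spark://ec2-52-10-138-75.us-west-2.compute.amazonaws.com:7077 \
  --conf spark.driver.host=YOUR_PUBLICLY_REACHABLE_IP \
  --conf spark.driver.port=7078 \
  ec2test_2.10-0.0.1.jar 100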

On Tue, Feb 24, 2015 at 5:20 AM, Oleg Shirokikh  wrote:

> Dear Patrick,
>
> Thanks a lot again for your help.
>
> > What happens if you submit from the master node itself on ec2 (in client
> mode), does that work? What about in cluster mode?
>
> If I SSH to the machine with Spark master, then everything works - shell,
> and regular submit in both client and cluster mode (after rsyncing the same
> jar I'm using for remote submission). Below is the output when I deploy in
> cluster mode from master machine itself:
>
> //**//
> root@ip-172-31-34-83 spark]$ ./bin/spark-submit --class SparkPi --master
> spark://ec2-52-10-138-75.us-west-2.compute.amazonaws.com:7077
> --deploy-mode=cluster
> /root/spark/sparktest/target/scala-2.10/ec2test_2.10-0.0.1.jar 100
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
> Sending launch command to spark://
> ec2-52-10-138-75.us-west-2.compute.amazonaws.com:7077
> Driver successfully submitted as driver-20150223174819-0008
> ... waiting before polling master for driver state
> ... polling master for driver state
> State of driver-20150223174819-0008 is RUNNING
> Driver running on ip-172-31-33-194.us-west-2.compute.internal:56183
> (worker-20150223171519-ip-172-31-33-194.us-west-2.compute.internal-56183)
> //**//
>
> Observation: when I submit the job from a remote host (and all these
> warnings [..initial job has not accepted any resources...] and errors
> [..asked to remove non-existent executor..] start appearing) and leave it
> running, I simultaneously try to submit a job (or run a shell) from an EC2
> node with master itself. In this scenario it starts to produce similar
> warnings (not errors) [..initial job has not accepted any resources...] and
> doesn't execute the job. Probably there are not enough cores devoted to 2
> apps running simultaneously.
>
>
> >It would be helpful if you could print the full command that the executor
> is failing. That might show that spark.driver.host is being set strangely.
> IIRC we print the launch command before starting the executor.
>
> I'd be very happy to provide this command but I'm not sure where to find
> it... When I launch the submit script, I immediately see [WARN
> TaskSchedulerImpl:...]s and [ERROR SparkDeploySchedulerBackend]s in the
> terminal output.
>
> In Master Web UI, I have this application running indefinitely (listed in
> "Running APplications" with State=RUNNING). When I go into this app UI I
> see tons of Executors listed in "Executor Summary" - at each moment two of
> them are RUNNING (I have two workers) and all others EXITED.
>
> Here is stderr from one of the RUNNING ones:
>
> /***/
> 15/02/23 18:11:49 INFO executor.CoarseGrainedExecutorBackend: Registered
> signal handlers for [TERM, HUP, INT]
> 15/02/23 18:11:49 INFO spark.SecurityManager: Changing view acls to:
> root,oleg
> 15/02/23 18:11:49 INFO spark.SecurityManager: Changing modify acls to:
> root,oleg
> 15/02/23 18:11:49 INFO spark.SecurityManager: SecurityManager:
> authentication disabled; ui acls disabled; users with view permissions:
> Set(root, oleg); users with modify permissions: Set(root, oleg)
> 15/02/23 18:11:49 INFO slf4j.Slf4jLogger: Slf4jLogger started
> 15/02/23 18:11:50 INFO Remoting: Starting remoting
> 15/02/23 18:11:50 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://driverpropsfetc...@ip-172-31-33-195.us-west-2.compute.internal
> :57681]
> 15/02/23 18:11:50 INFO util.Utils: Successfully started service
> 'driverPropsFetcher' on port 57681.
> /*/
>
> Here is stderr from one of the EXITED ones:
>
> /***/
> 15/02/23 18:10:09 INFO executor.CoarseGrainedExecutorBackend: Registered
> signal handlers for [TERM, HUP, INT]
> 15/02/23 18:10:10 INFO spark.SecurityManager: Changing view acls to:
> root,oleg
> 15/02/23 18:10:10 INFO spark.SecurityManager: Changing modify acls to:
> root,oleg
> 15/02/23 18:10:10 INFO spark.SecurityManager: SecurityManager:
> authentication disabled; ui acls disabled; users with view permissions:
> Set(root, oleg); users with modify permissions: Set(root, oleg)
> 15/02/23 18:10:10 INFO slf4j.Slf4jLogger: Slf4jLogger started
> 15/02/23 18:10:10 INFO Remoting: Starting remoting
> 15/02/23 18:10:10 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://driverpropsfetc...@ip-172-31-33-194.us-west-2.compute.internal
> :42607]
> 15/02/23 18:10:10 INFO util.Utils: Successfully started service
> 'driverPropsFetcher' on port 42607.
> 15/02/23 18:10:40 ERROR security.UserGroupInformation:
> PriviledgedActionException as:oleg
> cause:java.util.concurrent.TimeoutException: Futures timed out after [30
> seconds]
> Exception in thread "main" java.lang.reflect.U

Re: spark streaming window operations on a large window size

2015-02-23 Thread Tathagata Das
The default persistence level is MEMORY_AND_DISK, so the LRU policy would
discard the blocks to disk, so the streaming app will not fail. However,
since things will get constantly read in and out of disk as windows are
processed, the performance won't be great. So it is best to have sufficient
memory to keep all the window data in memory.

TD

On Mon, Feb 23, 2015 at 8:26 AM, Shao, Saisai  wrote:

> I don't think current Spark Streaming supports window operations which
> beyond its available memory, internally Spark Streaming puts all the data
> in the memory belongs to the effective window, if the memory is not enough,
> BlockManager will discard the blocks per the LRU policy, so something unexpected
> will occur.
>
> Thanks
> Jerry
>
> -Original Message-
> From: avilevi3 [mailto:avile...@gmail.com]
> Sent: Monday, February 23, 2015 12:57 AM
> To: user@spark.apache.org
> Subject: spark streaming window operations on a large window size
>
> Hi guys,
>
> does spark streaming support window operations on a sliding window whose
> data is larger than the available memory?
> currently we are using kafka as input, but we could change that if needed.
>
> thanks
> Avi
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-window-operations-on-a-large-window-size-tp21764.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Does Spark Streaming depend on Hadoop?

2015-02-23 Thread Ted Yu
This thread could be related:
http://search-hadoop.com/m/JW1q592kqi&subj=Re+spark+shell+working+in+scala+2+11+breaking+change+

On Mon, Feb 23, 2015 at 7:08 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

>  Looks like your Spark config may be trying to log to an HDFS path. Can
> you review your config settings?
>
>  *From:* bit1...@163.com
> *Sent:* ‎Monday‎, ‎February‎ ‎23‎, ‎2015 ‎9‎:‎54‎ ‎PM
> *To:* yuzhihong 
> *Cc:* user@spark.apache.org
>
>  [hadoop@hadoop bin]$ sh submit.log.streaming.kafka.complicated.sh
> Spark assembly has been built with Hive, including Datanucleus jars on
> classpath
> Start to run MyKafkaWordCount
> Exception in thread "main" java.net.ConnectException: Call From
> hadoop.master/192.168.26.137 to hadoop.master:9000 failed on connection
> exception: java.net.ConnectException: Connection refused; For more details
> see: http://wiki.apache.org/hadoop/ConnectionRefused
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
> at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
> at org.apache.hadoop.ipc.Client.call(Client.java:1414)
> at org.apache.hadoop.ipc.Client.call(Client.java:1363)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>
> at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
>
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
>
> at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:699)
>
> at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1762)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
>
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
>
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
> at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:123)
> at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
> at
> org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
>
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:353)
> at
> org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:571)
>
> at
> org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:74)
>
> at
> spark.examples.streaming.MyKafkaWordCount$.main(MyKafkaWordCount.scala:14)
> at spark.examples.streaming.MyKafkaWordCount.main(MyKafkaWordCount.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at
> org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
>
> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:529)
> at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:493)
> at
> org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:604)
> at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:699)
> at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:367)
> at org.apache.hadoop.ipc.Client.getConnection(Client.java:1462)
> at org.apache.hadoop.ipc.Client.call(Client.java:1381)
> ... 32 more
>
>
>  --
>  bit1...@163.com
>
>
>  *From:* Ted Yu 
> *Date:* 2015-02-24 10:24
> *To:* bit1...@163.com
> *CC:* user 
> *Subject:* Re: Does Spark Stre

Re: Repartition and Worker Instances

2015-02-23 Thread Deep Pradhan
Here, I wanted to ask a different thing though.
Let me put it this way.
What is the relationship between the performance of a Spark Job and the
number of cores in the standalone Spark single-node cluster?

Thank You

On Tue, Feb 24, 2015 at 8:39 AM, Deep Pradhan 
wrote:

> You mean SPARK_WORKER_CORES in /conf/spark-env.sh?
>
> On Mon, Feb 23, 2015 at 11:06 PM, Sameer Farooqui 
> wrote:
>
>> In Standalone mode, a Worker JVM starts an Executor. Inside the Exec
>> there are slots for task threads. The slot count is configured by the
>> num_cores setting. Generally, oversubscribe this. So if you have 10 free
>> CPU cores, set num_cores to 20.
>>
>>
>> On Monday, February 23, 2015, Deep Pradhan 
>> wrote:
>>
>>> How is task slot different from # of Workers?
>>>
>>>
>>> >> so don't read into any performance metrics you've collected to
>>> extrapolate what may happen at scale.
>>> I did not get you in this.
>>>
>>> Thank You
>>>
>>> On Mon, Feb 23, 2015 at 10:52 PM, Sameer Farooqui <
>>> same...@databricks.com> wrote:
>>>
 In general you should first figure out how many task slots are in the
 cluster and then repartition the RDD to maybe 2x that #. So if you have a
 100 slots, then maybe RDDs with partition count of 100-300 would be normal.

 But also size of each partition can matter. You want a task to operate
 on a partition for at least 200ms, but no longer than around 20 seconds.

 Even if you have 100 slots, it could be okay to have a RDD with 10,000
 partitions if you've read in a large file.

 So don't repartition your RDD to match the # of Worker JVMs, but rather
 align it to the total # of task slots in the Executors.

 If you're running on a single node, shuffle operations become almost
 free (because there's no network movement), so don't read into any
 performance metrics you've collected to extrapolate what may happen at
 scale.
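A sketch of that heuristic in code, for some existing RDD rdd
(sc.defaultParallelism usually equals the total core/slot count in standalone
mode, but verify on your cluster):

val slots = sc.defaultParallelism       // approx. total task slots across executors
val tuned = rdd.repartition(2 * slots)  // the ~2x-slots rule of thumb from above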


 On Monday, February 23, 2015, Deep Pradhan 
 wrote:

> Hi,
> If I repartition my data by a factor equal to the number of worker
> instances, will the performance be better or worse?
> As far as I understand, the performance should be better, but in my
> case it is becoming worse.
> I have a single node standalone cluster, is it because of this?
> Am I guaranteed to have a better performance if I do the same thing in
> a multi-node cluster?
>
> Thank You
>

>>>
>


Re: Does Spark Streaming depend on Hadoop?

2015-02-23 Thread Silvio Fiorito
Looks like your Spark config may be trying to log to an HDFS path. Can you 
review your config settings?
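The stack trace below goes through EventLoggingListener, so (a guess; both
property names are standard Spark settings) the event log directory is the
likely culprit. A sketch of ruling it out:

val conf = new SparkConf()
  .setAppName("MyKafkaWordCount")
  .setMaster("local[20]")
  .set("spark.eventLog.enabled", "false")
// or keep event logging on but point it at a reachable local directory:
//   .set("spark.eventLog.dir", "file:///tmp/spark-events")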

From: bit1...@163.com
Sent: ‎Monday‎, ‎February‎ ‎23‎, ‎2015 ‎9‎:‎54‎ ‎PM
To: yuzhihong
Cc: user@spark.apache.org

[hadoop@hadoop bin]$ sh submit.log.streaming.kafka.complicated.sh
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Start to run MyKafkaWordCount
Exception in thread "main" java.net.ConnectException: Call From 
hadoop.master/192.168.26.137 to hadoop.master:9000 failed on connection 
exception: java.net.ConnectException: Connection refused; For more details see: 
http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
at org.apache.hadoop.ipc.Client.call(Client.java:1414)
at org.apache.hadoop.ipc.Client.call(Client.java:1363)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:699)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1762)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:123)
at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
at 
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:353)
at 
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:571)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:74)
at spark.examples.streaming.MyKafkaWordCount$.main(MyKafkaWordCount.scala:14)
at spark.examples.streaming.MyKafkaWordCount.main(MyKafkaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at 
org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:529)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:493)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:604)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:699)
at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:367)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1462)
at org.apache.hadoop.ipc.Client.call(Client.java:1381)
... 32 more



bit1...@163.com

From: Ted Yu
Date: 2015-02-24 10:24
To: bit1...@163.com
CC: user
Subject: Re: Does Spark Streaming depend on Hadoop?
Can you pastebin the whole stack trace ?

Thanks



On Feb 23, 2015, at 6:14 PM, "bit1...@163.com" 
mailto:bit1...@163.com>> wrote:

Hi,

When I submit a spark streaming application with following script,

./spark-submit --name MyKafkaWor

Re: Repartition and Worker Instances

2015-02-23 Thread Deep Pradhan
You mean SPARK_WORKER_CORES in /conf/spark-env.sh?

On Mon, Feb 23, 2015 at 11:06 PM, Sameer Farooqui 
wrote:

> In Standalone mode, a Worker JVM starts an Executor. Inside the Exec there
> are slots for task threads. The slot count is configured by the num_cores
> setting. Generally, oversubscribe this. So if you have 10 free CPU cores,
> set num_cores to 20.
>
>
> On Monday, February 23, 2015, Deep Pradhan 
> wrote:
>
>> How is task slot different from # of Workers?
>>
>>
>> >> so don't read into any performance metrics you've collected to
>> extrapolate what may happen at scale.
>> I did not get you in this.
>>
>> Thank You
>>
>> On Mon, Feb 23, 2015 at 10:52 PM, Sameer Farooqui > > wrote:
>>
>>> In general you should first figure out how many task slots are in the
>>> cluster and then repartition the RDD to maybe 2x that #. So if you have a
>>> 100 slots, then maybe RDDs with partition count of 100-300 would be normal.
>>>
>>> But also size of each partition can matter. You want a task to operate
>>> on a partition for at least 200ms, but no longer than around 20 seconds.
>>>
>>> Even if you have 100 slots, it could be okay to have a RDD with 10,000
>>> partitions if you've read in a large file.
>>>
>>> So don't repartition your RDD to match the # of Worker JVMs, but rather
>>> align it to the total # of task slots in the Executors.
>>>
>>> If you're running on a single node, shuffle operations become almost
>>> free (because there's no network movement), so don't read into any
>>> performance metrics you've collected to extrapolate what may happen at
>>> scale.
>>>
>>>
>>> On Monday, February 23, 2015, Deep Pradhan 
>>> wrote:
>>>
 Hi,
 If I repartition my data by a factor equal to the number of worker
 instances, will the performance be better or worse?
 As far as I understand, the performance should be better, but in my
 case it is becoming worse.
 I have a single node standalone cluster, is it because of this?
 Am I guaranteed to have a better performance if I do the same thing in
 a multi-node cluster?

 Thank You

>>>
>>


Re: Re: Does Spark Streaming depend on Hadoop?

2015-02-23 Thread bit1...@163.com
[hadoop@hadoop bin]$ sh submit.log.streaming.kafka.complicated.sh 
Spark assembly has been built with Hive, including Datanucleus jars on 
classpath 
Start to run MyKafkaWordCount 
Exception in thread "main" java.net.ConnectException: Call From 
hadoop.master/192.168.26.137 to hadoop.master:9000 failed on connection 
exception: java.net.ConnectException: Connection refused; For more details see: 
http://wiki.apache.org/hadoop/ConnectionRefused 
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 
at java.lang.reflect.Constructor.newInstance(Constructor.java:526) 
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783) 
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730) 
at org.apache.hadoop.ipc.Client.call(Client.java:1414) 
at org.apache.hadoop.ipc.Client.call(Client.java:1363) 
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
 
at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:606) 
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
 
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
 
at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source) 
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:699)
 
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1762) 
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
 
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
 
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
 
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398) 
at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:123) 
at org.apache.spark.util.FileLogger.start(FileLogger.scala:115) 
at 
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
 
at org.apache.spark.SparkContext.(SparkContext.scala:353) 
at 
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:571)
 
at 
org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:74) 
at spark.examples.streaming.MyKafkaWordCount$.main(MyKafkaWordCount.scala:14) 
at spark.examples.streaming.MyKafkaWordCount.main(MyKafkaWordCount.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.net.ConnectException: Connection refused 
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) 
at 
org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) 
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:529) 
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:493) 
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:604) 
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:699) 
at org.apache.hadoop.ipc.Client$Connection.access$2800(Client.java:367) 
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1462) 
at org.apache.hadoop.ipc.Client.call(Client.java:1381) 
... 32 more




bit1...@163.com
 
From: Ted Yu
Date: 2015-02-24 10:24
To: bit1...@163.com
CC: user
Subject: Re: Does Spark Streaming depend on Hadoop?
Can you pastebin the whole stack trace ?

Thanks



On Feb 23, 2015, at 6:14 PM, "bit1...@163.com"  wrote:

Hi,

When I submit a spark streaming application with following script,

./spark-submit --name MyKafkaWordCount --master local[20] --executor-memory 
512M --total-executor-cores 2 --class spark.examples.streaming.MyKafkaWordCount 
 my.kakfa.wordcountjar

An exception occurs:
Exception in thread "main" java.net.ConnectException: Call From 
hadoop.master/192.168.26.137 to hadoop.master:9000 failed on connection 
exception.

From the exception, it tries to connect to port 9000, which is for Hadoop/HDFS, and I don't use Hadoop at all in my code (such as saving to HDFS).

Re: Spark-SQL 1.2.0 "sort by" results are not consistent with Hive

2015-02-23 Thread Cheng Lian

(Move to user list.)

Hi Kannan,

You need to set |mapred.map.tasks| to 1 in hive-site.xml. The reason is 
this line of code, which overrides |spark.default.parallelism|. Also, 
|spark.sql.shuffle.partitions| isn’t used here since there’s no shuffle 
involved (we only need to sort within a partition).

The default value of |mapred.map.tasks| is 2. You may see that the Spark SQL 
result can be divided into two sorted parts from the middle.


Cheng
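
For illustration, a hedged sketch of applying that setting from the Spark shell instead of hive-site.xml (assuming a HiveContext; the property name and query are the ones discussed in this thread):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    // Force a single map task so "sort by" produces one globally sorted partition.
    hiveContext.setConf("mapred.map.tasks", "1")
    hiveContext.sql("select name, age from testSortBy sort by age").collect()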

On 2/19/15 10:33 AM, Kannan Rajah wrote:


According to the Hive documentation, "sort by" is supposed to order the results
for each reducer. So if we set a single reducer, then the results should be
sorted, right? But this is not happening. Any idea why? It looks like the
settings I am using to restrict the number of reducers are not having an
effect.

*Tried the following:*

Set spark.default.parallelism to 1

Set spark.sql.shuffle.partitions to 1

These were set in hive-site.xml and also inside spark shell.


*Spark-SQL*

create table if not exists testSortBy (key int, name string, age int);
LOAD DATA LOCAL INPATH '/home/mapr/sample-name-age.txt' OVERWRITE INTO TABLE
testSortBy;
select * from testSortBy;

1    Aditya       28
2    aash         25
3    prashanth    27
4    bharath      26
5    terry        27
6    nanda        26
7    pradeep      27
8    pratyay      26


set spark.default.parallelism=1;

set spark.sql.shuffle.partitions=1;

select name,age from testSortBy sort by age;

aash         25
bharath      26
prashanth    27
Aditya       28
nanda        26
pratyay      26
terry        27
pradeep      27

*HIVE*

select name,age from testSortBy sort by age;

aash         25
bharath      26
nanda        26
pratyay      26
prashanth    27
terry        27
pradeep      27
Aditya       28


--
Kannan




Re: Does Spark Streaming depend on Hadoop?

2015-02-23 Thread Ted Yu
Can you pastebin the whole stack trace ?

Thanks



> On Feb 23, 2015, at 6:14 PM, "bit1...@163.com"  wrote:
> 
> Hi,
> 
> When I submit a spark streaming application with following script,
> 
> ./spark-submit --name MyKafkaWordCount --master local[20] --executor-memory 
> 512M --total-executor-cores 2 --class 
> spark.examples.streaming.MyKafkaWordCount  my.kakfa.wordcountjar
> 
> An exception occurs:
> Exception in thread "main" java.net.ConnectException: Call From 
> hadoop.master/192.168.26.137 to hadoop.master:9000 failed on connection 
> exception.
> 
> From the exception, it tries to connect to port 9000, which is for Hadoop/HDFS, and
> I don't use Hadoop at all in my code (such as saving to HDFS).
> 
> 
> 
> bit1...@163.com


Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I *think* this may have been related to the default memory overhead setting
being too low. I raised the value to 1G and tried my job again, but I had
to leave the office before it finished. It did get further, but I'm not
exactly sure if that's just because I raised the memory. I'll see tomorrow,
but I have a suspicion this may have been the cause of the executors being
killed by the application master.
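
For reference, a hedged sketch of the settings discussed in this thread, set via SparkConf. The memory-overhead property name is an assumption (Corey does not name the setting he raised), and the values are simply the ones reported here, not general recommendations:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.yarn.executor.memoryOverhead", "1024")     // assumed property; value in MB
      .set("spark.akka.askTimeout", "700")                   // seconds, per Anders' report
      .set("spark.core.connection.ack.wait.timeout", "700")  // seconds, per Anders' report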
On Feb 23, 2015 5:25 PM, "Corey Nolet"  wrote:

> I've got the opposite problem with regards to partitioning. I've got over
> 6000 partitions for some of these RDDs which immediately blows the heap
> somehow- I'm still not exactly sure how. If I coalesce them down to about
> 600-800 partitions, I get the problems where the executors are dying
> without any other error messages (other than telling me the executor was
> lost in the UI). If I don't coalesce, I pretty immediately get Java heap
> space exceptions that kill the job altogether.
>
> Putting in the timeouts didn't seem to help the case where I am
> coalescing. Also, I don't see any dfferences between 'disk only' and
> 'memory and disk' storage levels- both of them are having the same
> problems. I notice large shuffle files (30-40gb) that only seem to spill a
> few hundred mb.
>
> On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg  wrote:
>
>> Sounds very similar to what I experienced, Corey. Something that seems to
>> at least help with my problems is to have more partitions. I'm already
>> fighting between ending up with too many partitions in the end and having
>> too few in the beginning. By coalescing as late as possible and avoiding
>> too few in the beginning, the problems seem to decrease. Also, after increasing
>> spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
>> significantly (~700 secs), the problems seem to almost disappear. Don't
>> want to celebrate yet, still a long way left before the job completes, but it's
>> looking better...
>>
>> On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet  wrote:
>>
>>> I'm looking @ my yarn container logs for some of the executors which
>>> appear to be failing (with the missing shuffle files). I see exceptions
>>> that say "client.TransportClientFactor: Found inactive connection to
>>> host/ip:port, closing it."
>>>
>>> Right after that I see "shuffle.RetryingBlockFetcher: Exception while
>>> beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
>>> connect to host/ip:port"
>>>
>>> Right after that exception I see "RECEIVED SIGNAL 15: SIGTERM"
>>>
>>> Finally, following the sigterm, I see "FileNotFoundException:
>>> /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
>>> such file or directory)"
>>>
>>> I'm looking @ the nodemanager and application master logs and I see no
>>> indications whatsoever that there were any memory issues during this period
>>> of time. The Spark UI is telling me none of the executors are really using
>>> too much memory when this happens. It is a big job that's caching several
>>> 100's of GB but each node manager on the cluster has 64gb of ram just for
>>> yarn containers (physical nodes have 128gb). On this cluster, we have 128
>>> nodes. I've also tried using DISK_ONLY storage level but to no avail.
>>>
>>> Any further ideas on how to track this down? Again, we're able to run
>>> this same job on about 1/5th of the data just fine. The only thing that's
>>> pointing me towards a memory issue is that it seems to be happening in the
>>> same stages each time and when I lower the memory that each executor has
>>> allocated it happens in earlier stages but I can't seem to find anything
>>> that says an executor (or container for that matter) has run low on memory.
>>>
>>>
>>>
>>> On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg 
>>> wrote:
>>>
 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the Spark app), it could take some time.

 Unsure if task 450 was acting as a reducer or not, but seems possible.
 Probably due to a crashed executor as you say. Seems like I need to do some
 more advanced partition tuning to make this job work, as it's currently
 rather high number of partitions.

 Thanks for the help so far! It's certainly a frustrating task to debug
 when everything's working perfectly on sample data locally and crashes hard
 when running on the full dataset on the cluster...

 On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui <
 same...@databricks.com> wrote:

> Do you guys have dynamic allocation turned on for YARN?
>
> Anders, was Task 450 in your job acting like a Reducer and fetching
> the Map spill output data from a different node?
>
> If a Reducer task can't read the remote data it needs, that could
> cause the stage to fail. Sometimes this forces the previous stage to also be re-computed if it's a wide dependency.

Does Spark Streaming depend on Hadoop?

2015-02-23 Thread bit1...@163.com
Hi,

When I submit a spark streaming application with following script,

./spark-submit --name MyKafkaWordCount --master local[20] --executor-memory 
512M --total-executor-cores 2 --class spark.examples.streaming.MyKafkaWordCount 
 my.kakfa.wordcountjar

An exception occurs:
Exception in thread "main" java.net.ConnectException: Call From 
hadoop.master/192.168.26.137 to hadoop.master:9000 failed on connection 
exception.

From the exception, it tries to connect to port 9000, which is for Hadoop/HDFS, and I 
don't use Hadoop at all in my code (such as saving to HDFS).





bit1...@163.com


Re: Spark SQL Where IN support

2015-02-23 Thread Michael Armbrust
Yes.

On Mon, Feb 23, 2015 at 1:45 AM, Paolo Platter 
wrote:

>  I was speaking about the 1.2 version of Spark.
>
>  Paolo
>
>   *From:* Paolo Platter 
> *Sent:* Monday, 23 February 2015, 10:41
> *To:* user@spark.apache.org
>
>   Hi guys,
>
>  Is the “IN” operator supported in Spark SQL over the Hive Metastore?
>
>  Thanks
>
>
>  Paolo
>
>


Re: [Spark SQL]: Convert SchemaRDD back to RDD

2015-02-23 Thread Michael Armbrust
This is not currently supported.  Right now you can only get RDD[Row] as
Ted suggested.

On Sun, Feb 22, 2015 at 2:52 PM, Ted Yu  wrote:

> Haven't found the method in
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SchemaRDD
>
> The new DataFrame has this method:
>
>   /**
>* Returns the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s.
>* @group rdd
>*/
>   def rdd: RDD[Row] = {
>
> FYI
>
> On Sun, Feb 22, 2015 at 11:51 AM, stephane.collot <
> stephane.col...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> I think that the feature (convert a SchemaRDD to a structured class RDD)
>> is
>> now available. But I didn't understand in the PR how exactly to do this.
>> Can
>> you give an example or doc links?
>>
>> Best regards
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Convert-SchemaRDD-back-to-RDD-tp9071p21753.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-02-23 Thread Tobias Pfeiffer
Hi,

On Tue, Feb 24, 2015 at 4:34 AM, Tathagata Das 
wrote:

> There are different kinds of checkpointing going on. updateStateByKey
> requires RDD checkpointing which can be enabled only by called
> sparkContext.setCheckpointDirectory. But that does not enable Spark
> Streaming driver checkpoints, which is necessary for recovering from driver
> failures. That is enabled only by streamingContext.checkpoint(...) which
> internally calls sparkContext.setCheckpointDirectory and also enables other
> stuff.
>

I see, thank you very much! I'm happy to see I will not have to rewrite the
entire application :-)

Tobias
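
As a minimal sketch of the distinction TD describes (paths are placeholders, and sc/ssc are assumed to be an existing SparkContext and StreamingContext):

    // RDD checkpointing only -- enough for updateStateByKey:
    sc.setCheckpointDir("hdfs:///checkpoints/rdd")

    // Spark Streaming driver checkpointing -- also sets the SparkContext checkpoint
    // directory internally, and enables recovery from driver failure:
    ssc.checkpoint("hdfs:///checkpoints/streaming")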


Performance Instrumentation for Spark Jobs

2015-02-23 Thread Neil Ferguson
Hi all

I wanted to share some details about something I've been working on with
the folks on the ADAM project: performance instrumentation for Spark jobs.

We've added a module to the bdg-utils project (
https://github.com/bigdatagenomics/bdg-utils) to enable Spark users to
instrument RDD operations, and their associated function calls (as well as
arbitrary Scala function calls).

See the "Instrumentation" section in the documentation for more details.
There's some reasonably detailed documentation there.

To get started, just add the bdg-utils-metrics artifact to your project.
For example, in Maven:


<dependency>
  <groupId>org.bdgenomics.bdg-utils</groupId>
  <artifactId>bdg-utils-metrics</artifactId>
  <version>0.1.2</version>
</dependency>


Hope this is useful, and let me know if you have any questions.

Neil


Re: Movie Recommendation tutorial

2015-02-23 Thread Krishna Sankar
   1. The RMSE varies a little bit between the versions.
   2. Partitioned the training, validation, test set like so:
      - training = ratings_rdd_01.filter(lambda x: (x[3] % 10) < 6)
      - validation = ratings_rdd_01.filter(lambda x: (x[3] % 10) >= 6 and (x[3] % 10) < 8)
      - test = ratings_rdd_01.filter(lambda x: (x[3] % 10) >= 8)
      - Validation MSE:
        - # 1.3.0 Mean Squared Error = 0.871456869392
        - # 1.2.1 Mean Squared Error = 0.877305629074
   3. Itertools results:
      - 1.3.0 - RMSE = 1.354839 (rank = 8, lambda = 1.0, numIter = 20)
      - 1.1.1 - RMSE = 1.335831 (rank = 8, lambda = 1.0, numIter = 10)

Cheers
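
For context, a hedged Scala sketch of the ALS call these numbers correspond to (the tutorial itself uses PySpark; `training` is assumed to be an RDD[Rating], and the rank/iterations/lambda are the 1.3.0 values reported above):

    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    // rank = 8, iterations = 20, lambda = 1.0
    val model = ALS.train(training, 8, 20, 1.0)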


On Mon, Feb 23, 2015 at 12:37 PM, Xiangrui Meng  wrote:

> Which Spark version did you use? Btw, there are three datasets from
> MovieLens. The tutorial used the medium one (1 million). -Xiangrui
>
> On Mon, Feb 23, 2015 at 8:36 AM, poiuytrez 
> wrote:
> > What do you mean?
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Movie-Recommendation-tutorial-tp21769p21771.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Imran Rashid
I think you're getting tripped up by lazy evaluation and the way stage
boundaries work (admittedly it's pretty confusing in this case).

It is true that up until recently, if you unioned two RDDs with the same
partitioner, the result did not have the same partitioner.  But that was
just fixed here:
https://github.com/apache/spark/pull/4629

That does mean that after you update ranks, it will no longer have a
partitioner, which will affect the join on your second iteration here:
 val contributions = links.join(ranks).flatMap

But, I think most of the shuffles you are pointing to are a different
issue.  I may be belaboring something you already know, but I think this is
easily confusing.  The first thing is understanding where you get stage
boundaries, and how they are named.  Each shuffle introduces a stage
boundary.  However, the stages get named by the last thing in a stage,
which is not really what is always causing the shuffle.  E.g., reduceByKey()
causes a shuffle, but we don't see that in a stage name.  Similarly, map()
does not cause a shuffle, but we see a stage with that name.

So, what do the stage boundaries we see actually correspond to?

1) map -- that is doing the shuffle write for the following groupByKey
2) groupByKey -- in addition to reading the shuffle output from your map,
this is *also* doing the shuffle write for the next shuffle you introduce
w/ partitionBy
3) union -- this is doing the shuffle reading from your partitionBy, and
then all the work from there right up until the shuffle write for what is
immediately after union -- your reduceByKey.
4) lookup is an action, which is why that has another stage.

a couple of things to note:
(a) your join does not cause a shuffle, b/c both rdds share a partitioner
(b) you have two shuffles from groupByKey followed by partitionBy -- you
really probably want the 1 arg form of groupByKey(partitioner)


hopefully this is helpful to understand how your stages & shuffles
correspond to your code.

Imran
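
To make point (b) concrete, a hedged sketch of the one-arg groupByKey form against the repro quoted below, so the grouping and the hash partitioning happen in a single shuffle:

    import org.apache.spark.HashPartitioner

    val partitioner = new HashPartitioner(numOfPartition)
    val links = sc.textFile("c:/Download/web-Google.txt")
      .filter(!_.contains("#"))
      .map { line => val part = line.split("\t"); (part(0).toInt, part(1).toInt) }
      .groupByKey(partitioner)   // one shuffle instead of groupByKey + partitionBy
      .persist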



On Mon, Feb 23, 2015 at 3:35 PM, Shuai Zheng  wrote:

> This also triggers an interesting question: how can I do this locally by
> code if I want? For example: I have RDDs A and B, which have some partitioning;
> if I want to join A to B, I might just want to do a mapper-side join
> (although B itself might be big, B's local partition is known to be small
> enough to put in memory). How can I access the other RDD's local partition in
> the mapPartitions method? Is there any way to do this in Spark?
>
>
>
> *From:* Shao, Saisai [mailto:saisai.s...@intel.com]
> *Sent:* Monday, February 23, 2015 3:13 PM
> *To:* Shuai Zheng
> *Cc:* user@spark.apache.org
> *Subject:* RE: Union and reduceByKey will trigger shuffle even same
> partition?
>
>
>
> If you call reduceByKey(), internally Spark will introduce a shuffle
> operation, no matter whether the data is already partitioned locally; Spark
> itself does not know the data is already well partitioned.
>
>
>
> So if you want to avoid the shuffle, you have to write the code explicitly to
> avoid it, from my understanding. You can call mapPartitions to get a
> partition of data and reduce by key locally with your own logic.
>
>
>
> Thanks
>
> Saisai
>
>
>
> *From:* Shuai Zheng [mailto:szheng.c...@gmail.com ]
>
> *Sent:* Monday, February 23, 2015 12:00 PM
> *To:* user@spark.apache.org
> *Subject:* Union and reduceByKey will trigger shuffle even same partition?
>
>
>
> Hi All,
>
>
>
> I am running a simple page rank program, but it is slow. And I dig out
> part of reason is there is shuffle happen when I call an union action even
> both RDD share the same partition:
>
>
>
> Below is my test code in spark shell:
>
>
>
> import org.apache.spark.HashPartitioner
>
>
>
> sc.getConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>
> val beta = 0.8
>
> val numOfPartition = 6
>
>   val links =
> sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line=>{val
> part=line.split("\t");
> (part(0).toInt,part(1).toInt)}).groupByKey.partitionBy(new
> HashPartitioner(numOfPartition)).persist
>
>   var ranks = links.mapValues(_ => 1.0)
>
>   var leakedMatrix = links.mapValues(_ => (1.0-beta)).persist
>
>
>
>   for (i <- 1 until 2) {
>
> val contributions = links.join(ranks).flatMap {
>
>   case (pageId, (links, rank)) =>
>
> links.map(dest => (dest, rank / links.size * beta))
>
> }
>
> *ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)*
>
>   }
>
>   ranks.lookup(1)
>
>
>
> In the above code, links joins ranks and should preserve the partitioning,
> and leakedMatrix also shares the same partitioning, so I expect no shuffle to
> happen on contributions.union(leakedMatrix), nor on the reduceByKey that
> follows. But in the end there is a shuffle write for all steps: map,
> groupByKey, union, partitionBy, etc.
>
> I expected the shuffle to happen only once, with everything after that
> being a local operation, but the screen shows otherwise. Do I have any
> misunderstanding here?

Re: Pyspark save Decison Tree Module with joblib/pickle

2015-02-23 Thread Sebastián Ramírez
In your log it says:

pickle.PicklingError: Can't pickle <type 'thread.lock'>: it's not found as
thread.lock

As far as I know, you can't pickle Spark models. If you go to the
documentation for Pickle you can see that you can pickle only simple Python
structures and code (written in Python), at least as I understand:
https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled

To save a model you can call: model.toDebugString()


That gives you a string in pseudo-code that you can save to a file. Then,
you can parse that pseudo code to write a proper script that runs the
Decision Tree. Actually, that's what I did for a Random Forest (an ensamble
of Decision Trees).
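
As a minimal Scala sketch of that approach (the thread is about PySpark, but the idea is the same; `model` is assumed to be a trained DecisionTreeModel):

    import java.io.PrintWriter

    // Dump the tree's pseudo-code to a text file instead of pickling the model.
    val writer = new PrintWriter("decision-tree-model.txt")
    try writer.write(model.toDebugString) finally writer.close()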

Hope that helps,


*Sebastián Ramírez*
Diseñador de Algoritmos

 

 Tel: (+571) 795 7950 ext: 1012
 Cel: (+57) 300 370 77 10
 Calle 73 No 7 - 06  Piso 4
 Linkedin: co.linkedin.com/in/tiangolo/
 Twitter: @tiangolo 
 Email: sebastian.rami...@senseta.com
 www.senseta.com

On Mon, Feb 23, 2015 at 4:55 AM, Jaggu  wrote:

> Hi Team,
> I was trying to save a DecisionTree model from Pyspark using joblib.
> It is giving me the following error http://pastebin.com/82CFhPNn . Any
> clue
> how to resolve the same or save a model.
>
> Best regards
>
> Jagan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-save-Decison-Tree-Module-with-joblib-pickle-tp21765.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

-- 
**
*This e-mail transmission, including any attachments, is intended only for 
the named recipient(s) and may contain information that is privileged, 
confidential and/or exempt from disclosure under applicable law. If you 
have received this transmission in error, or are not the named 
recipient(s), please notify Senseta immediately by return e-mail and 
permanently delete this transmission, including any attachments.*


Re: Spark configuration

2015-02-23 Thread Shlomi Babluki
I guess you downloaded the source code.

You can build it with the following command:

mvn -DskipTests clean package

Or just download a compiled version.

Shlomi


> On 24 Feb 2015, at 00:40, King sami  wrote:
> 
> Hi Experts,
> I am new to Spark, so I want to manipulate it locally on my machine with Ubuntu 
> as the OS.
> I downloaded the latest version of Spark.
> I ran this command to start it: ./sbin/start-master.sh
> but an error occurred: 
> 
> starting org.apache.spark.deploy.master.Master, logging to 
> /home/spuser/Bureau/spark-1.2.1/sbin/../logs/spark-spuser-org.apache.spark.deploy.master.Master-1-spuser-HP-620.out
> failed to launch org.apache.spark.deploy.master.Master:
>   You need to build Spark before running this program.
>   
> full log in 
> /home/spuser/Bureau/spark-1.2.1/sbin/../logs/spark-spuser-org.apache.spark.deploy.master.Master-1-spuser-HP-620.out
> 
> Should I do any configuration before starting Spark?
> 
> Many thanks for help


Re: Spark configuration

2015-02-23 Thread Sean Owen
It sounds like you downloaded the source distribution perhaps, but
have not built it. That's what the message is telling you. See
http://spark.apache.org/docs/latest/building-spark.html

Or maybe you intended to get a binary distribution.

On Mon, Feb 23, 2015 at 10:40 PM, King sami  wrote:
> Hi Experts,
> I am new to Spark, so I want to manipulate it locally on my machine with Ubuntu
> as the OS.
> I downloaded the latest version of Spark.
> I ran this command to start it: ./sbin/start-master.sh
> but an error occurred:
>
> starting org.apache.spark.deploy.master.Master, logging to
> /home/spuser/Bureau/spark-1.2.1/sbin/../logs/spark-spuser-org.apache.spark.deploy.master.Master-1-spuser-HP-620.out
> failed to launch org.apache.spark.deploy.master.Master:
>   You need to build Spark before running this program.
>
> full log in
> /home/spuser/Bureau/spark-1.2.1/sbin/../logs/spark-spuser-org.apache.spark.deploy.master.Master-1-spuser-HP-620.out
>
> Should I do any configuration before starting Spark?
>
> Many thanks for help

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark configuration

2015-02-23 Thread King sami
Hi Experts,
I am new to Spark, so I want to manipulate it locally on my machine with
Ubuntu as the OS.
I downloaded the latest version of Spark.
I ran this command to start it: ./sbin/start-master.sh
but an error occurred:






starting org.apache.spark.deploy.master.Master, logging to
/home/spuser/Bureau/spark-1.2.1/sbin/../logs/spark-spuser-org.apache.spark.deploy.master.Master-1-spuser-HP-620.out
failed to launch org.apache.spark.deploy.master.Master:
  You need to build Spark before running this program.
full log in
/home/spuser/Bureau/spark-1.2.1/sbin/../logs/spark-spuser-org.apache.spark.deploy.master.Master-1-spuser-HP-620.out

Should I do any configuration before starting Spark?

Many thanks for help


Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I've got the opposite problem with regards to partitioning. I've got over
6000 partitions for some of these RDDs which immediately blows the heap
somehow- I'm still not exactly sure how. If I coalesce them down to about
600-800 partitions, I get the problems where the executors are dying
without any other error messages (other than telling me the executor was
lost in the UI). If I don't coalesce, I pretty immediately get Java heap
space exceptions that kill the job altogether.

Putting in the timeouts didn't seem to help the case where I am coalescing.
Also, I don't see any differences between 'disk only' and 'memory and disk'
storage levels- both of them are having the same problems. I notice large
shuffle files (30-40gb) that only seem to spill a few hundred mb.

On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg  wrote:

> Sounds very similar to what I experienced, Corey. Something that seems to
> at least help with my problems is to have more partitions. I'm already
> fighting between ending up with too many partitions in the end and having
> too few in the beginning. By coalescing as late as possible and avoiding
> too few in the beginning, the problems seem to decrease. Also, after increasing
> spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
> significantly (~700 secs), the problems seem to almost disappear. Don't
> want to celebrate yet, still a long way left before the job completes, but it's
> looking better...
>
> On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet  wrote:
>
>> I'm looking @ my yarn container logs for some of the executors which
>> appear to be failing (with the missing shuffle files). I see exceptions
>> that say "client.TransportClientFactor: Found inactive connection to
>> host/ip:port, closing it."
>>
>> Right after that I see "shuffle.RetryingBlockFetcher: Exception while
>> beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
>> connect to host/ip:port"
>>
>> Right after that exception I see "RECEIVED SIGNAL 15: SIGTERM"
>>
>> Finally, following the sigterm, I see "FileNotFoundException:
>> /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
>> such file or directory)"
>>
>> I'm looking @ the nodemanager and application master logs and I see no
>> indications whatsoever that there were any memory issues during this period
>> of time. The Spark UI is telling me none of the executors are really using
>> too much memory when this happens. It is a big job that's caching several
>> 100's of GB but each node manager on the cluster has 64gb of ram just for
>> yarn containers (physical nodes have 128gb). On this cluster, we have 128
>> nodes. I've also tried using DISK_ONLY storage level but to no avail.
>>
>> Any further ideas on how to track this down? Again, we're able to run
>> this same job on about 1/5th of the data just fine.The only thing that's
>> pointing me towards a memory issue is that it seems to be happening in the
>> same stages each time and when I lower the memory that each executor has
>> allocated it happens in earlier stages but I can't seem to find anything
>> that says an executor (or container for that matter) has run low on memory.
>>
>>
>>
>> On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg 
>> wrote:
>>
>>> No, unfortunately we're not making use of dynamic allocation or the
>>> external shuffle service. Hoping that we could reconfigure our cluster to
>>> make use of it, but since it requires changes to the cluster itself (and
>>> not just the Spark app), it could take some time.
>>>
>>> Unsure if task 450 was acting as a reducer or not, but seems possible.
>>> Probably due to a crashed executor as you say. Seems like I need to do some
>>> more advanced partition tuning to make this job work, as it's currently
>>> rather high number of partitions.
>>>
>>> Thanks for the help so far! It's certainly a frustrating task to debug
>>> when everything's working perfectly on sample data locally and crashes hard
>>> when running on the full dataset on the cluster...
>>>
>>> On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui  wrote:
>>>
 Do you guys have dynamic allocation turned on for YARN?

 Anders, was Task 450 in your job acting like a Reducer and fetching the
 Map spill output data from a different node?

 If a Reducer task can't read the remote data it needs, that could cause
 the stage to fail. Sometimes this forces the previous stage to also be
 re-computed if it's a wide dependency.

 But like Petar said, if you turn the external shuffle service on, YARN
 NodeManager process on the slave machines will serve out the map spill
 data, instead of the Executor JVMs (by default unless you turn external
 shuffle on, the Executor JVM itself serves out the shuffle data which
 causes problems if an Executor dies).

 Corey, how often are Executors crashing in your app? How many Executors
 do you have total? And what is the memory size for each? You can change
>>>

RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shao, Saisai
I don't have the context of this book. AFAIK union will not trigger a shuffle, as it 
just puts the partitions together; the reduceByKey() operator will actually 
trigger the shuffle.

Thanks
Jerry

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 12:26 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

In the book Learning Spark:

[book excerpt screenshot omitted]

So does that mean no shuffle happens across the network, but a shuffle will still be 
done locally? Even if that is the case, why would union trigger a shuffle? I thought 
union would just append the RDDs together.

From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Monday, February 23, 2015 3:13 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

If you call reduceByKey(), internally Spark will introduce a shuffle operation, no 
matter whether the data is already partitioned locally; Spark itself does not know 
the data is already well partitioned.

So if you want to avoid the shuffle, you have to write the code explicitly to avoid 
it, from my understanding. You can call mapPartitions to get a partition of data and 
reduce by key locally with your own logic.

Thanks
Saisai

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

Hi All,

I am running a simple page rank program, but it is slow. And I dig out part of 
reason is there is shuffle happen when I call an union action even both RDD 
share the same partition:

Below is my test code in spark shell:

import org.apache.spark.HashPartitioner

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
  val links = 
sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line=>{val
 part=line.split("\t"); 
(part(0).toInt,part(1).toInt)}).groupByKey.partitionBy(new 
HashPartitioner(numOfPartition)).persist
  var ranks = links.mapValues(_ => 1.0)
  var leakedMatrix = links.mapValues(_ => (1.0-beta)).persist

  for (i <- 1 until 2) {
val contributions = links.join(ranks).flatMap {
  case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size * beta))
}
ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
  }
  ranks.lookup(1)

In the above code, links joins ranks and should preserve the partitioning, and 
leakedMatrix also shares the same partitioning, so I expect no shuffle to happen 
on contributions.union(leakedMatrix), nor on the reduceByKey that follows. But in 
the end there is a shuffle write for all steps: map, groupByKey, union, 
partitionBy, etc.

I expected the shuffle to happen only once, with everything after that being a local 
operation, but the screen shows otherwise. Do I have any misunderstanding here?

[Spark UI screenshot omitted]


RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shao, Saisai
I think some RDD APIs like zipPartitions can do what you want. You might check 
the docs.

Thanks
Jerry
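
For illustration, a hedged sketch of such a map-side join via zipPartitions, assuming rddA and rddB are key-value RDDs that are co-partitioned (same partitioner and same number of partitions) and that B's local partition fits in memory:

    // Join A's partition against B's co-located partition without a shuffle.
    val joined = rddA.zipPartitions(rddB) { (iterA, iterB) =>
      val bMap = iterB.toMap   // materialize B's local partition in memory
      iterA.flatMap { case (k, v) => bMap.get(k).map(w => (k, (v, w))) }
    }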

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 1:35 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

This also triggers an interesting question: how can I do this locally by code 
if I want? For example: I have RDDs A and B, which have some partitioning; if I 
want to join A to B, I might just want to do a mapper-side join (although B 
itself might be big, B's local partition is known to be small enough to put in 
memory). How can I access the other RDD's local partition in the mapPartitions 
method? Is there any way to do this in Spark?

From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Monday, February 23, 2015 3:13 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

If you call reduceByKey(), internally Spark will introduce a shuffle operation, no 
matter whether the data is already partitioned locally; Spark itself does not know 
the data is already well partitioned.

So if you want to avoid the shuffle, you have to write the code explicitly to avoid 
it, from my understanding. You can call mapPartitions to get a partition of data and 
reduce by key locally with your own logic.

Thanks
Saisai

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

Hi All,

I am running a simple page rank program, but it is slow. And I dig out part of 
reason is there is shuffle happen when I call an union action even both RDD 
share the same partition:

Below is my test code in spark shell:

import org.apache.spark.HashPartitioner

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
  val links = 
sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line=>{val
 part=line.split("\t"); 
(part(0).toInt,part(1).toInt)}).groupByKey.partitionBy(new 
HashPartitioner(numOfPartition)).persist
  var ranks = links.mapValues(_ => 1.0)
  var leakedMatrix = links.mapValues(_ => (1.0-beta)).persist

  for (i <- 1 until 2) {
val contributions = links.join(ranks).flatMap {
  case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size * beta))
}
ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
  }
  ranks.lookup(1)

In the above code, links joins ranks and should preserve the partitioning, and 
leakedMatrix also shares the same partitioning, so I expect no shuffle to happen 
on contributions.union(leakedMatrix), nor on the reduceByKey that follows. But in 
the end there is a shuffle write for all steps: map, groupByKey, union, 
partitionBy, etc.

I expected the shuffle to happen only once, with everything after that being a local 
operation, but the screen shows otherwise. Do I have any misunderstanding here?

[Spark UI screenshot omitted]


RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shuai Zheng
This also triggers an interesting question: how can I do this locally by
code if I want? For example: I have RDDs A and B, which have some partitioning;
if I want to join A to B, I might just want to do a mapper-side join
(although B itself might be big, B's local partition is known to be small
enough to put in memory). How can I access the other RDD's local partition in the
mapPartitions method? Is there any way to do this in Spark?

 

From: Shao, Saisai [mailto:saisai.s...@intel.com] 
Sent: Monday, February 23, 2015 3:13 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

 

If you call reduceByKey(), internally Spark will introduce a shuffle
operation, no matter whether the data is already partitioned locally; Spark itself
does not know the data is already well partitioned.

 

So if you want to avoid the shuffle, you have to write the code explicitly to
avoid it, from my understanding. You can call mapPartitions to get a
partition of data and reduce by key locally with your own logic.

 

Thanks

Saisai

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

 

Hi All,

 

I am running a simple page rank program, but it is slow. And I dig out part
of reason is there is shuffle happen when I call an union action even both
RDD share the same partition:

 

Below is my test code in spark shell:

 

import org.apache.spark.HashPartitioner

 

sc.getConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")

val beta = 0.8

val numOfPartition = 6

  val links =
sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line=>{val
part=line.split("\t");
(part(0).toInt,part(1).toInt)}).groupByKey.partitionBy(new
HashPartitioner(numOfPartition)).persist

  var ranks = links.mapValues(_ => 1.0)

  var leakedMatrix = links.mapValues(_ => (1.0-beta)).persist

 

  for (i <- 1 until 2) {

val contributions = links.join(ranks).flatMap {

  case (pageId, (links, rank)) =>

links.map(dest => (dest, rank / links.size * beta))

}

ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)

  }  

  ranks.lookup(1)

 

In the above code, links joins ranks and should preserve the partitioning, and
leakedMatrix also shares the same partitioning, so I expect no shuffle to
happen on contributions.union(leakedMatrix), nor on the reduceByKey that
follows. But in the end there is a shuffle write for all steps: map,
groupByKey, union, partitionBy, etc.

I expected the shuffle to happen only once, with everything after that being a
local operation, but the screen shows otherwise. Do I have any misunderstanding
here?

 





Re: Missing shuffle files

2015-02-23 Thread Anders Arpteg
Sounds very similar to what I experienced, Corey. Something that seems to at
least help with my problems is to have more partitions. I'm already fighting
between ending up with too many partitions in the end and having too few in
the beginning. By coalescing as late as possible and avoiding too few in
the beginning, the problems seem to decrease. Also, after increasing
spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
significantly (~700 secs), the problems seem to almost disappear. Don't
want to celebrate yet, still a long way left before the job completes, but it's
looking better...

On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet  wrote:

> I'm looking @ my yarn container logs for some of the executors which
> appear to be failing (with the missing shuffle files). I see exceptions
> that say "client.TransportClientFactor: Found inactive connection to
> host/ip:port, closing it."
>
> Right after that I see "shuffle.RetryingBlockFetcher: Exception while
> beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
> connect to host/ip:port"
>
> Right after that exception I see "RECEIVED SIGNAL 15: SIGTERM"
>
> Finally, following the sigterm, I see "FileNotFoundExcception:
> /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
> such file for directory)"
>
> I'm looking @ the nodemanager and application master logs and I see no
> indications whatsoever that there were any memory issues during this period
> of time. The Spark UI is telling me none of the executors are really using
> too much memory when this happens. It is a big job that's catching several
> 100's of GB but each node manager on the cluster has 64gb of ram just for
> yarn containers (physical nodes have 128gb). On this cluster, we have 128
> nodes. I've also tried using DISK_ONLY storage level but to no avail.
>
> Any further ideas on how to track this down? Again, we're able to run this
> same job on about 1/5th of the data just fine.The only thing that's
> pointing me towards a memory issue is that it seems to be happening in the
> same stages each time and when I lower the memory that each executor has
> allocated it happens in earlier stages but I can't seem to find anything
> that says an executor (or container for that matter) has run low on memory.
>
>
>
> On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg  wrote:
>
>> No, unfortunately we're not making use of dynamic allocation or the
>> external shuffle service. Hoping that we could reconfigure our cluster to
>> make use of it, but since it requires changes to the cluster itself (and
>> not just the Spark app), it could take some time.
>>
>> Unsure if task 450 was acting as a reducer or not, but seems possible.
>> Probably due to a crashed executor as you say. Seems like I need to do some
>> more advanced partition tuning to make this job work, as it's currently
>> rather high number of partitions.
>>
>> Thanks for the help so far! It's certainly a frustrating task to debug
>> when everything's working perfectly on sample data locally and crashes hard
>> when running on the full dataset on the cluster...
>>
>> On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui 
>> wrote:
>>
>>> Do you guys have dynamic allocation turned on for YARN?
>>>
>>> Anders, was Task 450 in your job acting like a Reducer and fetching the
>>> Map spill output data from a different node?
>>>
>>> If a Reducer task can't read the remote data it needs, that could cause
>>> the stage to fail. Sometimes this forces the previous stage to also be
>>> re-computed if it's a wide dependency.
>>>
>>> But like Petar said, if you turn the external shuffle service on, YARN
>>> NodeManager process on the slave machines will serve out the map spill
>>> data, instead of the Executor JVMs (by default unless you turn external
>>> shuffle on, the Executor JVM itself serves out the shuffle data which
>>> causes problems if an Executor dies).
>>>
>>> Corey, how often are Executors crashing in your app? How many Executors
>>> do you have total? And what is the memory size for each? You can change
>>> what fraction of the Executor heap will be used for your user code vs the
>>> shuffle vs RDD caching with the spark.storage.memoryFraction setting.
>>>
>>> On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic 
>>> wrote:
>>>

 Could you try to turn on the external shuffle service?

 spark.shuffle.service.enabled = true


 On 21.2.2015. 17:50, Corey Nolet wrote:

 I'm experiencing the same issue. Upon closer inspection I'm noticing
 that executors are being lost as well. Thing is, I can't figure out how
 they are dying. I'm using MEMORY_AND_DISK_SER and i've got over 1.3TB of
 memory allocated for the application. I was thinking perhaps it was
 possible that a single executor was getting a single or a couple large
 partitions but shouldn't the disk persistence kick in at that point?

 On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg 
 wrote:

>

Re: Which OutputCommitter to use for S3?

2015-02-23 Thread Darin McBeath
Just to close the loop in case anyone runs into the same problem I had.

By setting --hadoop-major-version=2 when using the ec2 scripts, everything 
worked fine.

Darin.


- Original Message -
From: Darin McBeath 
To: Mingyu Kim ; Aaron Davidson 
Cc: "user@spark.apache.org" 
Sent: Monday, February 23, 2015 3:16 PM
Subject: Re: Which OutputCommitter to use for S3?

Thanks.  I think my problem might actually be the other way around.

I'm compiling with hadoop 2,  but when I startup Spark, using the ec2 scripts, 
I don't specify a 
-hadoop-major-version and the default is 1.   I'm guessing that if I make that 
a 2 that it might work correctly.  I'll try it and post a response.


- Original Message -
From: Mingyu Kim 
To: Darin McBeath ; Aaron Davidson 
Cc: "user@spark.apache.org" 
Sent: Monday, February 23, 2015 3:06 PM
Subject: Re: Which OutputCommitter to use for S3?

Cool, we will start from there. Thanks Aaron and Josh!

Darin, it's likely because the DirectOutputCommitter is compiled with
Hadoop 1 classes and you're running it with Hadoop 2.
org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
became an interface in Hadoop 2.

Mingyu
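
For reference, a minimal hedged sketch of what a no-op direct committer can look like against the old mapred API under Hadoop 2, where JobContext is an interface. This is only an illustration of the shape discussed in this thread, not the Databricks implementation, and it omits the _SUCCESS-file handling Josh mentions:

    import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

    class DirectOutputCommitter extends OutputCommitter {
      // Tasks write directly to the final location, so every phase is a no-op.
      override def setupJob(jobContext: JobContext): Unit = {}
      override def setupTask(taskContext: TaskAttemptContext): Unit = {}
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
      override def commitTask(taskContext: TaskAttemptContext): Unit = {}
      override def abortTask(taskContext: TaskAttemptContext): Unit = {}
    }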





On 2/23/15, 11:52 AM, "Darin McBeath"  wrote:

>Aaron.  Thanks for the class. Since I'm currently writing Java based
>Spark applications, I tried converting your class to Java (it seemed
>pretty straightforward).
>
>I set up the use of the class as follows:
>
>SparkConf conf = new SparkConf()
>.set("spark.hadoop.mapred.output.committer.class",
>"com.elsevier.common.DirectOutputCommitter");
>
>And I then try and save a file to S3 (which I believe should use the old
>hadoop apis).
>
>JavaPairRDD newBaselineRDDWritable =
>reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
>newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
>Text.class, Text.class, SequenceFileOutputFormat.class,
>org.apache.hadoop.io.compress.GzipCodec.class);
>
>But, I get the following error message.
>
>Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
>class org.apache.hadoop.mapred.JobContext, but interface was expected
>at 
>com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
>java:68)
>at 
>org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
>.scala:1075)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>ala:940)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>ala:902)
>at 
>org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
>71)
>at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
>
>In my class, JobContext is an interface of  type
>org.apache.hadoop.mapred.JobContext.
>
>Is there something obvious that I might be doing wrong (or messed up in
>the translation from Scala to Java) or something I should look into?  I'm
>using Spark 1.2 with hadoop 2.4.
>
>
>Thanks.
>
>Darin.
>
>
>
>
>
>From: Aaron Davidson 
>To: Andrew Ash 
>Cc: Josh Rosen ; Mingyu Kim ;
>"user@spark.apache.org" ; Aaron Davidson
>
>Sent: Saturday, February 21, 2015 7:01 PM
>Subject: Re: Which OutputCommitter to use for S3?
>
>
>
>Here is the class:
>https://gist.github.com/aarondav/c513916e72101bbe14ec
>
>You can use it by setting "mapred.output.committer.class" in the Hadoop
>configuration (or "spark.hadoop.mapred.output.committer.class" in the
>Spark configuration). Note that this only works for the old Hadoop APIs,
>I believe the new Hadoop APIs strongly tie committer to input format (so
>FileInputFormat always uses FileOutputCommitter), which makes this fix
>more difficult to apply.
>
>
>
>
>On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash  wrote:
>
>Josh is that class something you guys would consider open sourcing, or
>would you rather the community step up and create an OutputCommitter
>implementation optimized for S3?
>>
>>
>>On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen  wrote:
>>
>>We (Databricks) use our own DirectOutputCommitter implementation, which
>>is a couple tens of lines of Scala code.  The class would almost
>>entirely be a no-op except we took some care to properly handle the
>>_SUCCESS file.
>>>
>>>
>>>On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim  wrote:
>>>
>>>I didn't get any response. It'd be really appreciated if anyone using a
>>>special OutputCommitter for S3 can comment on this!


Thanks,
Mingyu


From: Mingyu Kim 
Date: Monday, February 16, 2015 at 1:15 AM
To: "user@spark.apache.org" 
Subject: Which OutputCommitter to use for S3?



HI all,


The default OutputCom

Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I'm looking @ my yarn container logs for some of the executors which appear
to be failing (with the missing shuffle files). I see exceptions that say
"client.TransportClientFactor: Found inactive connection to host/ip:port,
closing it."

Right after that I see "shuffle.RetryingBlockFetcher: Exception while
beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
connect to host/ip:port"

Right after that exception I see "RECEIVED SIGNAL 15: SIGTERM"

Finally, following the sigterm, I see "FileNotFoundException:
/hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
such file or directory)"

I'm looking @ the nodemanager and application master logs and I see no
indications whatsoever that there were any memory issues during this period
of time. The Spark UI is telling me none of the executors are really using
too much memory when this happens. It is a big job that's caching several
100's of GB but each node manager on the cluster has 64gb of ram just for
yarn containers (physical nodes have 128gb). On this cluster, we have 128
nodes. I've also tried using DISK_ONLY storage level but to no avail.

Any further ideas on how to track this down? Again, we're able to run this
same job on about 1/5th of the data just fine. The only thing that's
pointing me towards a memory issue is that it seems to be happening in the
same stages each time and when I lower the memory that each executor has
allocated it happens in earlier stages but I can't seem to find anything
that says an executor (or container for that matter) has run low on memory.



On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg  wrote:

> No, unfortunately we're not making use of dynamic allocation or the
> external shuffle service. Hoping that we could reconfigure our cluster to
> make use of it, but since it requires changes to the cluster itself (and
> not just the Spark app), it could take some time.
>
> Unsure if task 450 was acting as a reducer or not, but seems possible.
> Probably due to a crashed executor as you say. Seems like I need to do some
> more advanced partition tuning to make this job work, as it's currently
> rather high number of partitions.
>
> Thanks for the help so far! It's certainly a frustrating task to debug
> when everything's working perfectly on sample data locally and crashes hard
> when running on the full dataset on the cluster...
>
> On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui 
> wrote:
>
>> Do you guys have dynamic allocation turned on for YARN?
>>
>> Anders, was Task 450 in your job acting like a Reducer and fetching the
>> Map spill output data from a different node?
>>
>> If a Reducer task can't read the remote data it needs, that could cause
>> the stage to fail. Sometimes this forces the previous stage to also be
>> re-computed if it's a wide dependency.
>>
>> But like Petar said, if you turn the external shuffle service on, YARN
>> NodeManager process on the slave machines will serve out the map spill
>> data, instead of the Executor JVMs (by default unless you turn external
>> shuffle on, the Executor JVM itself serves out the shuffle data which
>> causes problems if an Executor dies).
>>
>> Corey, how often are Executors crashing in your app? How many Executors do
>> you have total? And what is the memory size for each? You can change what
>> fraction of the Executor heap will be used for your user code vs the
>> shuffle vs RDD caching with the spark.storage.memoryFraction setting.
>>
>> On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic 
>> wrote:
>>
>>>
>>> Could you try to turn on the external shuffle service?
>>>
>>> spark.shuffle.service.enabled = true
>>>
>>>
>>> On 21.2.2015. 17:50, Corey Nolet wrote:
>>>
>>> I'm experiencing the same issue. Upon closer inspection I'm noticing
>>> that executors are being lost as well. Thing is, I can't figure out how
>>> they are dying. I'm using MEMORY_AND_DISK_SER and i've got over 1.3TB of
>>> memory allocated for the application. I was thinking perhaps it was
>>> possible that a single executor was getting a single or a couple large
>>> partitions but shouldn't the disk persistence kick in at that point?
>>>
>>> On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg 
>>> wrote:
>>>
 For large jobs, the following error message is shown that seems to
 indicate that shuffle files for some reason are missing. It's a rather
 large job with many partitions. If the data size is reduced, the problem
 disappears. I'm running a build from Spark master post 1.2 (build at
 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this
 problem?

  User class threw exception: Job aborted due to stage failure: Task 450
 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in
 stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
 java.io.FileNotFoundException:
 /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rd

On app upgrade, restore sliding window data.

2015-02-23 Thread Matus Faro
Hi,

Our application is being designed to operate at all times on a large
sliding window (day+) of data. The operations performed on the window
of data will change fairly frequently and I need a way to save and
restore the sliding window after an app upgrade without having to wait
the duration of the sliding window to "warm up". Because it's an app
upgrade, checkpointing will not work unfortunately.

I can potentially dump the window to an outside storage periodically
or on app shutdown, but I don't have an ideal way of restoring it.
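
For example, a hedged sketch of that periodic dump, assuming a DStream named windowedStream and a placeholder HDFS path:

    // Persist each window batch so it could be reloaded after an upgrade.
    windowedStream.foreachRDD { (rdd, time) =>
      rdd.saveAsObjectFile(s"hdfs:///backups/window/${time.milliseconds}")
    }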

I thought about two non-ideal solutions:
1. Load the previous data all at once into the sliding window on app
startup (see the sketch after this list). The problem is, at one point I
will have double the data in the sliding window until the initial batch
of data goes out of scope.
2. Broadcast the previous state of the window separately from the
window. Perform the operations on both sets of data until it comes out
of scope. The problem is, the data will not fit into memory.
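
A rough sketch of option 1 (assumptions: the window contents were dumped
periodically with saveAsObjectFile, and "Event", "windowed", "liveStream",
"ssc" and the paths are hypothetical names; an illustration, not a tested
recipe):

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

// periodic dump of the current window contents
windowed.foreachRDD { (rdd, time) =>
  rdd.saveAsObjectFile(s"hdfs:///backups/window-${time.milliseconds}")
}

// on restart: replay the last dump as a one-shot batch and union it with the
// live input (this is where the temporary double-counting described above
// comes from, until the replayed batch slides out of the window)
val restored: RDD[Event] = ssc.sparkContext.objectFile[Event]("hdfs:///backups/window-latest")
val input = liveStream.union(ssc.queueStream(Queue(restored), oneAtATime = true))
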

Solutions that would solve my problem:
1. Ability to pre-populate sliding window.
2. Have control over batch slicing. It would be nice for a Receiver to
dictate the current batch timestamp in order to slow down or fast
forward time.

Any feedback would be greatly appreciated!

Thank you,
Matus

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Movie Recommendation tutorial

2015-02-23 Thread Xiangrui Meng
Which Spark version did you use? Btw, there are three datasets from
MovieLens. The tutorial used the medium one (1 million). -Xiangrui

On Mon, Feb 23, 2015 at 8:36 AM, poiuytrez  wrote:
> What do you mean?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Movie-Recommendation-tutorial-tp21769p21771.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Need some help to create user defined type for ML pipeline

2015-02-23 Thread Xiangrui Meng
Yes, we are going to expose the developer API. There was a long
discussion in the PR: https://github.com/apache/spark/pull/3637. So we
marked them package private and look for feedback on how to improve
it. Please implement your classes under `spark.ml` for now and let us
know your feedback. Thanks! -Xiangrui
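
For the serialize/deserialize question further down this thread, a minimal
sketch against the ByteImage fields in the quoted code below (assumptions:
Spark 1.2-era APIs; GenericMutableRow lives in
org.apache.spark.sql.catalyst.expressions and is an internal class that may
move between releases):

  override def serialize(obj: Any): Row = {
    val img = obj.asInstanceOf[ByteImage]
    val row = new GenericMutableRow(4)
    row.setInt(0, img.channels)
    row.setInt(1, img.width)
    row.setInt(2, img.height)
    row.update(3, img.data)   // the BinaryType column holds the raw bytes
    row
  }

  override def deserialize(datum: Any): ByteImage = {
    val row = datum.asInstanceOf[Row]
    new ByteImage(row.getInt(0), row.getInt(1), row.getInt(2),
      row(3).asInstanceOf[Array[Byte]])
  }
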

On Mon, Feb 23, 2015 at 8:10 AM, Jaonary Rabarisoa  wrote:
> Hi Joseph,
>
> Thank you for your feedback. I've managed to define an image type by
> following VectorUDT implementation.
>
> I have another question about the definition of a user-defined transformer.
> The unary transformer is private to spark.ml. Do you plan
> to give a developer API for transformers?
>
>
>
> On Sun, Jan 25, 2015 at 2:26 AM, Joseph Bradley 
> wrote:
>>
>> Hi Jao,
>>
>> You're right that defining serialize and deserialize is the main task in
>> implementing a UDT.  They are basically translating between your native
>> representation (ByteImage) and SQL DataTypes.  The sqlType you defined looks
>> correct, and you're correct to use a row of length 4.  Other than that, it
>> should just require copying data to and from SQL Rows.  There are quite a
>> few examples of that in the codebase; I'd recommend searching based on the
>> particular DataTypes you're using.
>>
>> Are there particular issues you're running into?
>>
>> Joseph
>>
>> On Mon, Jan 19, 2015 at 12:59 AM, Jaonary Rabarisoa 
>> wrote:
>>>
>>> Hi all,
>>>
>>> I'm trying to implement a pipeline for computer vision based on the
>>> latest ML package in spark. The first step of my pipeline is to decode image
>>> (jpeg for instance) stored in a parquet file.
>>> For this, I begin to create a UserDefinedType that represents a decoded
>>> image stored in a array of byte. Here is my first attempt :
>>>
>>>
>>> @SQLUserDefinedType(udt = classOf[ByteImageUDT])
>>> class ByteImage(val channels: Int, val width: Int, val height: Int,
>>> val data: Array[Byte])
>>>
>>>
>>> private[spark] class ByteImageUDT extends UserDefinedType[ByteImage] {
>>>
>>>   override def sqlType: StructType = {
>>>     StructType(Seq(
>>>       StructField("channels", IntegerType, nullable = false),
>>>       StructField("width", IntegerType, nullable = false),
>>>       StructField("height", IntegerType, nullable = false),
>>>       StructField("data", BinaryType, nullable = false)))
>>>   }
>>>
>>>   override def serialize(obj: Any): Row = {
>>>     val row = new GenericMutableRow(4)
>>>     val img = obj.asInstanceOf[ByteImage]
>>>     ...
>>>   }
>>>
>>>   override def deserialize(datum: Any): ByteImage = {
>>>     ...
>>>   }
>>>
>>>   override def userClass: Class[ByteImage] = classOf[ByteImage]
>>> }
>>>
>>>
>>> I take the VectorUDT as a starting point, but there are a lot of things that
>>> I don't really understand. So any help on defining serialize and deserialize
>>> methods will be appreciated.
>>>
>>> Best Regards,
>>>
>>> Jao
>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: shuffle data taking immense disk space during ALS

2015-02-23 Thread Xiangrui Meng
Did you try to use less number of partitions (user/product blocks)?
Did you use implicit feedback? In the current implementation, we only
do checkpointing with implicit feedback. We should adopt the
checkpoint strategy implemented in LDA:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointer.scala
for ALS. Could you try the latest branch-1.3 or master and see whether
it helps? -Xiangrui
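
A minimal sketch of the checkpointed setup (assumptions: implicit feedback,
the only case 1.2 checkpoints; "ratings" is the parsed RDD[Rating]; the path
and hyperparameter values are placeholders to tune):

import org.apache.spark.mllib.recommendation.ALS

sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")
val model = ALS.trainImplicit(ratings, rank = 100, iterations = 15,
  lambda = 0.01, alpha = 40.0)
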

On Mon, Feb 23, 2015 at 6:21 AM, Antony Mayi
 wrote:
> Hi,
>
> This has already been briefly discussed here in the past but there seems to
> be more questions...
>
> I am running a bigger ALS task with input data ~40GB (~3 billion ratings).
> The data is partitioned into 512 partitions and I am also using default
> parallelism set to 512. The ALS runs with rank=100, iters=15. Using spark
> 1.2.0.
>
> The issue is the volume of temporary data stored on disks generated during
> the processing. You can see the effect here:
> http://picpaste.com/disk-UKGFOlte.png It stores 12TB!!! of data until it
> reaches the 90% threshold when yarn kills it.
>
> I have checkpoint directory set so allegedly it should be clearing the temp
> data but not sure that's happening (although there is 1 drop seen).
>
> Is there any solution for this? 12TB of temp not getting cleaned seems to be
> wrong.
>
> Thanks,
> Antony.
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Efficient way of scoring all items and users in an ALS model

2015-02-23 Thread Xiangrui Meng
You can use rdd.cartesian then find top-k by key to distribute the
work to executors. There is a trick to boost the performance: you need
to blockify user/product features and then use native matrix-matrix
multiplication. There is a relevant PR from Deb:
https://github.com/apache/spark/pull/3098 . -Xiangrui
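
A sketch of the blocked cartesian approach ("model" is the
MatrixFactorizationModel from the gist below; for brevity this uses a plain
dot product in the inner loop, and swapping that inner part for a native gemm
over the blocked arrays is the optimization described above):

val topK = 10
val userBlocks = model.userFeatures.glom()
val productBlocks = model.productFeatures.glom()

val recommendations = userBlocks.cartesian(productBlocks).flatMap {
  case (uBlock, pBlock) =>
    uBlock.map { case (userId, uf) =>
      val scored = pBlock.map { case (productId, pf) =>
        var dot = 0.0
        var k = 0
        while (k < uf.length) { dot += uf(k) * pf(k); k += 1 }
        (productId, dot)
      }
      (userId, scored.sortBy(-_._2).take(topK))
    }
}.reduceByKey((a, b) => (a ++ b).sortBy(-_._2).take(topK))
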

On Mon, Feb 23, 2015 at 4:53 AM, Erlend Hamnaberg  wrote:
> Hi.
>
> We are using the ALS model, and would like to get all users and items
> scored.
>
> currently we have these methods.
>
> https://gist.github.com/hamnis/e396854f4654bd46ebe0
>
> We want to be able to distribute the calculations to the slaves so we don't
> have to do this on the master.
>
> Is there an efficient and distributed way of doing this?
>
>
> I suppose we could collect all items in the product features and send that
> into a broadcast, but that needs all items on the master, and we want to
> avoid that.
>
> Regards
>
> Erlend

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shuai Zheng
In the book Learning Spark:

[inline image: excerpt from Learning Spark]

So does this mean that no shuffle happens across the network, but a shuffle
is still done locally? Even if that is the case, why would union trigger a
shuffle? I think union just appends the RDDs together.

 

From: Shao, Saisai [mailto:saisai.s...@intel.com] 
Sent: Monday, February 23, 2015 3:13 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

 

If you call reduceByKey(), internally Spark will introduce a shuffle
operation, no matter whether the data is already partitioned locally; Spark
itself does not know the data is already well partitioned.

 

So if you want to avoid a shuffle, you have to write the code explicitly to
avoid this, from my understanding. You can call mapPartitions to get a
partition of data and reduce by key locally with your own logic.

 

Thanks

Saisai

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

 

Hi All,

 

I am running a simple PageRank program, but it is slow. I dug out part of
the reason: a shuffle happens when I call a union, even though both RDDs
share the same partitioner:

 

Below is my test code in spark shell:

 

import org.apache.spark.HashPartitioner

 

sc.getConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")

val beta = 0.8

val numOfPartition = 6

  val links = sc.textFile("c:/Download/web-Google.txt")
    .filter(!_.contains("#"))
    .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
    .groupByKey
    .partitionBy(new HashPartitioner(numOfPartition))
    .persist

  var ranks = links.mapValues(_ => 1.0)

  var leakedMatrix = links.mapValues(_ => (1.0-beta)).persist

 

  for (i <- 1 until 2) {

val contributions = links.join(ranks).flatMap {

  case (pageId, (links, rank)) =>

links.map(dest => (dest, rank / links.size * beta))

}

ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)

  }  

  ranks.lookup(1)

 

In the above code, links joins ranks and should preserve the partitioning,
and leakedMatrix also shares the same partitioner, so I expect no shuffle to
happen on contributions.union(leakedMatrix), nor on the reduceByKey that
follows. But in the end there is shuffle write for all steps: map,
groupByKey, union, partitionBy, etc.

 

I expect the shuffle to happen only once, with everything after it being
local operations, but the UI shows otherwise. Do I have a misunderstanding
here?

 





Re: Spark Performance on Yarn

2015-02-23 Thread Lee Bierman
Thanks for the suggestions.

I removed the  "persist" call from program. Doing so I started it with:

spark-submit --class com.xxx.analytics.spark.AnalyticsJob --master yarn
/tmp/analytics.jar --input_directory hdfs://ip:8020/flume/events/2015/02/


This takes all the default and only runs 2 executors. This runs with no
failures but takes 17 hours.


After this I tried to run it with

spark-submit --class com.extole.analytics.spark.AnalyticsJob
--num-executors 5 --executor-cores 2 --master yarn /tmp/analytics.jar
--input_directory
hdfs://ip-10-142-198-50.ec2.internal:8020/flume/events/2015/02/

This results in lots of executor failures and restarts and failures. I
can't seem to get any kind of parallelism or throughput. The next try will
be to set the yarn memory overhead.


What other configs should I list to help figure out the sweet spot here.
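
One hedged starting point (class name and paths copied from above; the
numbers are guesses to tune from, and spark.yarn.executor.memoryOverhead is
in MB):

spark-submit --class com.xxx.analytics.spark.AnalyticsJob --master yarn \
  --num-executors 8 --executor-cores 2 --executor-memory 2g \
  --conf spark.yarn.executor.memoryOverhead=512 \
  /tmp/analytics.jar --input_directory hdfs://ip:8020/flume/events/2015/02/
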




On Sat, Feb 21, 2015 at 12:29 AM, Davies Liu  wrote:

> How many executors you have per machine? It will be helpful if you
> could list all the configs.
>
> Could you also try to run it without persist? Caching can hurt rather
> than help, if you don't have enough memory.
>
> On Fri, Feb 20, 2015 at 5:18 PM, Lee Bierman  wrote:
> > Thanks for the suggestions.
> > I'm experimenting with different values for spark memoryOverhead and
> > explictly giving the executors more memory, but still have not found the
> > golden medium to get it to finish in a proper time frame.
> >
> > Is my cluster massively undersized at 5 boxes, 8gb 2cpu ?
> > Trying to figure out a memory setting and executor setting so it runs on
> > many containers in parallel.
> >
> > I'm still struggling as pig jobs and hive jobs on the same whole data set
> > don't take as long. I'm wondering too if the logic in our code is just
> doing
> > something silly causing multiple reads of all the data.
> >
> >
> > On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza 
> wrote:
> >>
> >> If that's the error you're hitting, the fix is to boost
> >> spark.yarn.executor.memoryOverhead, which will put some extra room in
> >> between the executor heap sizes and the amount of memory requested for
> them
> >> from YARN.
> >>
> >> -Sandy
> >>
> >> On Fri, Feb 20, 2015 at 9:40 AM, lbierman  wrote:
> >>>
> >>> A bit more context on this issue. From the container logs on the
> executor
> >>>
> >>> Given my cluster specs above what would be appropriate parameters to
> pass
> >>> into :
> >>> --num-executors --num-cores --executor-memory
> >>>
> >>> I had tried it with --executor-memory 2500MB
> >>>
> >>> 2015-02-20 06:50:09,056 WARN
> >>>
> >>>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> >>> Container
> [pid=23320,containerID=container_1423083596644_0238_01_004160]
> >>> is
> >>> running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB
> >>> physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing
> >>> container.
> >>> Dump of the process-tree for container_1423083596644_0238_01_004160 :
> >>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> >>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> >>> |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash
> -c
> >>> /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p'
> >>> -Xms2400m
> >>> -Xmx2400m
> >>>
> >>>
> -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp
> >>>
> >>>
> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
> >>> org.apache.spark.executor.CoarseGrainedExecutorBackend
> >>>
> >>> akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal
> :42535/user/CoarseGrainedScheduler
> >>> 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1>
> >>>
> >>>
> /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout
> >>> 2>
> >>>
> >>>
> /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
> >>> |- 23323 23320 23320 23320 (java) 922271 12263 461976
> 724218
> >>> /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p
> >>> -Xms2400m
> >>> -Xmx2400m
> >>>
> >>>
> -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp
> >>>
> >>>
> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
> >>> org.apache.spark.executor.CoarseGrainedExecutorBackend
> >>> akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> View this message in context:
> >>>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html
> >>> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>
> >>> --

RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shao, Saisai
If you call reduceByKey(), internally Spark will introduce a shuffle
operation, no matter whether the data is already partitioned locally; Spark
itself does not know the data is already well partitioned.

So if you want to avoid a shuffle, you have to write the code explicitly to
avoid this, from my understanding. You can call mapPartitions to get a
partition of data and reduce by key locally with your own logic.
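
One small mitigation, sketched against the code quoted below (assumption:
the reduceByKey(partitioner, func) overload): the flatMap after the join
drops the partitioner, which is why this reduceByKey must shuffle, but
passing the same HashPartitioner keeps the result co-partitioned with
"links", so the next iteration's join stays narrow.

val partitioner = new HashPartitioner(numOfPartition)
ranks = contributions.union(leakedMatrix).reduceByKey(partitioner, _ + _)
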

Thanks
Saisai

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

Hi All,

I am running a simple PageRank program, but it is slow. I dug out part of the
reason: a shuffle happens when I call a union, even though both RDDs share the
same partitioner:

Below is my test code in spark shell:

import org.apache.spark.HashPartitioner

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
  val links = sc.textFile("c:/Download/web-Google.txt")
    .filter(!_.contains("#"))
    .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
    .groupByKey
    .partitionBy(new HashPartitioner(numOfPartition))
    .persist
  var ranks = links.mapValues(_ => 1.0)
  var leakedMatrix = links.mapValues(_ => (1.0-beta)).persist

  for (i <- 1 until 2) {
val contributions = links.join(ranks).flatMap {
  case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size * beta))
}
ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
  }
  ranks.lookup(1)

In the above code, links joins ranks and should preserve the partitioning, and
leakedMatrix also shares the same partitioner, so I expect no shuffle to happen
on contributions.union(leakedMatrix), nor on the reduceByKey after that. But in
the end there is shuffle write for all steps: map, groupByKey, union,
partitionBy, etc.

I expect the shuffle to happen only once, with everything after it being local
operations, but the UI shows otherwise. Do I have a misunderstanding here?

[inline image: Spark UI screenshot]


Re: Which OutputCommitter to use for S3?

2015-02-23 Thread Darin McBeath
Thanks.  I think my problem might actually be the other way around.

I'm compiling with hadoop 2,  but when I startup Spark, using the ec2 scripts, 
I don't specify a 
-hadoop-major-version and the default is 1.   I'm guessing that if I make that 
a 2 that it might work correctly.  I'll try it and post a response.


- Original Message -
From: Mingyu Kim 
To: Darin McBeath ; Aaron Davidson 
Cc: "user@spark.apache.org" 
Sent: Monday, February 23, 2015 3:06 PM
Subject: Re: Which OutputCommitter to use for S3?

Cool, we will start from there. Thanks Aaron and Josh!

Darin, it's likely because the DirectOutputCommitter is compiled with
Hadoop 1 classes and you're running it with Hadoop 2.
org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
became an interface in Hadoop 2.

Mingyu





On 2/23/15, 11:52 AM, "Darin McBeath"  wrote:

>Aaron.  Thanks for the class. Since I'm currently writing Java based
>Spark applications, I tried converting your class to Java (it seemed
>pretty straightforward).
>
>I set up the use of the class as follows:
>
>SparkConf conf = new SparkConf()
>.set("spark.hadoop.mapred.output.committer.class",
>"com.elsevier.common.DirectOutputCommitter");
>
>And I then try and save a file to S3 (which I believe should use the old
>hadoop apis).
>
>JavaPairRDD newBaselineRDDWritable =
>reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
>newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
>Text.class, Text.class, SequenceFileOutputFormat.class,
>org.apache.hadoop.io.compress.GzipCodec.class);
>
>But, I get the following error message.
>
>Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
>class org.apache.hadoop.mapred.JobContext, but interface was expected
>at 
>com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
>java:68)
>at 
>org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
>.scala:1075)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>ala:940)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>ala:902)
>at 
>org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
>71)
>at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
>
>In my class, JobContext is an interface of  type
>org.apache.hadoop.mapred.JobContext.
>
>Is there something obvious that I might be doing wrong (or messed up in
>the translation from Scala to Java) or something I should look into?  I'm
>using Spark 1.2 with hadoop 2.4.
>
>
>Thanks.
>
>Darin.
>
>
>
>
>
>From: Aaron Davidson 
>To: Andrew Ash 
>Cc: Josh Rosen ; Mingyu Kim ;
>"user@spark.apache.org" ; Aaron Davidson
>
>Sent: Saturday, February 21, 2015 7:01 PM
>Subject: Re: Which OutputCommitter to use for S3?
>
>
>
>Here is the class:
>https://urldefense.proofpoint.com/v2/url?u=https-3A__gist.github.com_aaron
>dav_c513916e72101bbe14ec&d=AwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6o
>Onmz8&r=ennQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=_2YAVrYZtQmuKZRf6sFs
>zOvl_-ZnxmkBPHo1K24TfGE&s=cwSCPKlJO-BJcz4UcGck3xOE2N-4V3eoNvgtFCdMLP8&e=
>
>You can use it by setting "mapred.output.committer.class" in the Hadoop
>configuration (or "spark.hadoop.mapred.output.committer.class" in the
>Spark configuration). Note that this only works for the old Hadoop APIs,
>I believe the new Hadoop APIs strongly tie committer to input format (so
>FileInputFormat always uses FileOutputCommitter), which makes this fix
>more difficult to apply.
>
>
>
>
>On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash  wrote:
>
>Josh is that class something you guys would consider open sourcing, or
>would you rather the community step up and create an OutputCommitter
>implementation optimized for S3?
>>
>>
>>On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen  wrote:
>>
>>We (Databricks) use our own DirectOutputCommitter implementation, which
>>is a couple tens of lines of Scala code.  The class would almost
>>entirely be a no-op except we took some care to properly handle the
>>_SUCCESS file.
>>>
>>>
>>>On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim  wrote:
>>>
>>>I didn't get any response. It'd be really appreciated if anyone using a
>>>special OutputCommitter for S3 can comment on this!


Thanks,
Mingyu


From: Mingyu Kim 
Date: Monday, February 16, 2015 at 1:15 AM
To: "user@spark.apache.org" 
Subject: Which OutputCommitter to use for S3?



Hi all,


The default OutputCommitter used by RDD, which is FileOutputCommitter,
seems to require moving files at the commit step, which is not a
constant operation in S3, as discussed in
https://urldefense.proofpoint.com/v2/url?u=http-3A__mail-2Darchives.apa
che.org_mod-5Fmbox_spark-2Duser_201410.mbox_-253C543E33FA.2000802-40ent
ropy.be-253E&d=AwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXB

Re: Which OutputCommitter to use for S3?

2015-02-23 Thread Mingyu Kim
Cool, we will start from there. Thanks Aaron and Josh!

Darin, it's likely because the DirectOutputCommitter is compiled with
Hadoop 1 classes and you're running it with Hadoop 2.
org.apache.hadoop.mapred.JobContext used to be a class in Hadoop 1, and it
became an interface in Hadoop 2.

Mingyu





On 2/23/15, 11:52 AM, "Darin McBeath"  wrote:

>Aaron.  Thanks for the class. Since I'm currently writing Java based
>Spark applications, I tried converting your class to Java (it seemed
>pretty straightforward).
>
>I set up the use of the class as follows:
>
>SparkConf conf = new SparkConf()
>.set("spark.hadoop.mapred.output.committer.class",
>"com.elsevier.common.DirectOutputCommitter");
>
>And I then try and save a file to S3 (which I believe should use the old
>hadoop apis).
>
>JavaPairRDD newBaselineRDDWritable =
>reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
>newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile,
>Text.class, Text.class, SequenceFileOutputFormat.class,
>org.apache.hadoop.io.compress.GzipCodec.class);
>
>But, I get the following error message.
>
>Exception in thread "main" java.lang.IncompatibleClassChangeError: Found
>class org.apache.hadoop.mapred.JobContext, but interface was expected
>at 
>com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.
>java:68)
>at 
>org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions
>.scala:1075)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>ala:940)
>at 
>org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.sc
>ala:902)
>at 
>org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:7
>71)
>at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)
>
>In my class, JobContext is an interface of  type
>org.apache.hadoop.mapred.JobContext.
>
>Is there something obvious that I might be doing wrong (or messed up in
>the translation from Scala to Java) or something I should look into?  I'm
>using Spark 1.2 with hadoop 2.4.
>
>
>Thanks.
>
>Darin.
>
>
>
>
>
>From: Aaron Davidson 
>To: Andrew Ash 
>Cc: Josh Rosen ; Mingyu Kim ;
>"user@spark.apache.org" ; Aaron Davidson
>
>Sent: Saturday, February 21, 2015 7:01 PM
>Subject: Re: Which OutputCommitter to use for S3?
>
>
>
>Here is the class:
>https://urldefense.proofpoint.com/v2/url?u=https-3A__gist.github.com_aaron
>dav_c513916e72101bbe14ec&d=AwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6o
>Onmz8&r=ennQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=_2YAVrYZtQmuKZRf6sFs
>zOvl_-ZnxmkBPHo1K24TfGE&s=cwSCPKlJO-BJcz4UcGck3xOE2N-4V3eoNvgtFCdMLP8&e=
>
>You can use it by setting "mapred.output.committer.class" in the Hadoop
>configuration (or "spark.hadoop.mapred.output.committer.class" in the
>Spark configuration). Note that this only works for the old Hadoop APIs,
>I believe the new Hadoop APIs strongly tie committer to input format (so
>FileInputFormat always uses FileOutputCommitter), which makes this fix
>more difficult to apply.
>
>
>
>
>On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash  wrote:
>
>Josh is that class something you guys would consider open sourcing, or
>would you rather the community step up and create an OutputCommitter
>implementation optimized for S3?
>>
>>
>>On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen  wrote:
>>
>>We (Databricks) use our own DirectOutputCommitter implementation, which
>>is a couple tens of lines of Scala code.  The class would almost
>>entirely be a no-op except we took some care to properly handle the
>>_SUCCESS file.
>>>
>>>
>>>On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim  wrote:
>>>
>>>I didn't get any response. It'd be really appreciated if anyone using a
>>>special OutputCommitter for S3 can comment on this!


Thanks,
Mingyu


From: Mingyu Kim 
Date: Monday, February 16, 2015 at 1:15 AM
To: "user@spark.apache.org" 
Subject: Which OutputCommitter to use for S3?



Hi all,


The default OutputCommitter used by RDD, which is FileOutputCommitter,
seems to require moving files at the commit step, which is not a
constant operation in S3, as discussed in
https://urldefense.proofpoint.com/v2/url?u=http-3A__mail-2Darchives.apa
che.org_mod-5Fmbox_spark-2Duser_201410.mbox_-253C543E33FA.2000802-40ent
ropy.be-253E&d=AwIFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=e
nnQJq47pNnObsDh-88a9YUrUulcYQoV8giPASqXB84&m=_2YAVrYZtQmuKZRf6sFszOvl_-
ZnxmkBPHo1K24TfGE&s=EQOZaHRANJupdjXCfHSXL2t5BZ9YgMt2pRc3pht4o7o&e= .
People seem to develop their own NullOutputCommitter implementation or
use DirectFileOutputCommitter (as mentioned in SPARK-3595), but I
wanted to check if there is a de facto standard, publicly available
OutputCommitter to use for S3 in conjunction with Spark.


Thanks,
Mingyu
>>>
>>
>
>-

Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shuai Zheng
Hi All,

 

I am running a simple PageRank program, but it is slow. I dug out part of
the reason: a shuffle happens when I call a union, even though both RDDs
share the same partitioner:

 

Below is my test code in spark shell:

 

import org.apache.spark.HashPartitioner

 

sc.getConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")

val beta = 0.8

val numOfPartition = 6

  val links = sc.textFile("c:/Download/web-Google.txt")
    .filter(!_.contains("#"))
    .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
    .groupByKey
    .partitionBy(new HashPartitioner(numOfPartition))
    .persist

  var ranks = links.mapValues(_ => 1.0)

  var leakedMatrix = links.mapValues(_ => (1.0-beta)).persist

 

  for (i <- 1 until 2) {

val contributions = links.join(ranks).flatMap {

  case (pageId, (links, rank)) =>

links.map(dest => (dest, rank / links.size * beta))

}

ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)

  }  

  ranks.lookup(1)

 

In the above code, links joins ranks and should preserve the partitioning,
and leakedMatrix also shares the same partitioner, so I expect no shuffle to
happen on contributions.union(leakedMatrix), nor on the reduceByKey that
follows. But in the end there is shuffle write for all steps: map,
groupByKey, union, partitionBy, etc.

 

I expect the shuffle to happen only once, with everything after it being
local operations, but the UI shows otherwise. Do I have a misunderstanding
here?

 





Re: Which OutputCommitter to use for S3?

2015-02-23 Thread Darin McBeath
Aaron.  Thanks for the class. Since I'm currently writing Java based Spark 
applications, I tried converting your class to Java (it seemed pretty 
straightforward). 

I set up the use of the class as follows:

SparkConf conf = new SparkConf()
.set("spark.hadoop.mapred.output.committer.class", 
"com.elsevier.common.DirectOutputCommitter");

And I then try and save a file to S3 (which I believe should use the old hadoop 
apis).

JavaPairRDD newBaselineRDDWritable = 
reducedhsfPairRDD.mapToPair(new ConvertToWritableTypes());
newBaselineRDDWritable.saveAsHadoopFile(baselineOutputBucketFile, Text.class, 
Text.class, SequenceFileOutputFormat.class, 
org.apache.hadoop.io.compress.GzipCodec.class);

But, I get the following error message.

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class 
org.apache.hadoop.mapred.JobContext, but interface was expected
at 
com.elsevier.common.DirectOutputCommitter.commitJob(DirectOutputCommitter.java:68)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:127)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1075)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:940)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:902)
at org.apache.spark.api.java.JavaPairRDD.saveAsHadoopFile(JavaPairRDD.scala:771)
at com.elsevier.spark.SparkSyncDedup.main(SparkSyncDedup.java:156)

In my class, JobContext is an interface of  type 
org.apache.hadoop.mapred.JobContext.

Is there something obvious that I might be doing wrong (or messed up in the 
translation from Scala to Java) or something I should look into?  I'm using 
Spark 1.2 with hadoop 2.4.


Thanks.

Darin.





From: Aaron Davidson 
To: Andrew Ash  
Cc: Josh Rosen ; Mingyu Kim ; 
"user@spark.apache.org" ; Aaron Davidson 
 
Sent: Saturday, February 21, 2015 7:01 PM
Subject: Re: Which OutputCommitter to use for S3?



Here is the class: https://gist.github.com/aarondav/c513916e72101bbe14ec

You can use it by setting "mapred.output.committer.class" in the Hadoop 
configuration (or "spark.hadoop.mapred.output.committer.class" in the Spark 
configuration). Note that this only works for the old Hadoop APIs, I believe 
the new Hadoop APIs strongly tie committer to input format (so FileInputFormat 
always uses FileOutputCommitter), which makes this fix more difficult to apply.
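
For reference, a minimal sketch of such a committer against the old (mapred)
API; this is an illustration based on the descriptions in this thread, not
the contents of the gist above:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobContext, OutputCommitter, TaskAttemptContext}

class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = ()
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
  // Tasks write straight to the final location, so there is nothing to move.
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = ()
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()

  override def commitJob(jobContext: JobContext): Unit = {
    // Write an empty _SUCCESS marker, as FileOutputCommitter would.
    val conf = jobContext.getJobConf
    val outputPath = FileOutputFormat.getOutputPath(conf)
    if (outputPath != null) {
      val fs = outputPath.getFileSystem(conf)
      fs.create(new Path(outputPath, "_SUCCESS")).close()
    }
  }
}
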




On Sat, Feb 21, 2015 at 12:12 PM, Andrew Ash  wrote:

Josh is that class something you guys would consider open sourcing, or would 
you rather the community step up and create an OutputCommitter implementation 
optimized for S3?
>
>
>On Fri, Feb 20, 2015 at 4:02 PM, Josh Rosen  wrote:
>
>We (Databricks) use our own DirectOutputCommitter implementation, which is a 
>couple tens of lines of Scala code.  The class would almost entirely be a 
>no-op except we took some care to properly handle the _SUCCESS file.
>>
>>
>>On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim  wrote:
>>
>>I didn’t get any response. It’d be really appreciated if anyone using a 
>>special OutputCommitter for S3 can comment on this!
>>>
>>>
>>>Thanks,
>>>Mingyu
>>>
>>>
>>>From: Mingyu Kim 
>>>Date: Monday, February 16, 2015 at 1:15 AM
>>>To: "user@spark.apache.org" 
>>>Subject: Which OutputCommitter to use for S3?
>>>
>>>
>>>
>>>Hi all,
>>>
>>>
>>>The default OutputCommitter used by RDD, which is FileOutputCommitter, seems 
>>>to require moving files at the commit step, which is not a constant 
>>>operation in S3, as discussed in 
>>>http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
>>> People seem to develop their own NullOutputCommitter implementation or use 
>>>DirectFileOutputCommitter (as mentioned in SPARK-3595), but I wanted to 
>>>check if there is a de facto standard, publicly available OutputCommitter to 
>>>use for S3 in conjunction with Spark.
>>>
>>>
>>>Thanks,
>>>Mingyu
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Query data in Spark RRD

2015-02-23 Thread Tathagata Das
You could build a REST API, but you may have issues if you want to return
arbitrary binary data. A more complex but robust alternative is to use an
RPC library like Akka, Thrift, etc.

TD
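
A bare-bones illustration of the REST option using the JDK's built-in HTTP
server (assumptions: the streaming job registers each batch under a temp
table that "sqlContext" can query on the driver; no auth, error handling, or
threading concerns are addressed, so treat it as a sketch only):

import java.net.InetSocketAddress
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import scala.io.Source

val server = HttpServer.create(new InetSocketAddress(8090), 0)
server.createContext("/query", new HttpHandler {
  def handle(exchange: HttpExchange): Unit = {
    // The request body carries the SQL text sent from the dashboard.
    val sql = Source.fromInputStream(exchange.getRequestBody).mkString
    val bytes = sqlContext.sql(sql).collect().mkString("\n").getBytes("UTF-8")
    exchange.sendResponseHeaders(200, bytes.length)
    exchange.getResponseBody.write(bytes)
    exchange.close()
  }
})
server.start()
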

On Mon, Feb 23, 2015 at 12:45 AM, Nikhil Bafna 
wrote:

>
> Tathagata - Yes, I'm thinking on that line.
>
> The problem is how to send the query to the backend? Bundle an HTTP
> server into a Spark Streaming job that will accept the parameters?
>
> --
> Nikhil Bafna
>
> On Mon, Feb 23, 2015 at 2:04 PM, Tathagata Das 
> wrote:
>
>> You will have a build a split infrastructure - a front end that takes the
>> queries from the UI and sends them to the backend, and the backend (running
>> the Spark Streaming app) will actually run the queries on table created in
>> the contexts. The RPCs necessary between the frontend and backend will need
>> to be implemented by you.
>>
>> On Sat, Feb 21, 2015 at 11:57 PM, Nikhil Bafna > > wrote:
>>
>>>
>>> Yes. As I understand it, it would allow me to write SQL to query a
>>> Spark context. But the query needs to be specified within a job & deployed.
>>>
>>> What I want is to be able to run multiple dynamic queries specified at
>>> runtime from a dashboard.
>>>
>>>
>>>
>>> --
>>> Nikhil Bafna
>>>
>>> On Sat, Feb 21, 2015 at 8:37 PM, Ted Yu  wrote:
>>>
 Have you looked at
 http://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.sql.SchemaRDD
 ?

 Cheers

 On Sat, Feb 21, 2015 at 4:24 AM, Nikhil Bafna <
 nikhil.ba...@flipkart.com> wrote:

>
> Hi.
>
> My use case is building a realtime monitoring system over
> multi-dimensional data.
>
> The way I'm planning to go about it is to use Spark Streaming to store
> aggregated count over all dimensions in 10 sec interval.
>
> Then, from a dashboard, I would be able to specify a query over some
> dimensions, which will need re-aggregation from the already computed job.
>
> My query is, how can I run dynamic queries over data in schema RDDs?
>
> --
> Nikhil Bafna
>


>>>
>>
>


Re: How to print more lines in spark-shell

2015-02-23 Thread Mark Hamstra
Yes, if you're willing to add an explicit foreach(println), then that is
the simplest solution.  Else changing maxPrintString should modify the
default output of the Scala/Spark REPL.
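
Per the linked answer, something like this in the REPL (the exact plumbing
can differ between Scala versions, so treat it as a sketch):

scala> :power
scala> vals.isettings.maxPrintString = 10000
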

On Mon, Feb 23, 2015 at 11:25 AM, Sean Owen  wrote:

> I'd imagine that myRDD.take(10).foreach(println) is the most
> straightforward thing but yeah you can probably change shell default
> behavior too.
>
> On Mon, Feb 23, 2015 at 7:15 PM, Mark Hamstra 
> wrote:
> > That will produce very different output than just the 10 items that Manas
> > wants.
> >
> > This is essentially a Scala shell issue, so this should apply:
> >
> http://stackoverflow.com/questions/9516567/settings-maxprintstring-for-scala-2-9-repl
> >
> > On Mon, Feb 23, 2015 at 10:25 AM, Akhil Das 
> > wrote:
> >>
> >> You can do it like myRDD.foreach(println(_)) to print everything.
> >>
> >> Thanks
> >> Best Regards
> >>
> >> On Mon, Feb 23, 2015 at 11:49 PM, Manas Kar  >
> >> wrote:
> >>>
> >>> Hi experts,
> >>>  I am using Spark 1.2 from CDH5.3.
> >>>  When I issue commands like
> >>>  myRDD.take(10) the result gets truncated after 4-5 records.
> >>>
> >>> Is there a way to configure the same to show more items?
> >>>
> >>> ..Manas
> >>
> >>
> >
>


Re: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-02-23 Thread Tathagata Das
There are different kinds of checkpointing going on. updateStateByKey
requires RDD checkpointing which can be enabled only by called
sparkContext.setCheckpointDirectory. But that does not enable Spark
Streaming driver checkpoints, which is necessary for recovering from driver
failures. That is enabled only by streamingContext.checkpoint(...) which
internally calls sparkContext.setCheckpointDirectory and also enables other
stuff.

TD
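
Concretely (paths are placeholders):

// RDD checkpointing only, which is enough for updateStateByKey:
ssc.sparkContext.setCheckpointDir("hdfs:///tmp/ckpt")

// Streaming checkpoints (metadata plus RDDs), needed for driver recovery,
// but requiring the whole DStream graph to be serializable:
ssc.checkpoint("hdfs:///tmp/ckpt")
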

On Mon, Feb 23, 2015 at 1:28 AM, Tobias Pfeiffer  wrote:

> Sean,
>
> thanks for your message!
>
> On Mon, Feb 23, 2015 at 6:03 PM, Sean Owen  wrote:
>>
>> What I haven't investigated is whether you can enable checkpointing
>> for the state in updateStateByKey separately from this mechanism,
>> which is exactly your question. What happens if you set a checkpoint
>> dir, but do *not* use StreamingContext.getOrCreate, but *do* call
>> DStream.checkpoint?
>>
>
> I didn't even use StreamingContext.getOrCreate(), just calling
> streamingContext.checkpoint(...) blew everything up. Well, "blew up" in the
> sense that actor.OneForOneStrategy will print the stack trace of
> the java.io.NotSerializableException every couple of seconds and
> "something" is not going right with execution (I think).
>
> BUT, indeed, just calling sparkContext.setCheckpointDir seems to be
> sufficient for updateStateByKey! Looking at what
> streamingContext.checkpoint() does, I don't get why ;-) and I am not sure
> that this is a robust solution, but in fact that seems to work!
>
> Thanks a lot,
> Tobias
>
>


Re: Access time to an elemnt in cached RDD

2015-02-23 Thread Sean Owen
It may involve access an element of an RDD from a remote machine and
copying it back to the driver. That and the small overhead of job
scheduling could be a millisecond.

You're comparing to just reading an entry from memory, which is of
course faster.

I don't think you should think of an RDD as something you query at
scale in real-time. It's not a NoSQL store.

On Mon, Feb 23, 2015 at 6:02 PM, shahab  wrote:
> Hi,
>
> I just wonder what would be the access time to "take" one element from a
> cached RDD? If I have understood correctly, access to RDD elements is not as
> fast as accessing e.g. a HashMap, and it could take up to milliseconds
> compared to nanoseconds in a HashMap, which is quite a significant difference
> if you plan for near real-time responses from Spark?!
>
> best,
>
> /Shahab
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to print more lines in spark-shell

2015-02-23 Thread Sean Owen
I'd imagine that myRDD.take(10).foreach(println) is the most
straightforward thing but yeah you can probably change shell default
behavior too.

On Mon, Feb 23, 2015 at 7:15 PM, Mark Hamstra  wrote:
> That will produce very different output than just the 10 items that Manas
> wants.
>
> This is essentially a Scala shell issue, so this should apply:
> http://stackoverflow.com/questions/9516567/settings-maxprintstring-for-scala-2-9-repl
>
> On Mon, Feb 23, 2015 at 10:25 AM, Akhil Das 
> wrote:
>>
>> You can do it like myRDD.foreach(println(_)) to print everything.
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Feb 23, 2015 at 11:49 PM, Manas Kar 
>> wrote:
>>>
>>> Hi experts,
>>>  I am using Spark 1.2 from CDH5.3.
>>>  When I issue commands like
>>>  myRDD.take(10) the result gets truncated after 4-5 records.
>>>
>>> Is there a way to configure the same to show more items?
>>>
>>> ..Manas
>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to print more lines in spark-shell

2015-02-23 Thread Mark Hamstra
That will produce very different output than just the 10 items that Manas
wants.

This is essentially a Scala shell issue, so this should apply:
http://stackoverflow.com/questions/9516567/settings-maxprintstring-for-scala-2-9-repl

On Mon, Feb 23, 2015 at 10:25 AM, Akhil Das 
wrote:

> You can do it like myRDD.foreach(println(_)) to print everything.
>
> Thanks
> Best Regards
>
> On Mon, Feb 23, 2015 at 11:49 PM, Manas Kar 
> wrote:
>
>> Hi experts,
>>  I am using Spark 1.2 from CDH5.3.
>>  When I issue commands like
>>  myRDD.take(10) the result gets truncated after 4-5 records.
>>
>> Is there a way to configure the same to show more items?
>>
>> ..Manas
>>
>
>


Re: RDD groupBy

2015-02-23 Thread Vijayasarathy Kannan
You are right. I was looking at the wrong logs. I ran it on my local
machine and saw that the println actually wrote the vertexIds. I was then
able to find the same in the executors' logs in the remote machine.

Thanks for the clarification.

On Mon, Feb 23, 2015 at 2:00 PM, Sean Owen  wrote:

> Here, println isn't happening on the driver. Are you sure you are
> looking at the right machine's logs?
>
> Yes this may be parallelized over many machines.
>
> On Mon, Feb 23, 2015 at 6:37 PM, kvvt  wrote:
> > In the snippet below,
> >
> > graph.edges.groupBy[VertexId](f1).foreach {
> >   edgesBySrc => {
> > f2(edgesBySrc).foreach {
> >   vertexId => {
> > *println(vertexId)*
> >   }
> > }
> >   }
> > }
> >
> > "f1" is a function that determines how to group the edges (in my case it
> > groups by source vertex)
> > "f2" is another function that does some computation on the edges. It
> returns
> > an iterable (Iterable[VertexId]).
> >
> > *Questions:*
> >
> > 1. The problem is that "println(vertexId)" doesn't print anything. I have
> > made sure that "f2" doesn't return an empty iterable. I am not sure what
> > I am missing here.
> >
> > 2. I am assuming that "f2" is called for each group in parallel. Is this
> > correct? If not, what is the correct way to operate on each group in
> > parallel?
> >
> >
> > Thanks!
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-groupBy-tp21773.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: RDD groupBy

2015-02-23 Thread Sean Owen
Here, println isn't happening on the driver. Are you sure you are
looking at the right machine's logs?

Yes this may be parallelized over many machines.
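
For example, to see the values on the driver instead of in the executors'
logs, bring the results back first (a sketch based on the snippet quoted
below):

graph.edges.groupBy[VertexId](f1)
  .flatMap(edgesBySrc => f2(edgesBySrc))
  .collect()
  .foreach(println)
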

On Mon, Feb 23, 2015 at 6:37 PM, kvvt  wrote:
> In the snippet below,
>
> graph.edges.groupBy[VertexId](f1).foreach {
>   edgesBySrc => {
> f2(edgesBySrc).foreach {
>   vertexId => {
> *println(vertexId)*
>   }
> }
>   }
> }
>
> "f1" is a function that determines how to group the edges (in my case it
> groups by source vertex)
> "f2" is another function that does some computation on the edges. It returns
> an iterable (Iterable[VertexId]).
>
> *Questions:*
>
> 1. The problem is that "println(vertexId)" doesn't print anything. I have
> made sure that "f2" doesn't return an empty iterable. I am not sure what I
> am missing here.
>
> 2. I am assuming that "f2" is called for each group in parallel. Is this
> correct? If not, what is the correct way to operate on each group in
> parallel?
>
>
> Thanks!
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-groupBy-tp21773.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Requested array size exceeds VM limit

2015-02-23 Thread Sean Owen
It doesn't mean 'out of memory'; it means 'you can't allocate a byte[]
over 2GB in the JVM'. Something is serializing a huge block somewhere.
there are a number of related JIRAs and discussions on JIRA and this
mailing list; have a browse of those first for back story.

On Mon, Feb 23, 2015 at 6:44 PM, insperatum  wrote:
> Hi, I'm using MLLib to train a random forest. It's working fine to depth 15,
> but if I use depth 20 I get a java.lang.OutOfMemoryError: Requested array
> size exceeds VM limit on the driver, from the collectAsMap operation in
> DecisionTree.scala, around line 642. It doesn't happen until a good hour
> into training. I'm using 50 trees on 36 slaves with maxMemoryInMB=250, but
> still get an error even if I use a driver memory of 240G. Has anybody seen
> this error in this context before, and can advise on what might be
> triggering it? Best, Luke
> 
> View this message in context: Requested array size exceeds VM limit
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Requested array size exceeds VM limit

2015-02-23 Thread insperatum
Hi,

I'm using MLlib to train a random forest. It's working fine to depth 15, but
if I use depth 20 I get a java.lang.OutOfMemoryError: Requested array size
exceeds VM limit on the driver, from the collectAsMap operation in
DecisionTree.scala, around line 642. It doesn't happen until a good hour into
training. I'm using 50 trees on 36 slaves with maxMemoryInMB=250, but still
get an error even if I use a driver memory of 240G. Has anybody seen this
error in this context before, and can advise on what might be triggering it?

Best,
Luke



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Requested-array-size-exceeds-VM-limit-tp21774.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RDD groupBy

2015-02-23 Thread kvvt
In the snippet below,

graph.edges.groupBy[VertexId](f1).foreach {
  edgesBySrc => {
f2(edgesBySrc).foreach {
  vertexId => {
*println(vertexId)*
  }
}
  }
}

"f1" is a function that determines how to group the edges (in my case it
groups by source vertex)
"f2" is another function that does some computation on the edges. It returns
an iterable (Iterable[VertexId]).

*Questions:*

1. The problem is that "println(vertexId)" doesn't print anything. I have
made sure that "f2" doesn't return an empty iterable. I am not sure what I
am missing here.

2. I am assuming that "f2" is called for each group in parallel. Is this
correct? If not, what is the correct way to operate on each group in
parallel?


Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-groupBy-tp21773.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to print more lines in spark-shell

2015-02-23 Thread Akhil Das
You can do it like myRDD.foreach(println(_)) to print everything.

Thanks
Best Regards

On Mon, Feb 23, 2015 at 11:49 PM, Manas Kar 
wrote:

> Hi experts,
>  I am using Spark 1.2 from CDH5.3.
>  When I issue commands like
>  myRDD.take(10) the result gets truncated after 4-5 records.
>
> Is there a way to configure the same to show more items?
>
> ..Manas
>


Re: How to integrate HBASE on Spark

2015-02-23 Thread sandeep vura
Hi Deepak,

Thanks for posting the link. Looks like it supports only Cloudera
distributions, as per the GitHub page.

We are using an Apache Hadoop multi-node cluster, not a Cloudera
distribution. Please confirm whether I can use it on an Apache Hadoop
cluster.

Regards,
Sandeep.v

On Mon, Feb 23, 2015 at 10:53 PM, Deepak Vohra 
wrote:

> Or, use the SparkOnHBase lab.
> http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/
>
>  --
>  *From:* Ted Yu 
> *To:* Akhil Das 
> *Cc:* sandeep vura ; "user@spark.apache.org" <
> user@spark.apache.org>
> *Sent:* Monday, February 23, 2015 8:52 AM
> *Subject:* Re: How to integrate HBASE on Spark
>
> Installing hbase on hadoop cluster would allow hbase to utilize features
> provided by hdfs, such as short circuit read (See '90.2. Leveraging local
> data' under http://hbase.apache.org/book.html#perf.hdfs).
>
> Cheers
>
> On Sun, Feb 22, 2015 at 11:38 PM, Akhil Das 
> wrote:
>
> If you are having both the clusters on the same network, then i'd suggest
> you installing it on the hadoop cluster. If you install it on the spark
> cluster itself, then hbase might take up a few cpu cycles and there's a
> chance for the job to lag.
>
> Thanks
> Best Regards
>
> On Mon, Feb 23, 2015 at 12:48 PM, sandeep vura 
> wrote:
>
> Hi
>
> I had installed spark on 3 node cluster. Spark services are up and
> running.But i want to integrate hbase on spark
>
> Do i need to install HBASE on hadoop cluster or spark cluster.
>
> Please let me know asap.
>
> Regards,
> Sandeep.v
>
>
>
>
>
>


RE: FW: Submitting jobs to Spark EC2 cluster remotely

2015-02-23 Thread Oleg Shirokikh
Dear Patrick,

Thanks a lot again for your help.

> What happens if you submit from the master node itself on ec2 (in client 
> mode), does that work? What about in cluster mode?

If I SSH to the machine with Spark master, then everything works - shell, and 
regular submit in both client and cluster mode (after rsyncing the same jar I'm 
using for remote submission). Below is the output when I deploy in cluster mode 
from master machine itself:

//**//
root@ip-172-31-34-83 spark]$ ./bin/spark-submit --class SparkPi --master 
spark://ec2-52-10-138-75.us-west-2.compute.amazonaws.com:7077 
--deploy-mode=cluster 
/root/spark/sparktest/target/scala-2.10/ec2test_2.10-0.0.1.jar 100
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Sending launch command to 
spark://ec2-52-10-138-75.us-west-2.compute.amazonaws.com:7077
Driver successfully submitted as driver-20150223174819-0008
... waiting before polling master for driver state
... polling master for driver state
State of driver-20150223174819-0008 is RUNNING
Driver running on ip-172-31-33-194.us-west-2.compute.internal:56183 
(worker-20150223171519-ip-172-31-33-194.us-west-2.compute.internal-56183)
//**//

Observation: when I submit the job from remote host (and all these warnings 
[..initial job has not accepted any resources...] and errors [..asked to remove 
non-existent executor..] start appearing) and leave it running, I 
simultaneously try to submit a job (or run a shell) from an EC2 node with 
master itself. In this scenario it starts to produce similar warnings (not 
errors) [..initial job has not accepted any resources...] and doesn't execute 
the job. Probably there are not enough cores devoted to 2 apps running 
simultaneously.


>It would be helpful if you could print the full command that the executor is 
>failing. That might show that spark.driver.host is being set strangely. IIRC 
>we print the launch command before starting the executor.

I'd be very happy to provide this command but I'm not sure where to find it... 
When I launch the submit script, I immediately see [WARN 
TaskSchedulerImpl:...]s and [ERROR SparkDeploySchedulerBackend]s in the 
terminal output.

In Master Web UI, I have this application running indefinitely (listed in 
"Running APplications" with State=RUNNING). When I go into this app UI I see 
tons of Executors listed in "Executor Summary" - at each moment two of them are 
RUNNING (I have two workers) and all others EXITED.

Here is stderr from one of the RUNNING ones:

/***/
15/02/23 18:11:49 INFO executor.CoarseGrainedExecutorBackend: Registered signal 
handlers for [TERM, HUP, INT]
15/02/23 18:11:49 INFO spark.SecurityManager: Changing view acls to: root,oleg
15/02/23 18:11:49 INFO spark.SecurityManager: Changing modify acls to: root,oleg
15/02/23 18:11:49 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root, oleg); users 
with modify permissions: Set(root, oleg)
15/02/23 18:11:49 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/02/23 18:11:50 INFO Remoting: Starting remoting
15/02/23 18:11:50 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://driverpropsfetc...@ip-172-31-33-195.us-west-2.compute.internal:57681]
15/02/23 18:11:50 INFO util.Utils: Successfully started service 
'driverPropsFetcher' on port 57681.
/*/

Here is stderr from one of the EXITED ones:

/***/
15/02/23 18:10:09 INFO executor.CoarseGrainedExecutorBackend: Registered signal 
handlers for [TERM, HUP, INT]
15/02/23 18:10:10 INFO spark.SecurityManager: Changing view acls to: root,oleg
15/02/23 18:10:10 INFO spark.SecurityManager: Changing modify acls to: root,oleg
15/02/23 18:10:10 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root, oleg); users 
with modify permissions: Set(root, oleg)
15/02/23 18:10:10 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/02/23 18:10:10 INFO Remoting: Starting remoting
15/02/23 18:10:10 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://driverpropsfetc...@ip-172-31-33-194.us-west-2.compute.internal:42607]
15/02/23 18:10:10 INFO util.Utils: Successfully started service 
'driverPropsFetcher' on port 42607.
15/02/23 18:10:40 ERROR security.UserGroupInformation: 
PriviledgedActionException as:oleg cause:java.util.concurrent.TimeoutException: 
Futures timed out after [30 seconds]
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException: 
Unknown exception in doAs
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1134)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(C

How to print more lines in spark-shell

2015-02-23 Thread Manas Kar
Hi experts,
 I am using Spark 1.2 from CDH5.3.
 When I issue commands like
 myRDD.take(10) the result gets truncated after 4-5 records.

Is there a way to configure the same to show more items?

..Manas


Access time to an elemnt in cached RDD

2015-02-23 Thread shahab
Hi,

I just wonder what would be the access time to "take" one element from a
cached RDD? If I have understood correctly, access to RDD elements is not
as fast as accessing e.g. a HashMap, and it could take up to milliseconds
compared to nanoseconds in a HashMap, which is quite a significant
difference if you plan for near real-time responses from Spark?!

best,

/Shahab


Re: Repartition and Worker Instances

2015-02-23 Thread Sameer Farooqui
In Standalone mode, a Worker JVM starts an Executor. Inside the Exec there
are slots for task threads. The slot count is configured by the num_cores
setting. Generally oversubscribe this. So if you have 10 free CPU cores,
set num_cores to 20.
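
A rough way to line the two up (sc.defaultParallelism reports the total core
count in standalone mode; treat the 2x as a starting heuristic, not a rule):

val slots = sc.defaultParallelism
val repartitioned = rdd.repartition(slots * 2)
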

On Monday, February 23, 2015, Deep Pradhan 
wrote:

> How is task slot different from # of Workers?
>
>
> >> so don't read into any performance metrics you've collected to
> extrapolate what may happen at scale.
> I did not get you in this.
>
> Thank You
>
> On Mon, Feb 23, 2015 at 10:52 PM, Sameer Farooqui  > wrote:
>
>> In general you should first figure out how many task slots are in the
>> cluster and then repartition the RDD to maybe 2x that #. So if you have
>> 100 slots, then maybe RDDs with partition count of 100-300 would be normal.
>>
>> But also size of each partition can matter. You want a task to operate on
>> a partition for at least 200ms, but no longer than around 20 seconds.
>>
>> Even if you have 100 slots, it could be okay to have a RDD with 10,000
>> partitions if you've read in a large file.
>>
>> So don't repartition your RDD to match the # of Worker JVMs, but rather
>> align it to the total # of task slots in the Executors.
>>
>> If you're running on a single node, shuffle operations become almost free
>> (because there's no network movement), so don't read into any
>> performance metrics you've collected to extrapolate what may happen at
>> scale.
>>
>>
>> On Monday, February 23, 2015, Deep Pradhan > > wrote:
>>
>>> Hi,
>>> If I repartition my data by a factor equal to the number of worker
>>> instances, will the performance be better or worse?
>>> As far as I understand, the performance should be better, but in my case
>>> it is becoming worse.
>>> I have a single node standalone cluster, is it because of this?
>>> Am I guaranteed to have a better performance if I do the same thing in a
>>> multi-node cluster?
>>>
>>> Thank You
>>>
>>
>
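
A minimal sketch of the sizing rule described above, assuming a SparkContext named sc is available; the executor and core counts here are illustrative assumptions, not values from this thread:

  // Rule of thumb from the discussion: repartition to roughly 2x the total
  // number of task slots in the cluster, not to the number of Worker JVMs.
  val numExecutors = 5           // assumed cluster size
  val slotsPerExecutor = 20      // assumed task slots (cores) per Executor
  val totalSlots = numExecutors * slotsPerExecutor

  val data = sc.textFile("hdfs:///path/to/input")  // hypothetical path
  val sized = data.repartition(2 * totalSlots)     // ~200 partitions here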


Re: Repartition and Worker Instances

2015-02-23 Thread Deep Pradhan
How is a task slot different from the # of Workers?


>> so don't read into any performance metrics you've collected to
extrapolate what may happen at scale.
I did not get you in this.

Thank You

On Mon, Feb 23, 2015 at 10:52 PM, Sameer Farooqui 
wrote:

> In general you should first figure out how many task slots are in the
> cluster and then repartition the RDD to maybe 2x that #. So if you have
> 100 slots, then maybe RDDs with a partition count of 100-300 would be normal.
>
> But also size of each partition can matter. You want a task to operate on
> a partition for at least 200ms, but no longer than around 20 seconds.
>
> Even if you have 100 slots, it could be okay to have a RDD with 10,000
> partitions if you've read in a large file.
>
> So don't repartition your RDD to match the # of Worker JVMs, but rather
> align it to the total # of task slots in the Executors.
>
> If you're running on a single node, shuffle operations become almost free
> (because there's no network movement), so don't read into any performance
> metrics you've collected to extrapolate what may happen at scale.
>
>
> On Monday, February 23, 2015, Deep Pradhan 
> wrote:
>
>> Hi,
>> If I repartition my data by a factor equal to the number of worker
>> instances, will the performance be better or worse?
>> As far as I understand, the performance should be better, but in my case
>> it is becoming worse.
>> I have a single node standalone cluster, is it because of this?
>> Am I guaranteed to have a better performance if I do the same thing in a
>> multi-node cluster?
>>
>> Thank You
>>
>


Re: Running Example Spark Program

2015-02-23 Thread Deepak Vohra
The Spark cluster has no memory allocated.
Memory: 0.0 B Total, 0.0 B Used
From: Surendran Duraisamy <2013ht12...@wilp.bits-pilani.ac.in>
To: user@spark.apache.org
Sent: Sunday, February 22, 2015 6:00 AM
Subject: Running Example Spark Program

Hello All,
 
I am new to Apache Spark, and I am trying to run JavaKMeans.java from the
Spark examples on my Ubuntu system.
 
 I downloaded spark-1.2.1-bin-hadoop2.4.tgz and started sbin/start-master.sh
 
After starting Spark, I accessed http://localhost:8080/ to look at the status
of my Spark instance, and it shows the following:

   - URL: spark://vm:7077
   - Workers: 0
   - Cores: 0 Total, 0 Used
   - Memory: 0.0 B Total, 0.0 B Used
   - Applications: 0 Running, 4 Completed
   - Drivers: 0 Running, 0 Completed
   - Status: ALIVE
The number of cores is 0 and the memory is 0.0 B. I think because of this I am
getting the following error when I try to run JavaKMeans.java:
  "Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient memory"
  Am I missing any configuration before running sbin/start-master.sh?
Regards,
Surendran
 
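One likely cause, offered here as an assumption since it goes beyond the memory
observation above: the UI reports zero Workers, so only the master was started.
With the standalone scripts in the same download, starting a worker against the
running master, for example by listing localhost in conf/slaves and running
sbin/start-slaves.sh, or by launching
bin/spark-class org.apache.spark.deploy.worker.Worker spark://vm:7077 directly,
should make cores and memory appear in the UI so the job can be scheduled.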

Re: Force RDD evaluation

2015-02-23 Thread Nicholas Pritchard
Thanks, Sean! Yes, I agree that this logging would still have some cost and
so would not be used in production.

On Sat, Feb 21, 2015 at 1:37 AM, Sean Owen  wrote:

> I think the cheapest possible way to force materialization is something
> like
>
> rdd.foreachPartition(i => None)
>
> I get the use case, but as you can see there is a cost: you are forced
> to materialize an RDD and cache it just to measure the computation
> time. In principle this could be taking significantly more time than
> not doing so, since otherwise several RDD stages might proceed without
> ever even having to persist intermediate results in memory.
>
> Consider looking at the Spark UI to see how much time a stage took,
> although it's measuring end to end wall clock time, which may overlap
> with other computations.
>
> (or maybe you are disabling / enabling this logging for prod / test anyway)
>
> On Sat, Feb 21, 2015 at 4:46 AM, pnpritchard
>  wrote:
> > Is there a technique for forcing the evaluation of an RDD?
> >
> > I have used actions to do so but even the most basic "count" has a
> > non-negligible cost (even on a cached RDD, repeated calls to count take
> > time).
> >
> > My use case is for logging the execution time of the major components in
> my
> > application. At the end of each component I have a statement like
> > "rdd.cache().count()" and time how long it takes.
> >
> > Thanks in advance for any advice!
> > Nick
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Force-RDD-evaluation-tp21748.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
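
A minimal sketch of the timing pattern discussed in this thread, assuming a SparkContext named sc is available (as in spark-shell) and using a trivial stand-in computation:

  // cache() marks the RDD for persistence; the no-op foreachPartition then
  // forces every partition to be computed and cached, without the extra
  // counting pass that count() performs.
  val rdd = sc.parallelize(1 to 1000000).map(_ * 2).cache()

  val start = System.nanoTime()
  rdd.foreachPartition(i => ())  // cheapest full materialization, as suggested above
  val elapsedMs = (System.nanoTime() - start) / 1000000
  println(s"component took $elapsedMs ms")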

