RE: Spark on YARN

2015-07-30 Thread Shao, Saisai
You’d better also check the NodeManager log; sometimes the container is killed because your 
memory usage exceeds the limit configured for the YARN container.

I’ve met a similar problem before; here is the warning log from the NodeManager:

2015-07-07 17:06:07,141 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=17385,containerID=container_1436259427993_0001_02_01] is 
running beyond virtual memory limits. Current usage: 318.1 MB of 1 GB physical 
memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.

The default pmem-to-vmem ratio is 2.1, but it seems the executor requires more virtual memory 
when it starts, so the NodeManager kills it. If you hit a similar problem, you could 
increase the configuration “yarn.nodemanager.vmem-pmem-ratio”.
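
For example, a minimal yarn-site.xml entry on the NodeManager hosts might look like the following (the value 4 is only illustrative; another common workaround is relaxing the check via yarn.nodemanager.vmem-check-enabled):

<property>
  <name>yarn.nodemanager.vmem-pmem-ratio</name>
  <value>4</value>
</property>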

Thanks
Jerry

From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Thursday, July 30, 2015 4:36 PM
To: Jeetendra Gangele
Cc: user
Subject: Re: Spark on YARN

 15/07/30 12:13:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM

The AM was killed somehow, maybe due to preemption. Does it always happen? The ResourceManager 
log would be helpful.



On Thu, Jul 30, 2015 at 4:17 PM, Jeetendra Gangele 
gangele...@gmail.com wrote:
I can't see the application logs here. All the logs are going into stderr. Can 
anybody help here?

On 30 July 2015 at 12:21, Jeetendra Gangele 
gangele...@gmail.com wrote:
I am running the command below (the default Spark Pi example), but it does not seem to run: 
all the logs go to stderr, yet at the terminal the job is reported as succeeding. 
I guess there is some issue and the job is not actually launching.

/bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
yarn-cluster lib/spark-examples-1.4.1-hadoop2.6.0.jar 10


Complete log


SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in 
[jar:file:/home/hadoop/tmp/nm-local-dir/usercache/hadoop/filecache/23/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in 
[jar:file:/opt/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

15/07/30 12:13:31 INFO yarn.ApplicationMaster: Registered signal handlers for 
[TERM, HUP, INT]

15/07/30 12:13:32 INFO yarn.ApplicationMaster: ApplicationAttemptId: 
appattempt_1438090734187_0010_01

15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop

15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop

15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
with modify permissions: Set(hadoop)

15/07/30 12:13:33 INFO yarn.ApplicationMaster: Starting the user application in 
a separate Thread

15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context 
initialization

15/07/30 12:13:33 INFO yarn.ApplicationMaster: Waiting for spark context 
initialization ...

15/07/30 12:13:33 INFO spark.SparkContext: Running Spark version 1.4.1

15/07/30 12:13:33 WARN spark.SparkConf:

SPARK_JAVA_OPTS was detected (set to '-Dspark.driver.port=53411').

This is deprecated in Spark 1.0+.



Please instead use:

 - ./spark-submit with conf/spark-defaults.conf to set defaults for an 
application

 - ./spark-submit with --driver-java-options to set -X options for a driver

 - spark.executor.extraJavaOptions to set -X options for executors

 - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or 
worker)



15/07/30 12:13:33 WARN spark.SparkConf: Setting 
'spark.executor.extraJavaOptions' to '-Dspark.driver.port=53411' as a 
work-around.

15/07/30 12:13:33 WARN spark.SparkConf: Setting 'spark.driver.extraJavaOptions' 
to '-Dspark.driver.port=53411' as a work-around.

15/07/30 12:13:33 INFO spark.SecurityManager: Changing view acls to: hadoop

15/07/30 12:13:33 INFO spark.SecurityManager: Changing modify acls to: hadoop

15/07/30 12:13:33 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
with modify permissions: Set(hadoop)

15/07/30 12:13:33 INFO slf4j.Slf4jLogger: Slf4jLogger started

15/07/30 12:13:33 INFO Remoting: Starting remoting

15/07/30 12:13:34 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriver@10.21.1.77:53411]

15/07/30 12:13:34 INFO util.Utils: Successfully started service 'sparkDriver' 
on port 53411.

15/07/30 12:13:34 INFO spark.SparkEnv: Registering MapOutputTracker

15/07/30 12:13:34 INFO spark.SparkEnv: Registering BlockManagerMaster

15/07/30 12:13:34 INFO storage.DiskBlockManager: Created local directory at 

RE: kafka offset commit in spark streaming 1.2

2015-07-06 Thread Shao, Saisai
If you’re using the WAL with Kafka, Spark Streaming will ignore this 
configuration (autocommit.enable) and explicitly call commitOffset to update the 
offset to Kafka AFTER the WAL write is done.

No matter what you set autocommit.enable to, internally Spark 
Streaming will set it to false to turn off the autocommit mechanism.
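
For reference, a minimal sketch of enabling the receiver WAL (Spark 1.2+); the app name, checkpoint directory and batch interval below are placeholders, not values from this thread:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wal-example")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // turn on the receiver WAL
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")  // the WAL is written under the checkpoint directory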

Thanks
Jerry

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:11 PM
To: user
Subject: kafka offset commit in spark streaming 1.2

In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in 
ZooKeeper only after writing to the WAL (if WAL and checkpointing are enabled), or does 
it depend on the kafkaParams used while initializing the Kafka DStream?


Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");

kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
        byte[].class, kafka.serializer.DefaultDecoder.class,
        kafka.serializer.DefaultDecoder.class,
        kafkaParams, topicsMap,
        StorageLevel.MEMORY_ONLY()));


Here, since I have set autocommit.enable to true, will Spark Streaming 
ignore this and always call an explicit commitOffset on the high-level consumer 
connector, or does it depend on the parameter passed?

Because if it depends on the parameter and the receiver calls an explicit commit only when 
autocommit is false, then I should override the default autocommit from true to false 
while enabling the WAL, since it may produce duplicates on failure if 
the WAL is enabled and autocommit is true.


RE: kafka offset commit in spark streaming 1.2

2015-07-06 Thread Shao, Saisai
Please see the inline comments.

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:51 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

So if the WAL is disabled, how can a developer commit offsets explicitly in a Spark 
Streaming app, since we don't write the code that is executed in the receiver?

I think it is difficult for the user to commit offsets explicitly with the receiver-based 
Spark Streaming Kafka API.

If you want to commit offsets explicitly, you could try the Spark Streaming Kafka 
direct API, newly added in Spark 1.3, where you manage the 
offsets yourself; it is implemented on top of Kafka’s low-level API.
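
A rough sketch of the direct API with manual offset tracking, assuming ssc is an existing StreamingContext (the broker address and topic are placeholders, and where you persist the offsets is up to you):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("test"))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // process rdd, then save each (topic, partition, fromOffset, untilOffset)
  // to ZooKeeper or a database so you can restart from the saved offsets
}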

Plus, since offset committing is asynchronous, is it possible that the last offset has 
not been committed yet when the next stream batch starts on the receiver, so it 
may get duplicate data?

Yes, it is possible, so the receiver-based Spark Streaming Kafka API cannot 
guarantee no duplication and no data loss. If you enable the WAL, no data loss can 
be guaranteed, but you may still see duplication. So the best way is to use the 
Spark Streaming Kafka direct API with your own offset management to get 
exactly-once semantics.



On Mon, Jul 6, 2015 at 6:16 PM, Shao, Saisai 
saisai.s...@intel.com wrote:
If you disable the WAL, Spark Streaming itself will not manage anything offset-related. 
If auto commit is enabled, Kafka itself will update the offsets in 
a time-based way; if auto commit is disabled, nothing will call 
commitOffset and you need to call this API yourself.

Also, Kafka’s offset commit mechanism is actually timer-based, so it is 
asynchronous with respect to replication.

From: Shushant Arora 
[mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:30 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

And what if I disable the WAL and replicate the receiver data using 
StorageLevel.MEMORY_ONLY_2()? Will it commit the offset after replicating the 
message, or will it use the autocommit.enable value? And if it uses this value and 
autocommit.enable is set to false, when does the receiver call commitOffset?

On Mon, Jul 6, 2015 at 5:53 PM, Shao, Saisai 
saisai.s...@intel.com wrote:
If you’re using the WAL with Kafka, Spark Streaming will ignore this 
configuration (autocommit.enable) and explicitly call commitOffset to update the 
offset to Kafka AFTER the WAL write is done.

No matter what you set autocommit.enable to, internally Spark 
Streaming will set it to false to turn off the autocommit mechanism.

Thanks
Jerry

From: Shushant Arora 
[mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:11 PM
To: user
Subject: kafka offset commit in spark streaming 1.2

In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in 
ZooKeeper only after writing to the WAL (if WAL and checkpointing are enabled), or does 
it depend on the kafkaParams used while initializing the Kafka DStream?


Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");

kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
        byte[].class, kafka.serializer.DefaultDecoder.class,
        kafka.serializer.DefaultDecoder.class,
        kafkaParams, topicsMap,
        StorageLevel.MEMORY_ONLY()));


Here, since I have set autocommit.enable to true, will Spark Streaming 
ignore this and always call an explicit commitOffset on the high-level consumer 
connector, or does it depend on the parameter passed?

Because if it depends on the parameter and the receiver calls an explicit commit only when 
autocommit is false, then I should override the default autocommit from true to false 
while enabling the WAL, since it may produce duplicates on failure if 
the WAL is enabled and autocommit is true.




RE: kafka offset commit in spark streaming 1.2

2015-07-06 Thread Shao, Saisai
If you disable the WAL, Spark Streaming itself will not manage anything offset-related. 
If auto commit is enabled, Kafka itself will update the offsets in 
a time-based way; if auto commit is disabled, nothing will call 
commitOffset and you need to call this API yourself.

Also, Kafka’s offset commit mechanism is actually timer-based, so it is 
asynchronous with respect to replication.

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:30 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

And what if I disable the WAL and replicate the receiver data using 
StorageLevel.MEMORY_ONLY_2()? Will it commit the offset after replicating the 
message, or will it use the autocommit.enable value? And if it uses this value and 
autocommit.enable is set to false, when does the receiver call commitOffset?

On Mon, Jul 6, 2015 at 5:53 PM, Shao, Saisai 
saisai.s...@intel.com wrote:
If you’re using the WAL with Kafka, Spark Streaming will ignore this 
configuration (autocommit.enable) and explicitly call commitOffset to update the 
offset to Kafka AFTER the WAL write is done.

No matter what you set autocommit.enable to, internally Spark 
Streaming will set it to false to turn off the autocommit mechanism.

Thanks
Jerry

From: Shushant Arora 
[mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:11 PM
To: user
Subject: kafka offset commit in spark streaming 1.2

In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in 
ZooKeeper only after writing to the WAL (if WAL and checkpointing are enabled), or does 
it depend on the kafkaParams used while initializing the Kafka DStream?


Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");

kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
        byte[].class, kafka.serializer.DefaultDecoder.class,
        kafka.serializer.DefaultDecoder.class,
        kafkaParams, topicsMap,
        StorageLevel.MEMORY_ONLY()));


Here, since I have set autocommit.enable to true, will Spark Streaming 
ignore this and always call an explicit commitOffset on the high-level consumer 
connector, or does it depend on the parameter passed?

Because if it depends on the parameter and the receiver calls an explicit commit only when 
autocommit is false, then I should override the default autocommit from true to false 
while enabling the WAL, since it may produce duplicates on failure if 
the WAL is enabled and autocommit is true.



RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Shao, Saisai
The shuffle data can be deleted through the weak-reference mechanism; you could 
check the code of ContextCleaner. You could also trigger a full GC manually 
with JVisualVM or some other tool to see whether the shuffle files are deleted.
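
As an illustration only (the 30-minute interval is arbitrary), a full GC can also be triggered programmatically on the driver, so that unreferenced shuffle and broadcast metadata become weakly reachable and the ContextCleaner can remove the corresponding files:

import java.util.concurrent.{Executors, TimeUnit}

val gcExecutor = Executors.newSingleThreadScheduledExecutor()
gcExecutor.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = System.gc()  // nudge the JVM so weak references get cleared
}, 30, 30, TimeUnit.MINUTES)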

Thanks
Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 5:28 PM
To: Shao, Saisai; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting 
spark.cleaner.ttl


Jerry, I agree with you.



However, in my case, I kept monitoring the block manager folder. I do see that 
sometimes the number of files decreased, but the folder's size kept increasing.



And below is a screenshot of the folder. You can see some old files are not 
deleted somehow.



[screenshot of the folder listing omitted]



-Original Message-
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting 
spark.cleaner.ttl



From the stack I think this problem may be due to the deletion of the broadcast 
variable: since you set spark.cleaner.ttl, the old broadcast variable is deleted after this 
timeout, and you will hit this exception when you try to use it again after that 
time limit.



Basically I think you don't need to use this configuration: Spark Streaming 
will automatically delete the old, unused data, and Spark itself will also clean 
this metadata using weak references. This configuration will be deprecated 
in the coming release.





Thanks

Jerry



-Original Message-

From: Haopu Wang [mailto:hw...@qilinsoft.com]

Sent: Tuesday, June 9, 2015 3:30 PM

To: user

Subject: [SparkStreaming 1.3.0] Broadcast failure after setting 
spark.cleaner.ttl



When I ran a Spark Streaming application for longer, I noticed the local 
directory's size kept increasing.



I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata.



The spark streaming batch duration is 10 seconds and checkpoint duration is 10 
minutes.



The setting took effect, but after that the exception below happened.



Do you have any idea about this error? Thank you!







15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException:
org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org

RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Shao, Saisai
From the stack I think this problem may be due to the deletion of the broadcast 
variable: since you set spark.cleaner.ttl, the old broadcast variable is deleted after this 
timeout, and you will hit this exception when you try to use it again after that 
time limit.

Basically I think you don't need to use this configuration: Spark Streaming 
will automatically delete the old, unused data, and Spark itself will also clean 
this metadata using weak references. This configuration will be deprecated 
in the coming release.

Thanks
Jerry

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting 
spark.cleaner.ttl

When I ran a Spark Streaming application for longer, I noticed the local 
directory's size kept increasing.

I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata.

The spark streaming batch duration is 10 seconds and checkpoint duration is 10 
minutes.

The setting took effect, but after that the exception below happened.

Do you have any idea about this error? Thank you!



15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 
27045, host2): java.io.IOException:
org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of
broadcast_82
at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBr
oadcast.scala:164)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBro
adcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scal
a:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.sc
ala:87)
at
org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at
org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute
$3.apply(HashmapEnrichDStream.scala:39)
at
org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute
$3.apply(HashmapEnrichDStream.scala:39)
at
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter
.scala:202)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.
scala:56)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:6
8)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:4
1)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.jav
a:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.ja
va:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get
broadcast_82_piece0 of broadcast_82
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
oadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast
.scala:137)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
oadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast
.scala:137)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
oadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.sc
ala:136)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
oadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
oadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$T
orrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$
1.apply(TorrentBroadcast.scala:174)
at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
... 25 more

15/06/09 12:57:30 ERROR 

RE: Possible long lineage issue when using DStream to update a normal RDD

2015-05-08 Thread Shao, Saisai
I think you could use checkpoint to cut the lineage of `MyRDD`, I have a 
similar scenario and I use checkpoint to workaround this problem :)

Thanks
Jerry

-Original Message-
From: yaochunnan [mailto:yaochun...@gmail.com] 
Sent: Friday, May 8, 2015 1:57 PM
To: user@spark.apache.org
Subject: Possible long lineage issue when using DStream to update a normal RDD

Hi all,
Recently in our project, we need to update a RDD using data regularly received 
from DStream, I plan to use foreachRDD API to achieve this:
var MyRDD = ...
dstream.foreachRDD { rdd =>
  MyRDD = MyRDD.join(rdd)...
  ...
}

Is this usage correct? My concern is, as I am repeatedly and endlessly 
reassigning MyRDD in order to update it, will it create a too long RDD lineage 
to process when I want to query MyRDD later on (similar as
https://issues.apache.org/jira/browse/SPARK-4672) ? 

Maybe I should:
1. cache or checkpoint latest MyRDD and unpersist old MyRDD every time a 
dstream comes in.
2. use the unpublished IndexedRDD
(https://github.com/amplab/spark-indexedrdd) to conduct efficient RDD update.

As I lack experience using Spark Streaming and indexedRDD, I am here to make 
sure my thoughts are on the right track. Your wise suggestions will be greatly 
appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Possible-long-lineage-issue-when-using-DStream-to-update-a-normal-RDD-tp22812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Possible long lineage issue when using DStream to update a normal RDD

2015-05-08 Thread Shao, Saisai
IIUC, only checkpoint will clean the lineage information; cache will not cut the 
lineage. Also, checkpoint will put the data in HDFS, not on local disk :)

I think you can use foreachRDD to do such RDD update work; it looks OK as far as I can tell 
from your code snippet.
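
A rough sketch of the idea (the checkpoint directory and the every-10-batches policy are arbitrary choices, and `dstream` is assumed to be an existing DStream of pairs):

import org.apache.spark.rdd.RDD

ssc.checkpoint("hdfs:///tmp/rdd-checkpoints")

var myRDD: RDD[(String, String)] = ssc.sparkContext.emptyRDD[(String, String)]
var batches = 0L

dstream.foreachRDD { rdd =>
  myRDD = myRDD.union(rdd)   // or a join, as in the original snippet
  batches += 1
  if (batches % 10 == 0) {
    myRDD.checkpoint()       // writes to the checkpoint dir and truncates the lineage
    myRDD.count()            // checkpointing only happens once the RDD is materialized
  }
}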

From: Chunnan Yao [mailto:yaochun...@gmail.com]
Sent: Friday, May 8, 2015 2:51 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: Possible long lineage issue when using DStream to update a normal 
RDD

Thank you for this suggestion! But may I ask what's the advantage of using 
checkpoint instead of cache here? Because they both cut the lineage. I only know that 
checkpoint saves the RDD to disk, while cache keeps it in memory. So maybe it's for 
reliability?

Also, on http://spark.apache.org/docs/latest/streaming-programming-guide.html, I 
have not seen a usage of foreachRDD like mine. Here I am not pushing data to an 
external system; I just use it to update an RDD in Spark. Is this right?



2015-05-08 14:03 GMT+08:00 Shao, Saisai 
saisai.s...@intel.com:
I think you could use checkpoint to cut the lineage of `MyRDD`, I have a 
similar scenario and I use checkpoint to workaround this problem :)

Thanks
Jerry

-Original Message-
From: yaochunnan [mailto:yaochun...@gmail.com]
Sent: Friday, May 8, 2015 1:57 PM
To: user@spark.apache.org
Subject: Possible long lineage issue when using DStream to update a normal RDD

Hi all,
Recently in our project, we need to update a RDD using data regularly received 
from DStream, I plan to use foreachRDD API to achieve this:
var MyRDD = ...
dstream.foreachRDD { rdd =>
  MyRDD = MyRDD.join(rdd)...
  ...
}

Is this usage correct? My concern is, as I am repeatedly and endlessly 
reassigning MyRDD in order to update it, will it create a too long RDD lineage 
to process when I want to query MyRDD later on (similar as
https://issues.apache.org/jira/browse/SPARK-4672) ?

Maybe I should:
1. cache or checkpoint latest MyRDD and unpersist old MyRDD every time a 
dstream comes in.
2. use the unpublished IndexedRDD
(https://github.com/amplab/spark-indexedrdd) to conduct efficient RDD update.

As I lack experience using Spark Streaming and indexedRDD, I am here to make 
sure my thoughts are on the right track. Your wise suggestions will be greatly 
appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Possible-long-lineage-issue-when-using-DStream-to-update-a-normal-RDD-tp22812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: shuffle.FetchFailedException in spark on YARN job

2015-04-20 Thread Shao, Saisai
I don’t think this problem is related to Netty or NIO; switching to nio will 
not change this part of the code path that reads the index file for the sort-based shuffle 
reader.

I think you could check your system from a few angles:

1. Is there any hardware problem, like a full disk, which makes this file 
lost or non-existent? That can introduce such an exception.
2. Do you have any other exception besides this one? A shuffle fetch 
failure usually means your job is in an abnormal state, and some other problem may 
also introduce this error.

Thanks
Jerry

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, April 20, 2015 2:56 PM
To: roy
Cc: user@spark.apache.org
Subject: Re: shuffle.FetchFailedException in spark on YARN job

Which version of Spark are you using? Did you try using 
spark.shuffle.blockTransferService=nio

Thanks
Best Regards

On Sat, Apr 18, 2015 at 11:14 PM, roy rp...@njit.edu wrote:
Hi,

 My spark job is failing with following error message

org.apache.spark.shuffle.FetchFailedException:
/mnt/ephemeral12/yarn/nm/usercache/abc/appcache/application_1429353954024_1691/spark-local-20150418132335-0723/28/shuffle_3_1_0.index
(No such file or directory)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:89)
at
org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
at
org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException:
/mnt/ephemeral12/yarn/nm/usercache/abc/appcache/application_1429353954024_1691/spark-local-20150418132335-0723/28/shuffle_3_1_0.index
(No such file or directory)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.init(FileInputStream.java:146)
at
org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
at
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchLocalBlocks(ShuffleBlockFetcherIterator.scala:235)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:268)
at
org.apache.spark.storage.ShuffleBlockFetcherIterator.init(ShuffleBlockFetcherIterator.scala:115)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:76)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
... 7 more

)


my job 

RE: Spark Directed Acyclic Graph / Jobs

2015-04-17 Thread Shao, Saisai
I think this paper will be a good resource 
(https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf), also the paper 
of Dryad is also a good one.

Thanks
Jerry

From: James King [mailto:jakwebin...@gmail.com]
Sent: Friday, April 17, 2015 3:26 PM
To: user
Subject: Spark Directed Acyclic Graph / Jobs

Is there a good resource that explains how Spark jobs gets broken down to tasks 
and executions.

I just need to get a better understanding of this.

Regards
j



RE: Spark + Kafka

2015-04-01 Thread Shao, Saisai
OK, it seems there’s nothing strange in your code. So maybe we need to narrow 
down the cause: would you please run the KafkaWordCount example in Spark to see if 
it is OK? If it is, then we should focus on your implementation; otherwise 
Kafka potentially has some problems.
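
For instance, the bundled example can be run roughly like this (ZooKeeper address, consumer group, topic and thread count are placeholders):

./bin/run-example streaming.KafkaWordCount zkhost:2181 my-consumer-group test 1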

Thanks
Jerry

From: James King [mailto:jakwebin...@gmail.com]
Sent: Wednesday, April 1, 2015 6:59 PM
To: Saisai Shao
Cc: bit1...@163.com; user
Subject: Re: Spark + Kafka

This is the  code. And I couldn't find anything like the log you suggested.

public KafkaLogConsumer(int duration, String master) {
    JavaStreamingContext spark = createSparkContext(duration, master);

    Map<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("test", 1);

    JavaPairDStream<String, String> input =
        KafkaUtils.createStream(spark, "somesparkhost:2181", "groupid", topics);
    input.print();

    spark.start();
    spark.awaitTermination();
}

private JavaStreamingContext createSparkContext(int duration, String master) {
    SparkConf sparkConf = new SparkConf()
        .setAppName(this.getClass().getSimpleName())
        .setMaster(master);
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
        Durations.seconds(duration));
    return ssc;
}

On Wed, Apr 1, 2015 at 11:37 AM, James King 
jakwebin...@gmail.com wrote:
Thanks Saisai,

Sure will do.

But just a quick note that when I set master to local[*], Spark started 
showing Kafka messages as expected, so the problem in my view was to do with 
not having enough threads to process the incoming data.

Thanks.


On Wed, Apr 1, 2015 at 10:53 AM, Saisai Shao 
sai.sai.s...@gmail.com wrote:
Would you please share your code snippet please, so we can identify is there 
anything wrong in your code.

Besides, would you please grep your driver's debug log to see if there's any 
debug log like Stream xxx received block xxx; this means that Spark 
Streaming keeps receiving data from sources like Kafka.


2015-04-01 16:18 GMT+08:00 James King 
jakwebin...@gmail.com:
Thank you bit1129,

From looking at the web UI i can see 2 cores

Also looking at http://spark.apache.org/docs/1.2.1/configuration.html

But I can't see an obvious configuration for the number of receivers; can you help please?


On Wed, Apr 1, 2015 at 9:39 AM, bit1...@163.com wrote:
Please make sure that you have given more cores than Receiver numbers.




From: James King [mailto:jakwebin...@gmail.com]
Date: 2015-04-01 15:21
To: user
Subject: Spark + Kafka
I have a simple setup/runtime of Kafka and Sprak.

I have a command-line consumer displaying arrivals to the Kafka topic, so I know 
messages are being received.

But when I try to read from the Kafka topic I get no messages; here are some logs 
below.

I'm thinking there aren't enough threads. How do I check that?

Thank you.

2015-04-01 08:56:50 INFO  JobScheduler:59 - Starting job streaming job 
142787141 ms.0 from job set of time 142787141 ms
2015-04-01 08:56:50 INFO  JobScheduler:59 - Finished job streaming job 
142787141 ms.0 from job set of time 142787141 ms
2015-04-01 08:56:50 INFO  JobScheduler:59 - Total delay: 0.002 s for time 
142787141 ms (execution: 0.000 s)
2015-04-01 08:56:50 DEBUG JobGenerator:63 - Got event 
ClearMetadata(142787141 ms)
2015-04-01 08:56:50 DEBUG DStreamGraph:63 - Clearing metadata for time 
142787141 ms
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Clearing references to old RDDs: 
[]
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Unpersisting old RDDs:
2015-04-01 08:56:50 DEBUG ForEachDStream:63 - Cleared 0 RDDs that were older 
than 1427871405000 ms:
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Clearing references to old 
RDDs: [1427871405000 ms - 8]
2015-04-01 08:56:50 DEBUG KafkaInputDStream:63 - Unpersisting old RDDs: 8
2015-04-01 08:56:50 INFO  BlockRDD:59 - Removing RDD 8 from persistence list
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:50 - [actor] received message 
RemoveRdd(8) from Actor[akka://sparkDriver/temp/$n]
2015-04-01 08:56:50 DEBUG BlockManagerSlaveActor:50 - [actor] received message 
RemoveRdd(8) from Actor[akka://sparkDriver/temp/$o]
2015-04-01 08:56:50 DEBUG BlockManagerMasterActor:56 - [actor] handled message 
(0.287257 ms) RemoveRdd(8) from 

RE: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Shao, Saisai
From the log you pasted I think this (-rw-r--r--  1 root root  80K Mar 18 
16:54 shuffle_47_519_0.data) is not shuffle spill data, but the final 
shuffle output. As I said, do you think shuffle is the bottleneck that makes 
your job run slowly? Maybe you should identify the cause first. Besides, from 
the log it looks like your memory is not enough to cache the data; maybe you 
should increase the memory size of the executor.



Thanks

Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 6:41 PM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: [spark-streaming] can shuffle write to disk be disabled?

I've already done that:

From the Spark UI, Environment > Spark properties has:

spark.shuffle.spill    false



On Wed, Mar 18, 2015 at 6:34 PM, Akhil Das 
ak...@sigmoidanalytics.com wrote:
I think you can disable it with spark.shuffle.spill=false

Thanks
Best Regards

On Wed, Mar 18, 2015 at 3:39 PM, Darren Hoo 
darren@gmail.com wrote:
Thanks, Shao

On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai 
saisai.s...@intel.com wrote:
Yeah, as I said, your job processing time is much larger than the sliding 
interval, and streaming jobs are executed one by one in sequence, so the next job 
will wait until the first job is finished and the total latency will be 
accumulated.

I think you need to identify the bottleneck of your job first. If the 
shuffle is so slow, you could enlarge the shuffle fraction of memory to reduce 
the spill, but the final shuffle data will still be written to disk; this cannot be 
disabled, unless you mount your Spark temp directory on a ramdisk.


I have increased spark.shuffle.memoryFraction to 0.8, which I can see from the 
Spark UI's environment variables.

But spilling always happens, even from the start when latency is less than the slide 
window (I changed it to 10 seconds); the shuffle data written to disk really has a snowball 
effect, and it slows things down eventually.

I noticed that the files spilled to disk are all very small in size but huge in 
number:


total 344K

drwxr-xr-x  2 root root 4.0K Mar 18 16:55 .

drwxr-xr-x 66 root root 4.0K Mar 18 16:39 ..

-rw-r--r--  1 root root  80K Mar 18 16:54 shuffle_47_519_0.data

-rw-r--r--  1 root root  75K Mar 18 16:54 shuffle_48_419_0.data

-rw-r--r--  1 root root  36K Mar 18 16:54 shuffle_48_518_0.data

-rw-r--r--  1 root root  69K Mar 18 16:55 shuffle_49_319_0.data

-rw-r--r--  1 root root  330 Mar 18 16:55 shuffle_49_418_0.data

-rw-r--r--  1 root root  65K Mar 18 16:55 shuffle_49_517_0.data

MemStore says:


15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory threshold 
of 1024.0 KB for computing block rdd_1338_2 in memory.

15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache rdd_1338_2 in 
memory! (computed 512.0 B so far)

15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) + 0.0 B 
(scratch space shared across 0 thread(s)) = 529.0 MB. Storage limit = 529.9 MB.
Not enough space even for 512 byte??


The executors still have plenty of free memory:

Executor ID  Address       RDD Blocks  Memory Used         Disk Used  Active Tasks  Failed Tasks  Complete Tasks  Total Tasks  Task Time  Input     Shuffle Read  Shuffle Write
0            slave1:40778  0           0.0 B / 529.9 MB    0.0 B      16            0             15047           15063        2.17 h     0.0 B     402.3 MB      768.0 B
1            slave2:50452  0           0.0 B / 529.9 MB    0.0 B      16            0             14447           14463        2.17 h     0.0 B     388.8 MB      1248.0 B
1            lvs02:47325   116         27.6 MB / 529.9 MB  0.0 B      8             0             58169           58177        3.16 h     893.5 MB  624.0 B       1189.9 MB
driver       lvs02:47041   0           0.0 B / 529.9 MB    0.0 B      0             0             0               0            0 ms       0.0 B     0.0 B         0.0 B



Besides if CPU or network is the bottleneck, you might need to add more 
resources to your cluster.

3 dedicated servers, each with a 16-core CPU, 16 GB memory and gigabit network.
CPU load is quite low, about 1~3 from top, and network usage is far from 
saturated.

I don't even do any useful complex calculations in this small Simple App yet.






RE: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Shao, Saisai
Yeah, as I said, your job processing time is much larger than the sliding 
interval, and streaming jobs are executed one by one in sequence, so the next job 
will wait until the first job is finished and the total latency will be 
accumulated.

I think you need to identify the bottleneck of your job first. If the 
shuffle is so slow, you could enlarge the shuffle fraction of memory to reduce 
the spill, but the final shuffle data will still be written to disk; this cannot be 
disabled, unless you mount your Spark temp directory on a ramdisk.
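
Purely as an illustration (the values are not recommendations), the shuffle memory fraction and the local scratch directory can be set like this; the scratch directory is configured through spark.local.dir, and the ramdisk mount point below is hypothetical:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.5")    // more memory for shuffle before spilling
  .set("spark.local.dir", "/mnt/ramdisk/spark")  // put spill/shuffle files on a ramdisk mount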

Besides if CPU or network is the bottleneck, you might need to add more 
resources to your cluster.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 3:24 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: [spark-streaming] can shuffle write to disk be disabled?

Hi, Saisai

Here is the duration of one of the jobs, 22 seconds in total, it is longer than 
the sliding window.

Stage Id  Description  Submitted  Duration  Tasks: Succeeded/Total  Input  Output  Shuffle Read  Shuffle Write
342  foreach at SimpleApp.scala:58  2015/03/18 15:06:58  16 s  288/288  10.6 MB
341  window at SimpleApp.scala:51   2015/03/18 15:06:52  6 s   288/288  12.3 MB  10.6 MB

And part of the driver log:


15/03/18 15:16:36 INFO DStreamGraph: Cleared checkpoint data for time 
1426662996000 ms

15/03/18 15:16:36 INFO ReceivedBlockTracker: Deleting batches 
ArrayBuffer(1426662932000 ms)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 81.0 in stage 392.0 (TID 
100515, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 75.0 in stage 392.0 (TID 
100509) in 370 ms on lvs02 (75/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 82.0 in stage 392.0 (TID 
100516, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 77.0 in stage 392.0 (TID 
100511) in 261 ms on lvs02 (76/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 83.0 in stage 392.0 (TID 
100517, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 78.0 in stage 392.0 (TID 
100512) in 274 ms on lvs02 (77/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 84.0 in stage 392.0 (TID 
100518, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 74.0 in stage 392.0 (TID 
100508) in 569 ms on lvs02 (78/291)

15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996000 in memory 
on lvs02:38954 (size: 398.3 KB, free: 1073.7 MB)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 85.0 in stage 392.0 (TID 
100519, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 76.0 in stage 392.0 (TID 
100510) in 539 ms on lvs02 (79/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 86.0 in stage 392.0 (TID 
100520, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 80.0 in stage 392.0 (TID 
100514) in 296 ms on lvs02 (80/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 87.0 in stage 392.0 (TID 
100521, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 81.0 in stage 392.0 (TID 
100515) in 292 ms on lvs02 (81/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 88.0 in stage 392.0 (TID 
100522, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 82.0 in stage 392.0 (TID 
100516) in 331 ms on lvs02 (82/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 89.0 in stage 392.0 (TID 
100523, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 83.0 in stage 392.0 (TID 
100517) in 271 ms on lvs02 (83/291)

15/03/18 15:16:36 INFO BlockManagerInfo: Added input-0-1426662996200 in memory 
on lvs02:38954 (size: 31.0 KB, free: 1073.7 MB)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 90.0 in stage 392.0 (TID 
100524, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 79.0 in stage 392.0 (TID 
100513) in 549 ms on lvs02 (84/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 91.0 in stage 392.0 (TID 
100525, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 84.0 in stage 392.0 (TID 
100518) in 327 ms on lvs02 (85/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 92.0 in stage 392.0 (TID 
100526, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 86.0 in stage 392.0 (TID 
100520) in 293 ms on lvs02 (86/291)

15/03/18 15:16:36 INFO TaskSetManager: Starting task 93.0 in stage 392.0 (TID 
100527, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:36 INFO TaskSetManager: Finished task 87.0 in stage 392.0 (TID 
100521) in 257 ms on lvs02 (87/291)

15/03/18 15:16:36 INFO TaskSetManager

RE: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Shao, Saisai
Please see the inline comments.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 9:30 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Akhil Das
Subject: Re: [spark-streaming] can shuffle write to disk be disabled?



On Wed, Mar 18, 2015 at 8:31 PM, Shao, Saisai 
saisai.s...@intel.com wrote:

From the log you pasted I think this (-rw-r--r--  1 root root  80K Mar 18 
16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the final 
shuffle result.

Why is the shuffle result written to disk?

This is the internal mechanism for Spark.



As I said, do you think shuffle is the bottleneck that makes your job run 
slowly?

I am quite new to Spark, so I am just making wild guesses. What further information 
should I provide that can help find the real bottleneck?

You can monitor the system metrics as well as the JVM; information from the web 
UI is also very useful.



Maybe you should identify the cause first. Besides, from the log it looks 
like your memory is not enough to cache the data; maybe you should increase the 
memory size of the executor.



Running two executors, the memory usage is quite low:

executor 0  8.6 MB / 4.1 GB
executor 1  23.9 MB / 4.1 GB
driver      0.0 B / 529.9 MB

Submitted with args: --executor-memory 8G --num-executors 2 --driver-memory 1G




RE: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Shao, Saisai
Would you please check your driver log or streaming web UI to see each job's 
latency, including processing latency and total latency.

From your code, the slide interval is just 3 seconds, so you will process 
each 60 seconds' worth of data every 3 seconds; if the processing latency is larger than the 
slide interval, maybe your computation power cannot reach the qps you 
wanted.

I think you need to identify the bottleneck first, and then try to tune 
your code, balance the data, or add more computation resources.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 1:39 PM
To: user@spark.apache.org
Subject: [spark-streaming] can shuffle write to disk be disabled?

I use Spark Streaming to read messages from Kafka; the producer creates 
about 1500 messages per second.


def hash(x: String): Int = {
  MurmurHash3.stringHash(x)
}

val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
  StorageLevel.MEMORY_ONLY_SER).map(_._2)

val clickstream = stream.map(log => {
  // parse log
  ...
  (hash(log.url), HashSet(hash(log.userid)))
}).window(Seconds(60), Seconds(3))

val upv = clickstream.transform(rdd => rdd.reduceByKey(_ ++ _).map { case (url, visits) => {
  val uv = visits.size
  (uv, url)
}})

upv.foreach(rdd => println(new Date() + "\n---\n" + rdd.top(20).mkString("\n") + "\n"))



it is quite quick upon startup, but after running for a few minutes, it goes 
slower and slower and the latency can be minutes.



I found a lot of shuffle writes at /tmp/spark- in several gigabytes.



with 1500 qps of message and window size of 60 seconds, I think it should be 
done within memory without writing to disk at all



I've set executor-memory to 8G, So there is plenty of memory.



$SPARK_HOME/bin/spark-submit \

  --class SimpleApp \

  --master spark://localhost:7077 \

  --driver-memory 16G  \

  --executor-memory 8G  \

  target/scala-2.10/simple-assembly-1.0.jar



I also tries these settings, but it still spill to disk.



spark.master spark://localhost:7077

#spark.driver.memory  4g

#spark.shuffle.file.buffer.kb 4096

#spark.shuffle.memoryFraction 0.8

#spark.storage.unrollFraction 0.8

#spark.storage.unrollMemoryThreshold 1073741824

spark.io.compression.codec   lz4

spark.shuffle.spill  false

spark.serializer org.apache.spark.serializer.KryoSerializer



where am I wrong?


RE: MappedStream vs Transform API

2015-03-16 Thread Shao, Saisai
I think these two ways are both OK for writing a streaming job. `transform` 
is a more general way to transform from one DStream to another when 
there’s no related DStream API (but there is a related RDD API). But using map may be 
more straightforward and easier to understand.
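
For instance (assuming `lines` is an existing DStream[String]), these two forms should behave the same:

val lengths1 = lines.map(_.length)
val lengths2 = lines.transform(rdd => rdd.map(_.length))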

Thanks
Jerry

From: madhu phatak [mailto:phatak@gmail.com]
Sent: Monday, March 16, 2015 4:32 PM
To: user@spark.apache.org
Subject: MappedStream vs Transform API

Hi,
  Current implementation of map function in spark streaming looks as below.

  def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }

It creates an instance of MappedDStream, which is a subclass of DStream.

The same function can also be implemented using the transform API:

  def map[U: ClassTag](mapFunc: T => U): DStream[U] =
    this.transform(rdd => {
      rdd.map(mapFunc)
    })

Both implementations look the same. If they are the same, is there any advantage to having 
a subclass of DStream? Why can't we just use the transform API?


Regards,
Madhukara Phatak
http://datamantra.io/


RE: Building spark over specified tachyon

2015-03-15 Thread Shao, Saisai
I think you could change the pom file under the Spark project to update the Tachyon-related 
dependency version and rebuild it (provided the API is compatible and the 
behavior is the same).

I'm not sure whether there is any command-line option you can use to compile against a specific 
Tachyon version.
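
As a rough example (the profile flags are illustrative and depend on your Hadoop setup), after bumping the tachyon-client version in core/pom.xml you would rebuild with something like:

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package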

Thanks
Jerry

From: fightf...@163.com [mailto:fightf...@163.com]
Sent: Monday, March 16, 2015 11:01 AM
To: user
Subject: Building spark over specified tachyon

Hi, all
Noting that the current Spark releases are built with Tachyon 0.5.0,
if we want to recompile Spark with Maven targeting a specific Tachyon 
version (let's say the most recent 0.6.0 release),
how should that be done? What should the Maven compile command look like?

Thanks,
Sun.


fightf...@163.com


RE: Spark Streaming input data source list

2015-03-09 Thread Shao, Saisai
Hi Lin,

AFAIK, currently there's no built-in receiver API for RDBMSs, but you can 
write your own custom receiver to get data from an RDBMS; for the details you can 
refer to the docs.
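
A very rough skeleton of such a custom receiver (the class name, JDBC polling loop and storage level are placeholders, not a tested implementation):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class JdbcReceiver(url: String, query: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    new Thread("jdbc-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          // run `query` against `url` with plain JDBC here and call
          // store(row) for every row fetched
          Thread.sleep(5000)  // poll interval, arbitrary
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // close connections / stop the polling thread here
  }
}

// usage: val rows = ssc.receiverStream(new JdbcReceiver("jdbc:...", "select ..."))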

Thanks
Jerry

From: Cui Lin [mailto:cui@hds.com]
Sent: Tuesday, March 10, 2015 8:36 AM
To: Tathagata Das
Cc: user@spark.apache.org
Subject: Re: Spark Streaming input data source list

Tathagata,

Thanks for your quick response. The link is helpful to me.
Do you know of any API for streaming data from an RDBMS?


Best regards,

Cui Lin

From: Tathagata Das t...@databricks.com
Date: Monday, March 9, 2015 at 11:28 AM
To: Cui Lin cui@hds.com
Cc: user@spark.apache.org
Subject: Re: Spark Streaming input data source list

Spark Streaming has StreamingContext.socketStream()
http://spark.apache.org/docs/1.2.1/api/java/org/apache/spark/streaming/StreamingContext.html#socketStream(java.lang.String,
 int, scala.Function1, org.apache.spark.storage.StorageLevel, 
scala.reflect.ClassTag)

TD

On Mon, Mar 9, 2015 at 11:37 AM, Cui Lin 
cui@hds.com wrote:
Dear all,

Could you send me a list for input data source that spark streaming could 
support?
My list is HDFS, Kafka, textfile?...

I am wondering if spark streaming could directly read data from certain port 
(443 e.g.) that my devices directly send to?



Best regards,

Cui Lin



RE: distribution of receivers in spark streaming

2015-03-04 Thread Shao, Saisai
Hi Du,

You could try sleeping for several seconds after creating the streaming context to 
let all the executors register; then the receivers can be distributed to the 
nodes more evenly. Setting the preferred location is another way, as you mentioned.
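
A rough sketch of the preferred-location approach (MyKafkaReceiver comes from the original question; the host list, storage level and receiver body are illustrative):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyKafkaReceiver(host: String)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  override def preferredLocation: Option[String] = Some(host)  // pin this receiver to a worker host
  override def onStart(): Unit = { /* start Kafka consumer threads and call store(...) */ }
  override def onStop(): Unit = {}
}

val hosts = Seq("node1", "node2", "node3", "node4", "node5")
val streams = (1 to 8).map(i => ssc.receiverStream(new MyKafkaReceiver(hosts(i % hosts.size))))
ssc.union(streams)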

Thanks
Jerry


From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override method preferredLocation() in MyReceiver 
class.

On Wednesday, March 4, 2015 3:35 PM, Du Li 
l...@yahoo-inc.com.INVALID wrote:

Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of 
kafka receivers on those machines. In my code I did something like the 
following, as suggested in the spark docs:
val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
ssc.union(streams)

However, from the spark UI, I saw that some machines are not running any 
instance of the receiver while some get three. The mapping changed every time 
the system was restarted. This impacts the receiving and also the processing 
speeds.

I wonder if it's possible to control/suggest the distribution so that it would 
be more even. How is the decision made in spark?

Thanks,
Du





RE: distribution of receivers in spark streaming

2015-03-04 Thread Shao, Saisai
Yes, hostname is enough.

I think currently it is hard for user code to get the worker list from the 
standalone master. If you can get the Master object, you could get the worker 
list, but AFAIK it may be difficult to get this object. All you can do is 
manually get the worker list and assign a hostname to each receiver.

Thanks
Jerry

From: Du Li [mailto:l...@yahoo-inc.com]
Sent: Thursday, March 5, 2015 2:29 PM
To: Shao, Saisai; User
Subject: Re: distribution of receivers in spark streaming

Hi Jerry,

Thanks for your response.

Is there a way to get the list of currently registered/live workers? Even in 
order to provide preferredLocation, it would be safer to know which workers are 
active. Guess I only need to provide the hostname, right?

Thanks,
Du

On Wednesday, March 4, 2015 10:08 PM, Shao, Saisai 
saisai.s...@intel.com wrote:

Hi Du,

You could try sleeping for several seconds after creating the streaming context to 
let all the executors register; then the receivers can be distributed to the 
nodes more evenly. Setting the preferred location is another way, as you mentioned.

Thanks
Jerry


From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override method preferredLocation() in MyReceiver 
class.

On Wednesday, March 4, 2015 3:35 PM, Du Li 
l...@yahoo-inc.com.INVALIDmailto:l...@yahoo-inc.com.INVALID wrote:

Hi,

I have a set of machines (say 5) and want to evenly launch a number (say 8) of 
kafka receivers on those machines. In my code I did something like the 
following, as suggested in the spark docs:
val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
ssc.union(streams)

However, from the spark UI, I saw that some machines are not running any 
instance of the receiver while some get three. The mapping changed every time 
the system was restarted. This impacts the receiving and also the processing 
speeds.

I wonder if it's possible to control/suggest the distribution so that it would 
be more even. How is the decision made in spark?

Thanks,
Du






RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
Yes, if one key has too many values, there is still a chance of hitting OOM.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:49 PM
To: Shao, Saisai
Cc: Cheng, Hao; user
Subject: Re: Having lots of FetchFailedException in join

I see. I'm using core's join. The data might have some skewness (checking).

I understand shuffle can spill data to disk, but when consuming it, say in cogroup or groupByKey, it still needs to read all of a group's elements, right? I guess the OOM happened there when reading very large groups.

Jianshi

On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
I think what you could do is monitor through the web UI to see if there's any skew or other symptom in shuffle write and read. For GC you could use the configuration below, as you mentioned.

From the Spark core side, all the shuffle-related operations can spill data to disk and do not need to read the whole partition into memory. But if you use SparkSQL, it depends on how SparkSQL uses these operators.

CC @hao if he has some thoughts on it.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:28 PM
To: Shao, Saisai

Cc: user
Subject: Re: Having lots of FetchFailedException in join

Hi Saisai,

What's your suggested settings on monitoring shuffle? I've enabled 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions), which aims to back groupByKey with external storage. It's still in open status. Does that mean that, for now, groupByKey/cogroup/join (implemented as cogroup + flatMap) will still read each group as a whole when consuming it?

How can I deal with the key skewness in joins? Is there a skew-join 
implementation?


Jianshi
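
One common workaround, not discussed further in this thread, is to salt the skewed key before the join; a rough sketch (bigRdd, smallRdd and the salt count are hypothetical):

import scala.util.Random

val salts = 10
// Spread each hot key of the large side over `salts` sub-keys.
val saltedBig = bigRdd.map { case (k, v) => ((k, Random.nextInt(salts)), v) }
// Replicate the small side once per salt so every sub-key still finds its match.
val saltedSmall = smallRdd.flatMap { case (k, v) => (0 until salts).map(s => ((k, s), v)) }
// Join on the salted key, then strip the salt again.
val joined = saltedBig.join(saltedSmall).map { case ((k, _), vs) => (k, vs) }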



On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi Jianshi,

From my understanding, it may not be a problem of NIO or Netty. Looking at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap); theoretically EAOM can spill data to disk if memory is not enough, but there might be issues when the join key is skewed or the number of keys is small, so you hit OOM.

Maybe you could monitor each stage's or task's shuffle and GC status, as well as the system status, to identify the problem.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 2:32 PM
To: Aaron Davidson
Cc: user
Subject: Re: Having lots of FetchFailedException in join

One really interesting thing is that when I'm using the Netty-based spark.shuffle.blockTransferService, there are no OOM error messages (java.lang.OutOfMemoryError: Java heap space).

Any idea why it doesn't appear there?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote:
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM 
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 
6207)
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at 
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at 
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at 
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247

RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
Hi Jianshi,

From my understanding, it may not be a problem of NIO or Netty. Looking at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap); theoretically EAOM can spill data to disk if memory is not enough, but there might be issues when the join key is skewed or the number of keys is small, so you hit OOM.

Maybe you could monitor each stage's or task's shuffle and GC status, as well as the system status, to identify the problem.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 2:32 PM
To: Aaron Davidson
Cc: user
Subject: Re: Having lots of FetchFailedException in join

One really interesting thing is that when I'm using the Netty-based spark.shuffle.blockTransferService, there are no OOM error messages (java.lang.OutOfMemoryError: Java heap space).

Any idea why it doesn't appear there?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote:
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM 
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 
6207)
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at 
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at 
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at 
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)

Is join/cogroup still memory bound?


Jianshi



On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang 
jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote:
Hmm... ok, previous errors are still block fetch errors.

15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch 
of 11 outstanding blocks
java.io.IOException: Failed to connect to host-/:55597
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
 

RE: Having lots of FetchFailedException in join

2015-03-04 Thread Shao, Saisai
I think what you could do is monitor through the web UI to see if there's any skew or other symptom in shuffle write and read. For GC you could use the configuration below, as you mentioned.

From the Spark core side, all the shuffle-related operations can spill data to disk and do not need to read the whole partition into memory. But if you use SparkSQL, it depends on how SparkSQL uses these operators.

CC @hao if he has some thoughts on it.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:28 PM
To: Shao, Saisai
Cc: user
Subject: Re: Having lots of FetchFailedException in join

Hi Saisai,

What's your suggested settings on monitoring shuffle? I've enabled 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found SPARK-3461 (Support external groupByKey using repartitionAndSortWithinPartitions), which aims to back groupByKey with external storage. It's still in open status. Does that mean that, for now, groupByKey/cogroup/join (implemented as cogroup + flatMap) will still read each group as a whole when consuming it?

How can I deal with the key skewness in joins? Is there a skew-join 
implementation?


Jianshi



On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi Jianshi,

From my understanding, it may not be a problem of NIO or Netty. Looking at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap); theoretically EAOM can spill data to disk if memory is not enough, but there might be issues when the join key is skewed or the number of keys is small, so you hit OOM.

Maybe you could monitor each stage's or task's shuffle and GC status, as well as the system status, to identify the problem.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 2:32 PM
To: Aaron Davidson
Cc: user
Subject: Re: Having lots of FetchFailedException in join

One really interesting thing is that when I'm using the Netty-based spark.shuffle.blockTransferService, there are no OOM error messages (java.lang.OutOfMemoryError: Java heap space).

Any idea why it doesn't appear there?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote:
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM 
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 
6207)
java.lang.OutOfMemoryError: Java heap space
at 
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
at 
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
at 
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
at 
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at 
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31

RE: Monitoring Spark with Graphite and Grafana

2015-02-26 Thread Shao, Saisai
Cool, great job☺.

Thanks
Jerry

From: Ryan Williams [mailto:ryan.blake.willi...@gmail.com]
Sent: Thursday, February 26, 2015 6:11 PM
To: user; d...@spark.apache.org
Subject: Monitoring Spark with Graphite and Grafana

If anyone is curious to try exporting Spark metrics to Graphite, I just published a post about my experience doing that, building dashboards in Grafana (http://grafana.org/), and using them to monitor Spark jobs:
http://www.hammerlab.org/2015/02/27/monitoring-spark-with-graphite-and-grafana/

Code for generating Grafana dashboards tailored to the metrics emitted by Spark 
is here: https://github.com/hammerlab/grafana-spark-dashboards.

If anyone else is interested in working on expanding MetricsSystem to make this 
sort of thing more useful, let me know, I've been working on it a fair amount 
and have a bunch of ideas about where it should go.

Thanks,

-Ryan




RE: spark streaming window operations on a large window size

2015-02-23 Thread Shao, Saisai
I don't think current Spark Streaming supports window operations that go beyond the available memory. Internally, Spark Streaming keeps all the data belonging to the effective window in memory; if memory is not enough, the BlockManager will discard blocks according to an LRU policy, and then something unexpected will occur.

Thanks
Jerry

-Original Message-
From: avilevi3 [mailto:avile...@gmail.com] 
Sent: Monday, February 23, 2015 12:57 AM
To: user@spark.apache.org
Subject: spark streaming window operations on a large window size

Hi guys, 

does Spark Streaming support window operations on a sliding window whose data is larger than the available memory? We would like to do that.
Currently we are using Kafka as input, but we could change that if needed.

thanks
Avi



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-window-operations-on-a-large-window-size-tp21764.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shao, Saisai
If you call reduceByKey(), Spark will internally introduce a shuffle operation, no matter whether the data is already partitioned locally; Spark itself does not know that the data is already well partitioned.

So if you want to avoid the shuffle, you have to write code explicitly to avoid it, from my understanding. You can call mapPartitions to get a partition of data and reduce by key locally with your own logic.

Thanks
Saisai
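
A minimal sketch of that per-partition reduce, assuming the pair RDD is already hash-partitioned so that equal keys sit in the same partition (names are illustrative):

val locallyReduced = pairs.mapPartitions({ iter =>
  // Aggregate within the partition only; no shuffle is introduced.
  val acc = scala.collection.mutable.HashMap.empty[Int, Double]
  iter.foreach { case (k, v) => acc(k) = acc.getOrElse(k, 0.0) + v }
  acc.iterator
}, preservesPartitioning = true)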

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

Hi All,

I am running a simple PageRank program, but it is slow. I dug out that part of the reason is a shuffle that happens when I call a union action, even though both RDDs share the same partitioner:

Below is my test code in spark shell:

import org.apache.spark.HashPartitioner

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
val links = sc.textFile("c:/Download/web-Google.txt")
  .filter(!_.contains("#"))
  .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
  .groupByKey
  .partitionBy(new HashPartitioner(numOfPartition))
  .persist
var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist

for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size * beta))
  }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}
ranks.lookup(1)

In the above code, links joins ranks and should preserve the partitioner, and leakedMatrix also shares the same partitioner, so I expect no shuffle to happen on contributions.union(leakedMatrix), nor on the reduceByKey that follows. But in the end there is shuffle write for all steps: map, groupByKey, union, partitionBy, etc.

I expected the shuffle to happen only once, with everything after that being a local operation, but the UI shows otherwise. Do I have a misunderstanding here?



RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shao, Saisai
I think some RDD APIs like zipPartitions or others can do this as you wanted; you might check the docs.

Thanks
Jerry
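
A rough sketch of such a map-side join with zipPartitions, assuming rddA and rddB were partitioned with the same partitioner and number of partitions, and that B's partitions fit in memory:

val joined = rddA.zipPartitions(rddB) { (iterA, iterB) =>
  val lookup = iterB.toMap   // B's local partition is small enough to hold in memory
  iterA.flatMap { case (k, v) => lookup.get(k).map(b => (k, (v, b))) }
}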

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 1:35 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

This also triggers an interesting question: how can I do this locally in code if I want to? For example, I have RDDs A and B with the same partitioning; if I want to join A to B, I might just want to do a map-side join (B itself might be big, but B's local partition is known to be small enough to fit in memory). How can I access another RDD's local partition inside the mapPartitions method? Is there any way to do this in Spark?

From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Monday, February 23, 2015 3:13 PM
To: Shuai Zheng
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

If you call reduceByKey(), Spark will internally introduce a shuffle operation, no matter whether the data is already partitioned locally; Spark itself does not know that the data is already well partitioned.

So if you want to avoid the shuffle, you have to write code explicitly to avoid it, from my understanding. You can call mapPartitions to get a partition of data and reduce by key locally with your own logic.

Thanks
Saisai

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

Hi All,

I am running a simple PageRank program, but it is slow. I dug out that part of the reason is a shuffle that happens when I call a union action, even though both RDDs share the same partitioner:

Below is my test code in spark shell:

import org.apache.spark.HashPartitioner

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
val links = sc.textFile("c:/Download/web-Google.txt")
  .filter(!_.contains("#"))
  .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
  .groupByKey
  .partitionBy(new HashPartitioner(numOfPartition))
  .persist
var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist

for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size * beta))
  }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}
ranks.lookup(1)

In the above code, links joins ranks and should preserve the partitioner, and leakedMatrix also shares the same partitioner, so I expect no shuffle to happen on contributions.union(leakedMatrix), nor on the reduceByKey that follows. But in the end there is shuffle write for all steps: map, groupByKey, union, partitionBy, etc.

I expected the shuffle to happen only once, with everything after that being a local operation, but the UI shows otherwise. Do I have a misunderstanding here?



RE: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Shao, Saisai
I have no context on this book. AFAIK union will not trigger a shuffle, as it just puts the partitions together; the reduceByKey() operator is what actually triggers the shuffle.

Thanks
Jerry
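
A quick way to see this in the shell (a hypothetical snippet building on the code above, run inside the loop where contributions is defined): reduceByKey avoids a shuffle only when its parent RDD already carries the same partitioner, and a plain union drops the partitioner.

import org.apache.spark.HashPartitioner

println(links.partitioner)                               // Some(HashPartitioner(6))
println(contributions.union(leakedMatrix).partitioner)   // None -- union drops the partitioner
// Passing the partitioner explicitly at least keeps the result co-partitioned with links:
ranks = contributions.union(leakedMatrix).reduceByKey(new HashPartitioner(numOfPartition), _ + _)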

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 12:26 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

In the book Learning Spark:

[screenshot from the book omitted]

So does this mean that no shuffle happens across the network, but a shuffle will still be done locally? Even if that is the case, why would union trigger a shuffle? I think union just appends the RDDs together.

From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Monday, February 23, 2015 3:13 PM
To: Shuai Zheng
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

If you call reduceByKey(), Spark will internally introduce a shuffle operation, no matter whether the data is already partitioned locally; Spark itself does not know that the data is already well partitioned.

So if you want to avoid the shuffle, you have to write code explicitly to avoid it, from my understanding. You can call mapPartitions to get a partition of data and reduce by key locally with your own logic.

Thanks
Saisai

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

Hi All,

I am running a simple PageRank program, but it is slow. I dug out that part of the reason is a shuffle that happens when I call a union action, even though both RDDs share the same partitioner:

Below is my test code in spark shell:

import org.apache.spark.HashPartitioner

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
val links = sc.textFile("c:/Download/web-Google.txt")
  .filter(!_.contains("#"))
  .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
  .groupByKey
  .partitionBy(new HashPartitioner(numOfPartition))
  .persist
var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist

for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size * beta))
  }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}
ranks.lookup(1)

In the above code, links joins ranks and should preserve the partitioner, and leakedMatrix also shares the same partitioner, so I expect no shuffle to happen on contributions.union(leakedMatrix), nor on the reduceByKey that follows. But in the end there is shuffle write for all steps: map, groupByKey, union, partitionBy, etc.

I expected the shuffle to happen only once, with everything after that being a local operation, but the UI shows otherwise. Do I have a misunderstanding here?



RE: Spark Metrics Servlet for driver and executor

2015-02-06 Thread Shao, Saisai
Hi Judy,

For the driver, it is /metrics/json; there is no MetricsServlet endpoint for executors.

Thanks
Jerry
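
For example, with the default driver web UI port this would be something like http://<driver-host>:4040/metrics/json (host and port depend on your deployment).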

From: Judy Nash [mailto:judyn...@exchange.microsoft.com]
Sent: Friday, February 6, 2015 3:47 PM
To: user@spark.apache.org
Subject: Spark Metrics Servlet for driver and executor

Hi all,

Looking at the Spark MetricsServlet.

What is the URL exposing the driver and executor JSON responses?

I found master and worker successfully, but I can't find the URLs that return JSON for the other two sources.


Thanks!
Judy


RE: Error KafkaStream

2015-02-05 Thread Shao, Saisai
Did you include the Kafka jars? StringDecoder is under kafka.serializer. You can refer to the unit test KafkaStreamSuite in Spark to see how to use this API.

Thanks
Jerry
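
For reference, a minimal sketch of the decoder change (ssc, kafkaParams and topicMap are assumed from the surrounding code; the Kafka jars normally come in transitively via the spark-streaming-kafka artifact):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2)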

From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it]
Sent: Friday, February 6, 2015 9:44 AM
To: Shao, Saisai
Cc: Sean Owen; user@spark.apache.org
Subject: Re: Error KafkaStream

Hi Shao,
When I changed to StringDecoder I got this compile error:

[error] 
/sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaW
ordCount.scala:78: not found: type StringDecoder
[error] KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams, topicMap,stora
geLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
[error] ^
[error] 
/sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaW
ordCount.scala:85: value split is not a member of Nothing
[error] val words = unifiedStream.flatMap(_.split( ))
[error] ^
[error] 
/sata_disk/workspace/spark-1.2.0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaW
ordCount.scala:86: value reduceByKeyAndWindow is not a member of 
org.apache.spark.streaming.dstream.DStream[(Nothing,
Long)]
[error] val wordCounts = words.map(x = (x, 1L)).reduceByKeyAndWindow(_ + 
_, _ - _, Seconds(20), Seconds(10), 2)
[error]  ^
[error] three errors found
[error] (examples/compile:compile) Compilation failed


On Feb 6, 2015, at 02:11, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:

Hi,

I think you should change the `DefaultDecoder` in your type parameters into `StringDecoder`, since it seems you want to decode the messages into Strings. `DefaultDecoder` returns Array[Byte], not String, so the class cast will fail here.

Thanks
Jerry

-Original Message-
From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it]
Sent: Friday, February 6, 2015 12:04 AM
To: Sean Owen
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Error KafkaStream

I don’t think so Sean.


On Feb 5, 2015, at 16:57, Sean Owen 
so...@cloudera.commailto:so...@cloudera.com wrote:

Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same issue?

On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia
e.costaalf...@unibs.itmailto:e.costaalf...@unibs.it wrote:

Hi Guys,
I’m getting this error in KafkaWordCount;

TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234):
java.lang.ClassCastException: [B cannot be cast to java.lang.String
  at
org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfu
n$apply$1.apply(KafkaWordCount.scala:7


Some idea that could be?


Below is the piece of code



val kafkaStream = {
  val kafkaParams = Map[String, String](
    "zookeeper.connect" -> "achab3:2181",
    "group.id" -> "mygroup",
    "zookeeper.connect.timeout.ms" -> "1",
    "kafka.fetch.message.max.bytes" -> "400",
    "auto.offset.reset" -> "largest")

  val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
  //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  val KafkaDStreams = (1 to numStreams).map { _ =>
    KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
  }
  val unifiedStream = ssc.union(KafkaDStreams)
  unifiedStream.repartition(sparkProcessingParallelism)
}

Thanks Guys

Informativa sulla Privacy: http://www.unibs.it/node/8155


--
Informativa sulla Privacy: http://www.unibs.it/node/8155

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For 
additional commands, e-mail: 
user-h...@spark.apache.orgmailto:user-h...@spark.apache.org


Informativa sulla Privacy: http://www.unibs.it/node/8155


RE: Error KafkaStream

2015-02-05 Thread Shao, Saisai
Hi,

I think you should change the `DefaultDecoder` in your type parameters into `StringDecoder`, since it seems you want to decode the messages into Strings. `DefaultDecoder` returns Array[Byte], not String, so the class cast will fail here.

Thanks
Jerry

-Original Message-
From: Eduardo Costa Alfaia [mailto:e.costaalf...@unibs.it] 
Sent: Friday, February 6, 2015 12:04 AM
To: Sean Owen
Cc: user@spark.apache.org
Subject: Re: Error KafkaStream

I don’t think so Sean.

 On Feb 5, 2015, at 16:57, Sean Owen so...@cloudera.com wrote:
 
 Is SPARK-4905 / https://github.com/apache/spark/pull/4371/files the same 
 issue?
 
 On Thu, Feb 5, 2015 at 7:03 AM, Eduardo Costa Alfaia 
 e.costaalf...@unibs.it wrote:
 Hi Guys,
 I’m getting this error in KafkaWordCount;
 
 TaskSetManager: Lost task 0.0 in stage 4095.0 (TID 1281, 10.20.10.234):
 java.lang.ClassCastException: [B cannot be cast to java.lang.String
at
 org.apache.spark.examples.streaming.KafkaWordCount$$anonfun$4$$anonfu
 n$apply$1.apply(KafkaWordCount.scala:7
 
 
 Some idea that could be?
 
 
 Bellow the piece of code
 
 
 
 val kafkaStream = {
   val kafkaParams = Map[String, String](
     "zookeeper.connect" -> "achab3:2181",
     "group.id" -> "mygroup",
     "zookeeper.connect.timeout.ms" -> "1",
     "kafka.fetch.message.max.bytes" -> "400",
     "auto.offset.reset" -> "largest")

   val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
   //val lines = KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
   val KafkaDStreams = (1 to numStreams).map { _ =>
     KafkaUtils.createStream[String, String, DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicpMap, storageLevel = StorageLevel.MEMORY_ONLY_SER).map(_._2)
   }
   val unifiedStream = ssc.union(KafkaDStreams)
   unifiedStream.repartition(sparkProcessingParallelism)
 }
 
 Thanks Guys
 
 Informativa sulla Privacy: http://www.unibs.it/node/8155


--
Informativa sulla Privacy: http://www.unibs.it/node/8155

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



Questions about Spark standalone resource scheduler

2015-02-02 Thread Shao, Saisai
Hi all,

I have some questions about the future development of Spark's standalone resource scheduler. We've heard that some users have requirements for multi-tenant support in standalone mode, such as multi-user management, resource management and isolation, and user whitelists. It seems the current Spark standalone mode does not support this kind of functionality, while resource schedulers like YARN offer such advanced management. I'm not sure what the future target of the standalone resource scheduler is: will it only target a simple implementation and shift advanced usage to YARN, or is there a plan to add some simple multi-tenant related functionality?

Thanks a lot for your comments.

BR
Jerry


RE: Questions about Spark standalone resource scheduler

2015-02-02 Thread Shao, Saisai
Hi Patrick,

Thanks a lot for your detailed explanation. For now we have these requirements: whitelisting application submitters, per-user resource (CPU, memory) quotas, and resource allocation in Spark standalone mode. These are quite specific requirements for production use; generally the question becomes whether we need to offer a more advanced resource scheduler compared to the current simple FIFO one. Our aim is not to provide a general resource scheduler like Mesos/YARN; we only support Spark, but we hope to add some Mesos/YARN functionality to make better use of Spark standalone mode.

I admit that a resource scheduler may overlap somewhat with a cloud manager; whether to offer a powerful scheduler or rely on a cloud manager is really a dilemma.

I think we can break this down into some small features to improve the standalone mode. What's your opinion?

Thanks
Jerry

-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 
Sent: Monday, February 2, 2015 4:49 PM
To: Shao, Saisai
Cc: d...@spark.apache.org; user@spark.apache.org
Subject: Re: Questions about Spark standalone resource scheduler

Hey Jerry,

I think standalone mode will still add more features over time, but the goal 
isn't really for it to become equivalent to what Mesos/YARN are today. Or at 
least, I doubt Spark Standalone will ever attempt to manage _other_ frameworks 
outside of Spark and become a general purpose resource manager.

In terms of having better support for multi tenancy, meaning multiple
*Spark* instances, this is something I think could be in scope in the future. 
For instance, we added H/A to the standalone scheduler a while back, because it 
let us support H/A streaming apps in a totally native way. It's a trade off of 
adding new features and keeping the scheduler very simple and easy to use. 
We've tended to bias towards simplicity as the main goal, since this is 
something we want to be really easy out of the box.

One thing to point out, a lot of people use the standalone mode with some 
coarser grained scheduler, such as running in a cloud service. In this case 
they really just want a simple inner cluster manager. This may even be the 
majority of all Spark installations. This is slightly different than Hadoop 
environments, where they might just want nice integration into the existing 
Hadoop stack via something like YARN.

- Patrick

On Mon, Feb 2, 2015 at 12:24 AM, Shao, Saisai saisai.s...@intel.com wrote:
 Hi all,



 I have some questions about the future development of Spark's 
 standalone resource scheduler. We've heard some users have the 
 requirements to have multi-tenant support in standalone mode, like 
 multi-user management, resource management and isolation, whitelist of 
 users. Seems current Spark standalone do not support such kind of 
 functionalities, while resource schedulers like Yarn offers such kind 
 of advanced managements, I'm not sure what's the future target of 
 standalone resource scheduler, will it only target on simple 
 implementation, and for advanced usage shift to YARN? Or will it plan to add 
 some simple multi-tenant related functionalities?



 Thanks a lot for your comments.



 BR

 Jerry

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: reduceByKeyAndWindow, but using log timestamps instead of clock seconds

2015-01-28 Thread Shao, Saisai
That's definitely a good supplement to the current Spark Streaming; I've heard many people want to process data using log time. Looking forward to the code.

Thanks
Jerry

-Original Message-
From: Tathagata Das [mailto:tathagata.das1...@gmail.com] 
Sent: Thursday, January 29, 2015 10:33 AM
To: Tobias Pfeiffer
Cc: YaoPau; user
Subject: Re: reduceByKeyAndWindow, but using log timestamps instead of clock 
seconds

Ohhh nice! It would be great if you can share some code with us soon. It is indeed a very complicated problem and there is probably no single solution that fits all use cases, so having one way of doing things would be a great reference. Looking forward to that!

On Wed, Jan 28, 2015 at 4:52 PM, Tobias Pfeiffer t...@preferred.jp wrote:
 Hi,

 On Thu, Jan 29, 2015 at 1:54 AM, YaoPau jonrgr...@gmail.com wrote:

 My thinking is to maintain state in an RDD and update it and persist it with each 2-second pass, but this also seems like it could get messy. Any thoughts or examples that might help me?


 I have just implemented some timestamp-based windowing on DStreams (can't share the code now, but it will be published a couple of months from now), although with the assumption that items arrive in the correct order.
 The main challenge (rather technical) was to keep proper state across 
 RDD boundaries and to tell the state you can mark this partial window 
 from the last interval as 'complete' now without shuffling too much 
 data around. For example, if there are some empty intervals, you don't 
 know when the next item to go into the partial window will arrive, or 
 if there will be one at all. I guess if you want to have out-of-order 
 tolerance, that will become even trickier, as you need to define and 
 think about some timeout for partial windows in your state...

 Tobias
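
For comparison, a far simpler sketch than a full event-time window implementation (and not Tobias's code): key each record by the event-time bucket parsed from the log line, then aggregate per bucket. parseTimestamp is a hypothetical helper returning epoch milliseconds, and late records simply land in their (possibly already emitted) bucket.

val bucketMillis = 60 * 1000L
val perBucketCounts = lines
  .map(line => (parseTimestamp(line) / bucketMillis, 1L))
  .reduceByKey(_ + _)
perBucketCounts.print()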



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



RE: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-01-27 Thread Shao, Saisai
Aha, you’re right, I made a wrong comparison; the reason might only be checkpointing :).

Thanks
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Wednesday, January 28, 2015 10:39 AM
To: Shao, Saisai
Cc: user
Subject: Re: Why must the dstream.foreachRDD(...) parameter be serializable?

Hi,

thanks for the answers!

On Wed, Jan 28, 2015 at 11:31 AM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Also, this `foreachFunc` is more like an action function of an RDD, similar to rdd.foreach(func), in which `func` needs to be serializable. So maybe your way of using it is not the normal way :).

Yeah I totally understand why func in rdd.foreach(func) must be serializable 
(because it's sent to the executors), but I didn't get why a function that's 
not shipped around must be serializable, too.

The explanations made sense, though :-)

Thanks
Tobias




RE: Why must the dstream.foreachRDD(...) parameter be serializable?

2015-01-27 Thread Shao, Saisai
Hey Tobias,

I think one consideration is the checkpointing of DStreams, which guarantees driver fault tolerance.

Also, this `foreachFunc` is more like an action function of an RDD, similar to rdd.foreach(func), in which `func` needs to be serializable. So maybe your way of using it is not the normal way :).

Thanks
Jerry
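
A common workaround sketch (hypothetical names): copy whatever the closure really needs into local vals first, so the closure captures only those serializable values rather than the enclosing, non-serializable object.

val localThreshold = threshold   // stand-in for a field of a non-serializable enclosing class
dstream.foreachRDD { (rdd, time) =>
  // This body runs on the driver; only RDD actions ship work to the executors.
  if (rdd.take(1).isEmpty && localThreshold > 0) {
    // react here, e.g. set a driver-side flag that the main thread polls
  }
}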

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Wednesday, January 28, 2015 10:16 AM
To: user
Subject: Why must the dstream.foreachRDD(...) parameter be serializable?

Hi,

I want to do something like

dstream.foreachRDD(rdd => if (someCondition) ssc.stop())

so in particular the function does not touch any element in the RDD and runs 
completely within the driver. However, this fails with a 
NotSerializableException because $outer is not serializable etc. The DStream 
code says:

  def foreachRDD(foreachFunc: (RDD[T], Time) => Unit) {
    // because the DStream is reachable from the outer object here, and because
    // DStreams can't be serialized with closures, we can't proactively check
    // it for serializability and so we pass the optional false to SparkContext.clean
    new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
  }

To be honest, I don't understand the comment. Why must that function be 
serializable even when there is no RDD action involved?

Thanks
Tobias


RE: Shuffle to HDFS

2015-01-25 Thread Shao, Saisai
Hi Larry,

I don’t think Spark’s current shuffle can support HDFS as a shuffle output. Anyway, is there any specific reason to spill shuffle data to HDFS or NFS? This would severely increase the shuffle time.

Thanks
Jerry

From: Larry Liu [mailto:larryli...@gmail.com]
Sent: Sunday, January 25, 2015 4:45 PM
To: u...@spark.incubator.apache.org
Subject: Shuffle to HDFS

How to change shuffle output to HDFS or NFS?


RE: where storagelevel DISK_ONLY persists RDD to

2015-01-25 Thread Shao, Saisai
No, the current RDD persistence mechanism does not support putting data on HDFS.

The directory is controlled by spark.local.dir.

Instead you can use checkpoint() to save the RDD on HDFS.

Thanks
Jerry
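
A minimal sketch of the checkpoint route (paths are placeholders):

// The checkpoint directory lives on HDFS, so the checkpointed RDD does too.
sc.setCheckpointDir("hdfs://namenode:8020/user/larry/spark-checkpoints")
val lengths = sc.textFile("hdfs://namenode:8020/user/larry/input").map(_.length)
lengths.checkpoint()   // materialized to the checkpoint dir on the next action
lengths.count()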

From: Larry Liu [mailto:larryli...@gmail.com]
Sent: Monday, January 26, 2015 3:08 PM
To: Charles Feduke
Cc: u...@spark.incubator.apache.org
Subject: Re: where storagelevel DISK_ONLY persists RDD to

Hi, Charles

Thanks for your reply.

Is it possible to persist RDD to HDFS? What is the default location to persist 
RDD with storagelevel DISK_ONLY?

On Sun, Jan 25, 2015 at 6:26 AM, Charles Feduke 
charles.fed...@gmail.commailto:charles.fed...@gmail.com wrote:
I think you want to instead use `.saveAsSequenceFile` to save an RDD to 
someplace like HDFS or NFS it you are attempting to interoperate with another 
system, such as Hadoop. `.persist` is for keeping the contents of an RDD around 
so future uses of that particular RDD don't need to recalculate its composite 
parts.

On Sun Jan 25 2015 at 3:36:31 AM Larry Liu 
larryli...@gmail.commailto:larryli...@gmail.com wrote:
I would like to persist RDD TO HDFS or NFS mount. How to change the location?



RE: Shuffle to HDFS

2015-01-25 Thread Shao, Saisai
Hey Larry,

I don’t think Hadoop puts shuffle output in HDFS; its behavior is the same as Spark’s, storing mapper output (shuffle) data on local disks. You might have misunderstood something ☺.

Thanks
Jerry

From: Larry Liu [mailto:larryli...@gmail.com]
Sent: Monday, January 26, 2015 3:03 PM
To: Shao, Saisai
Cc: u...@spark.incubator.apache.org
Subject: Re: Shuffle to HDFS

Hi,Jerry

Thanks for your reply.

The reason I have this question is that in Hadoop, mapper intermediate output (shuffle) will be stored in HDFS. I think the default location for Spark is /tmp.

Larry

On Sun, Jan 25, 2015 at 9:44 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi Larry,

I don’t think Spark’s current shuffle can support HDFS as a shuffle output. Anyway, is there any specific reason to spill shuffle data to HDFS or NFS? This would severely increase the shuffle time.

Thanks
Jerry

From: Larry Liu [mailto:larryli...@gmail.commailto:larryli...@gmail.com]
Sent: Sunday, January 25, 2015 4:45 PM
To: u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org
Subject: Shuffle to HDFS

How to change shuffle output to HDFS or NFS?



RE: spark streaming with checkpoint

2015-01-22 Thread Shao, Saisai
Hi,

A new RDD will be created in each slide duration; if there’s no data coming, an empty RDD will be generated.

I’m not sure there’s a way to alleviate your problem from the Spark side. Does your application design have to build such a large window? Can you change your implementation if it is easy for you?

I think it’s better and easier for you to change your implementation rather than rely on Spark to handle this.

Thanks
Jerry
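
If changing the implementation is an option, one alternative sketch to a 24-hour window is to keep only per-day distinct keys in state with updateStateByKey (logs and extractDayAndUser are hypothetical, and updateStateByKey needs a checkpoint directory):

ssc.checkpoint("hdfs://namenode:8020/checkpoints")   // placeholder path

// extractDayAndUser: String => (String, String), e.g. ("2015-01-22", "user42")
val pairs = logs.map(extractDayAndUser).map { case (day, user) => ((day, user), 1L) }

// Keep one flag per (day, user); return None for days you no longer care about to prune state.
val seen = pairs.updateStateByKey[Long]((values: Seq[Long], state: Option[Long]) => Some(1L))

// Unique users per day = number of distinct (day, user) keys currently in state.
val uniquesPerDay = seen.map { case ((day, _), _) => (day, 1L) }.reduceByKey(_ + _)
uniquesPerDay.print()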

From: Balakrishnan Narendran [mailto:balu.na...@gmail.com]
Sent: Friday, January 23, 2015 12:19 AM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: spark streaming with checkpoint

Thank you Jerry,
   Does the window operation create new RDDs for each slide duration? I am asking this because I see a constant increase in memory even when no logs are received.
If not checkpointing, is there any alternative that you would suggest?


On Tue, Jan 20, 2015 at 7:08 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi,

Seems you have such a large window (24 hours), so the phenomenon of increasing memory is expected, because the window operation will cache the RDDs within this window in memory. So for your requirement, memory should be enough to hold the data of 24 hours.

I don’t think checkpointing in Spark Streaming can alleviate this problem, because checkpoints are mainly for fault tolerance.

Thanks
Jerry

From: balu.naren [mailto:balu.na...@gmail.commailto:balu.na...@gmail.com]
Sent: Tuesday, January 20, 2015 7:17 PM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: spark streaming with checkpoint


I am a beginner to Spark Streaming, so I have a basic doubt regarding checkpoints. My use case is to calculate the number of unique users per day. I am using reduceByKey and window for this, where my window duration is 24 hours and the slide duration is 5 mins. I am updating the processed record in MongoDB; currently I replace the existing record each time. But I see the memory slowly increasing over time, and the process gets killed after about 1.5 hours (on an AWS small instance). The DB write after the restart clears all the old data. So I understand checkpointing is the solution for this. But my doubt is:

  *   What should my checkpoint duration be? As per the documentation it should be 5-10 times the slide duration, but I need the data of the entire day. So is it ok to keep it at 24 hrs?
  *   Where ideally should the checkpoint be? Initially when I receive the stream, just before the window operation, or after the data reduction has taken place?

Appreciate your help.
Thank you


View this message in context: spark streaming with 
checkpointhttp://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-with-checkpoint-tp21263.html
Sent from the Apache Spark User List mailing list 
archivehttp://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



RE: spark streaming with checkpoint

2015-01-20 Thread Shao, Saisai
Hi,

Seems you have such a large window (24 hours), so the phenomenon of increasing memory is expected, because the window operation will cache the RDDs within this window in memory. So for your requirement, memory should be enough to hold the data of 24 hours.

I don't think checkpointing in Spark Streaming can alleviate this problem, because checkpoints are mainly for fault tolerance.

Thanks
Jerry

From: balu.naren [mailto:balu.na...@gmail.com]
Sent: Tuesday, January 20, 2015 7:17 PM
To: user@spark.apache.org
Subject: spark streaming with checkpoint


I am a beginner to Spark Streaming, so I have a basic doubt regarding checkpoints. My use case is to calculate the number of unique users per day. I am using reduceByKey and window for this, where my window duration is 24 hours and the slide duration is 5 mins. I am updating the processed record in MongoDB; currently I replace the existing record each time. But I see the memory slowly increasing over time, and the process gets killed after about 1.5 hours (on an AWS small instance). The DB write after the restart clears all the old data. So I understand checkpointing is the solution for this. But my doubt is:

  *   What should my checkpoint duration be? As per the documentation it should be 5-10 times the slide duration, but I need the data of the entire day. So is it ok to keep it at 24 hrs?
  *   Where ideally should the checkpoint be? Initially when I receive the stream, just before the window operation, or after the data reduction has taken place?

Appreciate your help.
Thank you


View this message in context: spark streaming with 
checkpointhttp://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-with-checkpoint-tp21263.html
Sent from the Apache Spark User List mailing list 
archivehttp://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.


RE: dynamically change receiver for a spark stream

2015-01-20 Thread Shao, Saisai
Hi,

I don't think current Spark Streaming supports this feature; the whole DStream lineage is fixed after the context is started.

Also, stopping a single stream is not supported; currently you need to stop the whole streaming context to achieve what you want.

Thanks
Saisai

-Original Message-
From: jamborta [mailto:jambo...@gmail.com] 
Sent: Wednesday, January 21, 2015 3:09 AM
To: user@spark.apache.org
Subject: dynamically change receiver for a spark stream

Hi all,

we have been trying to set up a stream using a custom receiver that would pick up data from SQL databases. We'd like to keep that streaming context running and dynamically change the streams on demand, adding and removing streams as needed. Alternatively, if the set of streams is fixed, is it possible to stop a stream, change its config and start it again?

thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/dynamically-change-receiver-for-a-spark-stream-tp21268.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Streaming with Java: Expected ReduceByWindow to Return JavaDStream

2015-01-18 Thread Shao, Saisai
Hi Jeff,

From my understanding it seems more like a bug: since JavaDStreamLike is used from Java code, returning a Scala DStream is not reasonable. You can fix this by submitting a PR, or I can help you fix it.

Thanks
Jerry

From: Jeff Nadler [mailto:jnad...@srcginc.com]
Sent: Monday, January 19, 2015 2:04 PM
To: user@spark.apache.org
Subject: Streaming with Java: Expected ReduceByWindow to Return JavaDStream


Can anyone tell me if my expectations are sane?

I'm trying to do a reduceByWindow using the 3-arg signature (not providing an 
inverse reduce function):

JavaDStream<whatevs> reducedStream = messages.reduceByWindow((x, y) -> reduce(x, y), Durations.seconds(5), Durations.seconds(5));

This isn't building; looks like it's returning DStream not JavaDStream.

From JavaDStreamLike.scala, looks like this sig returns DStream, the 4-arg sig 
with the inverse reduce returns JavaDStream.

def reduceByWindow(
    reduceFunc: (T, T) => T,
    windowDuration: Duration,
    slideDuration: Duration
  ): DStream[T] = {
  dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
}

So I'm just a noob.  Is this a bug or am I missing something?

Thanks!

Jeff Nadler




RE: How to replay consuming messages from kafka using spark streaming?

2015-01-14 Thread Shao, Saisai
I think there are two solutions:

1. Enable the write ahead log in Spark Streaming if you're using Spark 1.2.
2. Use a third-party Kafka consumer (https://github.com/dibbhatt/kafka-spark-consumer).

Thanks
Saisai
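
A sketch of option 1, assuming Spark 1.2 and a placeholder checkpoint path:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("KafkaToHdfs")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")   // receiver WAL, added in 1.2
val ssc = new StreamingContext(conf, Seconds(600))
ssc.checkpoint("hdfs://namenode:8020/user/kidong/streaming-checkpoint")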

-Original Message-
From: mykidong [mailto:mykid...@gmail.com] 
Sent: Thursday, January 15, 2015 11:59 AM
To: user@spark.apache.org
Subject: How to replay consuming messages from kafka using spark streaming?

Hi,

My Spark Streaming job is doing something like Kafka ETL to HDFS.
For instance, every 10 min. my streaming job retrieves messages from Kafka and saves them as Avro files onto HDFS.
My question is: if a worker sometimes fails to write the Avro files to HDFS, I want to replay consuming messages from the last successful Kafka offset.
I think the Spark Streaming Kafka receiver is written using the Kafka high-level consumer API, not the simple consumer API.

Any idea how to replay Kafka consumption in Spark Streaming?

- Kidong.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-replay-consuming-messages-from-kafka-using-spark-streaming-tp21145.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Better way of measuring custom application metrics

2015-01-04 Thread Shao, Saisai
Now I see your requirement; maybe there are some limitations in the current MetricsSystem. I think we can improve it as well.

Thanks
Jerry

From: Enno Shioji [mailto:eshi...@gmail.com]
Sent: Sunday, January 4, 2015 5:46 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: Better way of measuring custom application metrics

Hi Jerry, thanks for your answer.

I had looked at MetricsSystem, but I couldn't see how I could use it in my use 
case, which is:

stream
  .map { i =>
    Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i)
    i * 2
  }

From what I can see, a Source accepts an object and describes how to poll it 
for metrics. Presumably that's why Sources have only Gauges and never Meters, 
for example. In my case, I don't have a state that I want Spark's MetricSystem 
to poll.

If I could get a reference to an internal metricRegistry instance AND a task identifier in my functions, I could achieve the same thing while using Spark's metric configuration, but I couldn't find a way to do this either...








On Sun, Jan 4, 2015 at 2:46 AM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi,

I think there’s a StreamingSource in Spark Streaming which exposes the Spark Streaming running status to the metrics sinks; you can connect it with the Graphite sink to expose metrics to Graphite. I’m not sure if this is what you want.

Besides, you can customize the Source and Sink of the MetricsSystem to build your own, and configure them in metrics.properties with the class name so they get loaded by the metrics system; for the details you can refer to http://spark.apache.org/docs/latest/monitoring.html or the source code.

Thanks
Jerry

From: Enno Shioji [mailto:eshi...@gmail.commailto:eshi...@gmail.com]
Sent: Sunday, January 4, 2015 7:47 AM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Better way of measuring custom application metrics

I have a hack to gather custom application metrics in a Streaming job, but I 
wanted to know if there is any better way of doing this.

My hack consists of this singleton:

object Metriker extends Serializable {
  @transient lazy val mr: MetricRegistry = {
    val metricRegistry = new MetricRegistry()
    val graphiteEndpoint = new InetSocketAddress("ec2-54-220-56-229.eu-west-1.compute.amazonaws.com", 2003)
    GraphiteReporter
      .forRegistry(metricRegistry)
      .build(new Graphite(graphiteEndpoint))
      .start(5, TimeUnit.SECONDS)
    metricRegistry
  }

  @transient lazy val processId = ManagementFactory.getRuntimeMXBean.getName

  @transient lazy val hostId = {
    try {
      InetAddress.getLocalHost.getHostName
    } catch {
      case e: UnknownHostException => "localhost"
    }
  }

  def metricName(name: String): String = {
    "%s.%s.%s".format(name, hostId, processId)
  }
}


which I then use in my jobs like so:

stream
  .map { i =>
    Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i)
    i * 2
  }

Then I aggregate the metrics on Graphite. This works, but I was curious to know 
if anyone has a less hacky way.





RE: Better way of measuring custom application metrics

2015-01-03 Thread Shao, Saisai
Hi,

I think there's a StreamingSource in Spark Streaming which exposes the Spark 
Streaming running status to the metrics sink; you can connect it with the Graphite 
sink to expose metrics to Graphite. I'm not sure if this is what you want.

Besides, you can customize the Source and Sink of the MetricsSystem to build 
your own and configure them in metrics.properties with the class name so they get 
loaded by the metrics system. For the details you can refer to 
http://spark.apache.org/docs/latest/monitoring.html or the source code.
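
For example, wiring up the built-in Graphite sink through metrics.properties is just a 
few lines; a minimal sketch (host, port and period are placeholders, not values from 
this thread):

  *.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
  *.sink.graphite.host=graphite.example.com
  *.sink.graphite.port=2003
  *.sink.graphite.period=5
  *.sink.graphite.unit=seconds

The file is picked up via spark.metrics.conf, e.g. --conf 
spark.metrics.conf=metrics.properties on spark-submit.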

Thanks
Jerry

From: Enno Shioji [mailto:eshi...@gmail.com]
Sent: Sunday, January 4, 2015 7:47 AM
To: user@spark.apache.org
Subject: Better way of measuring custom application metrics

I have a hack to gather custom application metrics in a Streaming job, but I 
wanted to know if there is any better way of doing this.

My hack consists of this singleton:

object Metriker extends Serializable {
  @transient lazy val mr: MetricRegistry = {
    val metricRegistry = new MetricRegistry()
    val graphiteEndpoint = new InetSocketAddress("ec2-54-220-56-229.eu-west-1.compute.amazonaws.com", 2003)
    GraphiteReporter
      .forRegistry(metricRegistry)
      .build(new Graphite(graphiteEndpoint))
      .start(5, TimeUnit.SECONDS)
    metricRegistry
  }

  @transient lazy val processId = ManagementFactory.getRuntimeMXBean.getName

  @transient lazy val hostId = {
    try {
      InetAddress.getLocalHost.getHostName
    } catch {
      case e: UnknownHostException => "localhost"
    }
  }

  def metricName(name: String): String = {
    "%s.%s.%s".format(name, hostId, processId)
  }
}


which I then use in my jobs like so:

stream
.map { i =>
  Metriker.mr.meter(Metriker.metricName("testmetric123")).mark(i)
  i * 2
}

Then I aggregate the metrics on Graphite. This works, but I was curious to know 
if anyone has a less hacky way.




RE: serialization issue with mapPartitions

2014-12-25 Thread Shao, Saisai
Hi,

Hadoop Configuration is only Writable, not Java Serializable. You can use 
SerializableWritable (in Spark) to wrap the Configuration to make it serializable, 
and use a broadcast variable to broadcast this conf to all the nodes; then you can 
use it in mapPartitions rather than serializing it within the closure.

You can refer to org.apache.spark.rdd.HadoopRDD; there is a similar usage 
scenario there.
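
A rough sketch of the wrap-and-broadcast pattern (sc, job and rdd are placeholders, 
not names from the original code):

  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.SerializableWritable

  // Wrap the Configuration so it becomes Java-serializable, then broadcast it once.
  val confBroadcast = sc.broadcast(new SerializableWritable(job.getConfiguration))

  rdd.mapPartitions { iter =>
    // Unwrap inside the task instead of capturing the Configuration in the closure.
    val conf: Configuration = confBroadcast.value.value
    // ... use conf here, e.g. to create readers/writers for this partition ...
    iter
  }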

Thanks
Jerry.

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Friday, December 26, 2014 9:38 AM
To: ey-chih chow
Cc: user
Subject: Re: serialization issue with mapPartitions

Hi,

On Fri, Dec 26, 2014 at 10:13 AM, ey-chih chow 
eyc...@hotmail.commailto:eyc...@hotmail.com wrote:
I should rephrase my question as follows:

How to use the corresponding Hadoop Configuration of a HadoopRDD in defining
a function as an input parameter to the MapPartitions function?

Well, you could try to pull the `val config = job.getConfiguration()` out of 
the function and just use `config` inside the function, hoping that this one is 
serializable.

Tobias




Question on saveAsTextFile with overwrite option

2014-12-24 Thread Shao, Saisai
Hi,

We have a requirement to save RDD output to HDFS with a saveAsTextFile-like API, 
but we need to overwrite the data if it already exists. I'm not sure whether current 
Spark supports this kind of operation, or whether I need to check it manually?

There's a thread in the mailing list that discussed this 
(http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html);
 I'm not sure whether this feature is enabled or not, or enabled via some configuration?

Appreciate your suggestions.

Thanks a lot
Jerry


RE: Question on saveAsTextFile with overwrite option

2014-12-24 Thread Shao, Saisai
Thanks Patrick for your detailed explanation.

BR
Jerry

-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 
Sent: Thursday, December 25, 2014 3:43 PM
To: Cheng, Hao
Cc: Shao, Saisai; user@spark.apache.org; d...@spark.apache.org
Subject: Re: Question on saveAsTextFile with overwrite option

So the behavior of overwriting existing directories IMO is something we don't 
want to encourage. The reason why the Hadoop client has these checks is that 
it's very easy for users to do unsafe things without them. For instance, a user 
could overwrite an RDD that had 100 partitions with an RDD that has 10 
partitions... and if they read back the RDD they would get a corrupted RDD that 
has a combination of data from the old and new RDD.

If users want to circumvent these safety checks, we need to make them 
explicitly disable them. Given this, I think a config option is as reasonable 
as any alternatives. This is already pretty easy IMO.

- Patrick
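
For reference, the validateOutputSpecs switch mentioned further down the thread is an 
ordinary Spark conf entry; a minimal sketch:

  val conf = new SparkConf().set("spark.hadoop.validateOutputSpecs", "false")

or --conf spark.hadoop.validateOutputSpecs=false on spark-submit. With it set, 
saveAsTextFile no longer refuses to write into an existing directory, with the safety 
caveats Patrick describes above.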

On Wed, Dec 24, 2014 at 11:28 PM, Cheng, Hao hao.ch...@intel.com wrote:
 I am wondering if we can provide more friendly API, other than configuration 
 for this purpose. What do you think Patrick?

 Cheng Hao

 -Original Message-
 From: Patrick Wendell [mailto:pwend...@gmail.com]
 Sent: Thursday, December 25, 2014 3:22 PM
 To: Shao, Saisai
 Cc: user@spark.apache.org; d...@spark.apache.org
 Subject: Re: Question on saveAsTextFile with overwrite option

 Is it sufficient to set spark.hadoop.validateOutputSpecs to false?

 http://spark.apache.org/docs/latest/configuration.html

 - Patrick

 On Wed, Dec 24, 2014 at 10:52 PM, Shao, Saisai saisai.s...@intel.com wrote:
 Hi,



 We have such requirements to save RDD output to HDFS with 
 saveAsTextFile like API, but need to overwrite the data if existed.
 I'm not sure if current Spark support such kind of operations, or I need to 
 check this manually?



 There's a thread in mailing list discussed about this 
 (http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-S
 p ark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html),
 I'm not sure this feature is enabled or not, or with some configurations?



 Appreciate your suggestions.



 Thanks a lot

 Jerry




RE: Spark Streaming Python APIs?

2014-12-14 Thread Shao, Saisai
AFAIK, this will be a new feature in version 1.2; you can check out the master 
branch or the 1.2 branch to try it out.

Thanks
Jerry

From: Xiaoyong Zhu [mailto:xiaoy...@microsoft.com]
Sent: Monday, December 15, 2014 10:53 AM
To: user@spark.apache.org
Subject: Spark Streaming Python APIs?

Hi spark experts

Are there any Python APIs for Spark Streaming? I didn't find the Python APIs in 
Spark Streaming programming guide..
http://spark.apache.org/docs/latest/streaming-programming-guide.html

Xiaoyong



RE: KafkaUtils explicit acks

2014-12-14 Thread Shao, Saisai
Hi,

It is not trivial work to acknowledge the offsets only when an RDD is fully 
processed. From my understanding, modifying only KafkaUtils is not enough to meet 
your requirement; you would need to add metadata management for each block/RDD, 
track them on both the executor and driver side, and many other things would also 
need to be taken care of :).

Thanks
Jerry

From: mukh@gmail.com [mailto:mukh@gmail.com] On Behalf Of Mukesh Jha
Sent: Monday, December 15, 2014 1:31 PM
To: Tathagata Das
Cc: francois.garil...@typesafe.com; user@spark.apache.org
Subject: Re: KafkaUtils explicit acks

Thanks TD & Francois for the explanation & documentation. I'm curious whether we 
have any performance benchmarks with & without WAL for spark-streaming-kafka.

Also, in spark-streaming-kafka (as Kafka provides a way to acknowledge logs), on 
top of WAL can we modify KafkaUtils to acknowledge the offsets only when the 
RDDs are fully processed and are getting evicted out of Spark memory, so that 
we can be a hundred percent sure that all the records are getting processed in the 
system?
I was thinking it might be good to have the Kafka offset information of each batch 
as part of the RDD's metadata and commit the offsets once the RDD lineage is 
complete.

On Thu, Dec 11, 2014 at 6:26 PM, Tathagata Das 
tathagata.das1...@gmail.commailto:tathagata.das1...@gmail.com wrote:
I am updating the docs right now. Here is a staged copy that you can
have a sneak peek of. This will be part of Spark 1.2.

http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html

The updated fault-tolerance section tries to simplify the explanation
of when and what data can be lost, and how to prevent that using the
new experimental feature of write ahead logs.
Any feedback will be much appreciated.

TD
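
For reference, a minimal sketch of enabling the write ahead log described above 
(config name as in the staged 1.2 docs; the checkpoint path is a placeholder):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(10))
  // The WAL is written under the checkpoint directory, so it should live on a
  // fault-tolerant file system such as HDFS.
  ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")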

On Wed, Dec 10, 2014 at 2:42 AM,  
francois.garil...@typesafe.commailto:francois.garil...@typesafe.com wrote:
 [sorry for the botched half-message]

 Hi Mukesh,

 There's been some great work on Spark Streaming reliability lately.
 https://www.youtube.com/watch?v=jcJq3ZalXD8
 Look at the links from:
 https://issues.apache.org/jira/browse/SPARK-3129

 I'm not aware of any doc yet (did I miss something ?) but you can look at
 the ReliableKafkaReceiver's test suite:

 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala

 -
 FG


 On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha 
 me.mukesh@gmail.commailto:me.mukesh@gmail.com
 wrote:

 Hello Guys,

 Any insights on this??
 If I'm not clear enough, my question is: how can I use the Kafka consumer and
 not lose any data in case of failures with spark-streaming?

 On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha 
 me.mukesh@gmail.commailto:me.mukesh@gmail.com
 wrote:

 Hello Experts,

 I'm working on a spark app which reads data from kafka  persists it in
 hbase.

 Spark documentation states below [1] that in case of worker failure
 we can lose some data. If not, how can I make my Kafka stream more reliable?
 I have seen there is a simple consumer [2], but I'm not sure if it has
 been used/tested extensively.

 I was wondering if there is a way to explicitly acknowledge the kafka
 offsets once they are replicated in memory of other worker nodes (if it's
 not already done) to tackle this issue.

 Any help is appreciated in advance.


 Using any input source that receives data through a network - For
 network-based data sources like Kafka and Flume, the received input data is
 replicated in memory between nodes of the cluster (default replication
 factor is 2). So if a worker node fails, then the system can recompute the
 lost from the the left over copy of the input data. However, if the worker
 node where a network receiver was running fails, then a tiny bit of data may
 be lost, that is, the data received by the system but not yet replicated to
 other node(s). The receiver will be started on a different node and it will
 continue to receive data.
 https://github.com/dibbhatt/kafka-spark-consumer

 Txz,

 Mukesh Jha




 --


 Thanks  Regards,

 Mukesh Jha




--


Thanks & Regards,

Mukesh Jha


RE: Spark streaming for v1.1.1 - unable to start application

2014-12-05 Thread Shao, Saisai
Hi,

I don't think it's a problem of Spark Streaming; judging from the call stack, the 
problem occurs when the BlockManager is initializing itself. Would you mind 
checking your Spark configuration, hardware and deployment? Mostly I think it's not 
a problem of Spark itself.

Thanks
Saisai

From: Sourav Chandra [mailto:sourav.chan...@livestream.com]
Sent: Friday, December 5, 2014 4:36 PM
To: user@spark.apache.org
Subject: Spark streaming for v1.1.1 - unable to start application

Hi,

I am getting the below error, and because of it there are no completed stages - all 
of them are waiting.

14/12/05 03:31:59 WARN AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
at 
scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
at 
akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:176)
at 
org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:213)
at 
org.apache.spark.storage.BlockManagerMaster.tell(BlockManagerMaster.scala:203)
at 
org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:47)
at 
org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:177)
at org.apache.spark.storage.BlockManager.init(BlockManager.scala:147)
at org.apache.spark.storage.BlockManager.init(BlockManager.scala:168)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:230)
at org.apache.spark.executor.Executor.init(Executor.scala:78)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:60)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at 
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Could you please let me know the reason and fix for this? Spark version is 1.1.1

--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·


sourav.chan...@livestream.commailto:sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, 
Koramangala Industrial Area,

Bangalore 560034

www.livestream.comhttp://www.livestream.com/




RE: Spark Streaming empty RDD issue

2014-12-04 Thread Shao, Saisai
Hi,

According to my knowledge of the current Spark Streaming Kafka connector, I think 
there's no way for the application user to detect this kind of failure; it will be 
handled either by the Kafka consumer with the ZK coordinator or by the 
ReceiverTracker in Spark Streaming, so I think you don't need to take care of this 
issue from the user's perspective.

If there's no new message coming to consumer, the consumer will wait.

Thanks
Jerry

-Original Message-
From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com] 
Sent: Thursday, December 4, 2014 2:47 PM
To: u...@spark.incubator.apache.org
Subject: Spark Streaming empty RDD issue

Hi Experts
I am using Spark Streaming to integrate Kafka for real time data processing.
I am facing some issues related to Spark Streaming, so I want to know how we can 
detect:
1) Our connection has been lost
2) Our receiver is down
3) Spark Streaming has no new messages to consume.

How can we deal with these issues?

I will be glad to hear from you and will be thankful to you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-empty-RDD-issue-tp20329.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Low Level Kafka Consumer for Spark

2014-12-02 Thread Shao, Saisai
Hi Rod,

The purpose of introducing the WAL mechanism in Spark Streaming as a general 
solution is to let all the receivers benefit from it.

Though, as you said, external sources like Kafka have their own checkpoint 
mechanism, so instead of storing data in the WAL we could store only metadata in 
the WAL and recover from the last committed offsets. But this requires a 
sophisticated design of the Kafka receiver with the low-level API involved, and we 
would also need to take care of rebalance and fault tolerance ourselves. So right 
now, instead of implementing a whole new receiver, we chose to implement a simple 
one; though the performance is not so good, it's much easier to understand and 
maintain.

The design purpose and implementation of the reliable Kafka receiver can be found 
in (https://issues.apache.org/jira/browse/SPARK-4062). Improving the reliable Kafka 
receiver along the lines you mentioned is on our schedule for the future.

Thanks
Jerry


-Original Message-
From: RodrigoB [mailto:rodrigo.boav...@aspect.com] 
Sent: Wednesday, December 3, 2014 5:44 AM
To: u...@spark.incubator.apache.org
Subject: Re: Low Level Kafka Consumer for Spark

Dibyendu,

Just to make sure I will not be misunderstood - my concerns refer to the upcoming 
Spark solution and not yours. I wanted to gather the perspective of someone who 
implemented recovery with Kafka in a different way.

Tnks,
Rod



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Spark streaming cannot receive any message from Kafka

2014-11-17 Thread Shao, Saisai
Hi Bill,

Would you mind describing what you found a little more specifically? I'm not sure 
there's a parameter in KafkaUtils.createStream with which you can specify the Spark 
parallelism; also, what is the exception stack?

Thanks
Jerry

From: Bill Jay [mailto:bill.jaypeter...@gmail.com]
Sent: Tuesday, November 18, 2014 2:47 AM
To: Helena Edelson
Cc: Jay Vyas; u...@spark.incubator.apache.org; Tobias Pfeiffer; Shao, Saisai
Subject: Re: Spark streaming cannot receive any message from Kafka

Hi all,

I found the reason for this issue. It seems that in the new version, if I do not 
specify spark.default.parallelism, there will be an exception at the Kafka stream 
creation stage when calling KafkaUtils.createStream. In the previous versions, it 
seems Spark would use the default value.

Thanks!

Bill

On Thu, Nov 13, 2014 at 5:00 AM, Helena Edelson 
helena.edel...@datastax.commailto:helena.edel...@datastax.com wrote:

I encounter no issues with streaming from kafka to spark in 1.1.0. Do you 
perhaps have a version conflict?

Helena
On Nov 13, 2014 12:55 AM, Jay Vyas 
jayunit100.apa...@gmail.commailto:jayunit100.apa...@gmail.com wrote:
Yup, very important that n > 1 for Spark Streaming jobs. If local, use 
local[2].

The thing to remember is that your Spark receiver will take a thread to itself to 
produce data, so you need another thread to consume it.

In a cluster manager like YARN or Mesos the word "thread" is not used anymore (I 
guess it has a different meaning there) - you need 2 or more free compute slots, 
and that should be guaranteed by checking how many free node managers are running, 
etc.

On Nov 12, 2014, at 7:53 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Did you configure the Spark master as local? It should be local[n], n > 1, for local 
mode. Besides, there's a Kafka word count example in the Spark Streaming examples; 
you can try that. I've tested with the latest master, and it's OK.

Thanks
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Thursday, November 13, 2014 8:45 AM
To: Bill Jay
Cc: u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org
Subject: Re: Spark streaming cannot receive any message from Kafka

Bill,

However, when I am currently using Spark 1.1.0. the Spark streaming job cannot 
receive any messages from Kafka. I have not made any change to the code.

Do you see any suspicious messages in the log output?

Tobias




RE: Spark streaming cannot receive any message from Kafka

2014-11-12 Thread Shao, Saisai
Did you configure the Spark master as local? It should be local[n], n > 1, for local 
mode. Besides, there's a Kafka word count example in the Spark Streaming examples; 
you can try that. I've tested with the latest master, and it's OK.
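
A minimal self-check along those lines (ZooKeeper address, group id and topic name 
are placeholders, not taken from this thread):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  // local[2]: one thread for the Kafka receiver, at least one more to process batches.
  val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiveCheck")
  val ssc = new StreamingContext(conf, Seconds(10))

  val lines = KafkaUtils.createStream(ssc, "localhost:2181", "check-group", Map("test" -> 1)).map(_._2)
  lines.count().print()   // per-batch record counts, so you can see whether anything arrives

  ssc.start()
  ssc.awaitTermination()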

Thanks
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Thursday, November 13, 2014 8:45 AM
To: Bill Jay
Cc: u...@spark.incubator.apache.org
Subject: Re: Spark streaming cannot receive any message from Kafka

Bill,

However, when I am currently using Spark 1.1.0. the Spark streaming job cannot 
receive any messages from Kafka. I have not made any change to the code.

Do you see any suspicious messages in the log output?

Tobias



RE: Kafka Consumer in Spark Streaming

2014-11-04 Thread Shao, Saisai
Hi, would you mind describing your problem a little more specifically?


1.  Does the Kafka broker currently have any data feeding in?

2.  This code will print the lines, but not on the driver side; the code runs on 
the executor side, so you can check the logs in the worker dir to see if there are 
any printed lines under that folder.

3.  Did you see any exceptions when running the app? This will help to 
pin down the problem.

Thanks
Jerry

From: Something Something [mailto:mailinglist...@gmail.com]
Sent: Wednesday, November 05, 2014 1:57 PM
To: user@spark.apache.org
Subject: Kafka Consumer in Spark Streaming

I've following code in my program.  I don't get any error, but it's not 
consuming the messages either.  Shouldn't the following code print the line in 
the 'call' method?  What am I missing?

Please help.  Thanks.



JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1 * 1000));

JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "machine:2181", "1", map);

JavaDStream<String> statuses = tweets.map(
    new Function<String, String>() {
        public String call(String status) {
            System.out.println(status);
            return status;
        }
    }
);


RE: MEMORY_ONLY_SER question

2014-11-04 Thread Shao, Saisai
From my understanding, the Spark code uses Kryo in a streaming manner for RDD 
partitions: deserialization proceeds as the iteration moves forward. Whether Kryo 
deserializes all the objects at once or incrementally is really Kryo's own internal 
behavior; I guess Kryo will not deserialize all the objects at once.

Thanks
Jerry

From: Mohit Jaggi [mailto:mohitja...@gmail.com]
Sent: Wednesday, November 05, 2014 2:01 PM
To: Tathagata Das
Cc: user@spark.apache.org
Subject: Re: MEMORY_ONLY_SER question

I used the word streaming but I did not mean to refer to spark streaming. I 
meant if a partition containing 10 objects was kryo-serialized into a single 
buffer, then in a mapPartitions() call, as I call iter.next() 10 times to 
access these objects one at a time, does the deserialization happen
a) once to get all 10 objects,
b) 10 times incrementally to get an object at a time, or
c) 10 times to get 10 objects and discard the wrong 9 objects [I doubt this 
would be a design anyone would have adopted]
I think your answer is option (a), and you referred to Spark Streaming to 
indicate that there is no difference in its behavior from Spark core... right?

If it is indeed option (a), I am happy with it and don't need to customize. If 
it is (b), I would like to have (a) instead.

I am also wondering if kryo is good at compression of strings and numbers. 
Often I have the data type as Double but it could be encoded in much fewer 
bits.



On Tue, Nov 4, 2014 at 1:02 PM, Tathagata Das 
tathagata.das1...@gmail.commailto:tathagata.das1...@gmail.com wrote:
It it deserialized in a streaming manner as the iterator moves over the 
partition. This is a functionality of core Spark, and Spark Streaming just uses 
it as is.
What do you want to customize it to?

On Tue, Nov 4, 2014 at 9:22 AM, Mohit Jaggi 
mohitja...@gmail.commailto:mohitja...@gmail.com wrote:
Folks,
If I have an RDD persisted in MEMORY_ONLY_SER mode and then it is needed for a 
transformation/action later, is the whole partition of the RDD deserialized 
into Java objects first before my transform/action code works on it? Or is it 
deserialized in a streaming manner as the iterator moves over the partition? Is 
this behavior customizable? I generally use the Kryo serializer.

Mohit.




RE: Kafka Consumer in Spark Streaming

2014-11-04 Thread Shao, Saisai
If you're running in standalone mode, the logs are under the SPARK_HOME/work/ 
directory. I'm not sure about YARN or Mesos; you can check the Spark documentation 
to see the details.

Thanks
Jerry

From: Something Something [mailto:mailinglist...@gmail.com]
Sent: Wednesday, November 05, 2014 2:28 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: Kafka Consumer in Spark Streaming

The Kafka broker definitely has messages coming in.  But your #2 point is 
valid.  Needless to say I am a newbie to Spark.  I can't figure out where the 
'executor' logs would be.  How would I find them?
All I see printed on my screen is this:

14/11/04 22:21:23 INFO Slf4jLogger: Slf4jLogger started
14/11/04 22:21:23 INFO Remoting: Starting remoting
14/11/04 22:21:24 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://spark@mymachie:60743]
14/11/04 22:21:24 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://spark@mymachine:60743]
14/11/04 22:21:24 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
14/11/04 22:21:24 INFO JniBasedUnixGroupsMappingWithFallback: Falling back to 
shell based
---
Time: 141516852 ms
---
---
Time: 141516852 ms
---
Keeps repeating this...

On Tue, Nov 4, 2014 at 10:14 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi, would you mind describing your problem a little more specifically?


1.  Does the Kafka broker currently have any data feeding in?

2.  This code will print the lines, but not on the driver side; the code runs on 
the executor side, so you can check the logs in the worker dir to see if there are 
any printed lines under that folder.

3.  Did you see any exceptions when running the app? This will help to 
pin down the problem.

Thanks
Jerry

From: Something Something 
[mailto:mailinglist...@gmail.commailto:mailinglist...@gmail.com]
Sent: Wednesday, November 05, 2014 1:57 PM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Kafka Consumer in Spark Streaming

I've following code in my program.  I don't get any error, but it's not 
consuming the messages either.  Shouldn't the following code print the line in 
the 'call' method?  What am I missing?

Please help.  Thanks.



JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(60 * 1 * 1000));

JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "machine:2181", "1", map);

JavaDStream<String> statuses = tweets.map(
    new Function<String, String>() {
        public String call(String status) {
            System.out.println(status);
            return status;
        }
    }
);



RE: FileNotFoundException in appcache shuffle files

2014-10-28 Thread Shao, Saisai
Hi Ryan,

This is an issue with sort-based shuffle, not consolidated hash-based shuffle. 
I guess this issue mostly occurs when the Spark cluster is in an abnormal situation, 
maybe a long GC pause or something else; you can check the system status and whether 
there are any other exceptions besides this one.

Thanks
Jerry

From: nobigdealst...@gmail.com [mailto:nobigdealst...@gmail.com] On Behalf Of 
Ryan Williams
Sent: Wednesday, October 29, 2014 1:31 PM
To: user
Subject: FileNotFoundException in appcache shuffle files

My job is failing with the following error:
14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 
(TID 6266, demeter-csmau08-19.demeter.hpc.mssm.edu): java.io.FileNotFoundException: 
/data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index
 (No such file or directory)
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.init(FileOutputStream.java:221)

org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)

org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:733)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
scala.collection.Iterator$class.foreach(Iterator.scala:727)

org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:790)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:732)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:728)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)

org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:56)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)

I get 4 of those on task 1543 before the job aborts. Interspersed in the 4 
task-1543 failures are a few instances of this failure on another task. Here is 
the entire App Master stdout dump (https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0) 
(~2MB; stack traces towards the bottom, of course). I am running {Spark 1.1, Hadoop 2.3.0}.

Here's a summary of the RDD manipulations I've done up to the point of failure:

  *   val A = [read a file in 1419 shards]

 *   the file is 177GB compressed but ends up being ~5TB uncompressed / 
hydrated into scala objects (I think; see below for more discussion on this 
point).
 *   some relevant Spark options:

*   spark.default.parallelism=2000
*   --master yarn-client
*   --executor-memory 50g
*   --driver-memory 10g
*   --num-executors 100
*   --executor-cores 4

  *   A.repartition(3000)

 *   3000 was chosen in an attempt to mitigate shuffle-disk-spillage that 
previous job attempts with 1000 or 1419 shards were mired in

  *   A.persist()

  *   A.count()  // succeeds

 *   screenshot of web UI with stats: http://cl.ly/image/3e130w3J1B2v
 *   I don't know why each task reports 8 TB of Input; that metric 
seems like it is always ludicrously high and I don't pay attention to it 
typically.
 *   Each task shuffle-writes 3.5GB, for a total of 4.9TB

*   Does that mean that 4.9TB is the uncompressed size of the file that 
A was read from?
*   4.9TB is pretty close to the total amount of memory I've configured 
the job to use: (50GB/executor) * (100 executors) ~= 5TB.
*   Is that a coincidence, or are my executors shuffle-writing an 
amount equal to all of their memory for some reason?

  *   val B = A.groupBy(...).filter(_._2.size == 2).map(_._2).flatMap(x => x).persist()

 *   my expectation is that ~all elements pass the filter step, so B should 
~equal to A, just to give a sense of the expected memory blowup.

  *   B.count()

 *   this fails while executing .groupBy(...) above

I've found a few discussions of issues whose manifestations look *like* this, 
but nothing that is obviously the same issue. The 

RE: RDD to DStream

2014-10-27 Thread Shao, Saisai
Hi Jianshi,

For simulation purposes, I think you can try ConstantInputDStream and 
QueueInputDStream to convert one RDD or a series of RDDs into a DStream: the first 
one outputs the same RDD in each batch duration, and the second one outputs one RDD 
from a queue in each batch duration. You can take a look at them.
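
A small sketch of the QueueInputDStream route (illustrative only; `chunks` is assumed 
to be your pre-split series of RDDs):

  import scala.collection.mutable
  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.StreamingContext

  def simulateStream[T: ClassTag](ssc: StreamingContext, chunks: Seq[RDD[T]]) = {
    val queue = new mutable.Queue[RDD[T]]()
    queue ++= chunks
    // One RDD is dequeued and emitted per batch interval.
    ssc.queueStream(queue, oneAtATime = true)
  }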

For your case, I think TD's comments are quite meaningful; it's not trivial to do 
this, and it often requires a job to scan all the records. It's also not the design 
purpose of Spark Streaming, so I guess it's hard to achieve exactly what you want.


Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 1:42 PM
To: Tathagata Das
Cc: Aniket Bhatnagar; user@spark.apache.org
Subject: Re: RDD to DStream

I have a similar requirement. But instead of grouping it by chunkSize, I would 
have the timeStamp be part of the data. So the function I want has the 
following signature:

  // RDD of (timestamp, value)
  def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: 
StreamingContext): DStream[T]

And DStream should respect the timestamp part. This is important for 
simulation, right?

Do you have any good solution for this?

Jianshi


On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das 
tathagata.das1...@gmail.commailto:tathagata.das1...@gmail.com wrote:
Hey Aniket,

Great thoughts! I understand the usecase. But as you have realized yourself it 
is not trivial to cleanly stream a RDD as a DStream. Since RDD operations are 
defined to be scan based, it is not efficient to define RDD based on slices of 
data within a partition of another RDD, using pure RDD transformations. What 
you have done is a decent, and probably the only feasible solution, with its 
limitations.

Also the requirements of converting a batch of data to a stream of data can be 
pretty diverse. What rate, what # of events per batch, how many batches, is it 
efficient? Hence, it is not trivial to define a good, clean public API for 
that. If any one has any thoughts, ideas, etc on this, you are more than 
welcome to share them.

TD

On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar 
aniket.bhatna...@gmail.commailto:aniket.bhatna...@gmail.com wrote:
The use case for converting RDD into DStream is that I want to simulate a 
stream from an already persisted data for testing analytics. It is trivial to 
create a RDD from any persisted data but not so much for DStream. Therefore, my 
idea to create DStream from RDD. For example, lets say you are trying to 
implement analytics on time series data using Lambda architecture. This means 
you would have to implement the same analytics on streaming data (in streaming 
mode) as well as persisted data (in batch mode). The workflow for implementing 
the anlytics would be to first implement it in batch mode using RDD operations 
and then simulate stream to test the analytics in stream mode. The simulated 
stream should produce the elements at a specified rate. So the solution maybe 
to read data in a RDD, split (chunk) it into multiple RDDs with each RDD having 
the size of elements that need to be streamed per time unit and then finally 
stream each RDD using the compute function.

The problem with using QueueInputDStream is that it will stream data as per the 
batch duration specified in the streaming context and one cannot specify a 
custom slide duration. Moreover, the class QueueInputDStream is private to 
streaming package, so I can't really use it/extend it from an external package. 
Also, I could not find a good solution split a RDD into equal sized smaller 
RDDs that can be fed into an extended version of QueueInputDStream.

Finally, here is what I came up with:

class RDDExtension[T: ClassTag](rdd: RDD[T]) {
  def toStream(streamingContext: StreamingContext, chunkSize: Int, 
slideDurationMilli: Option[Long] = None): DStream[T] = {
new InputDStream[T](streamingContext) {

      private val iterator = rdd.toLocalIterator // WARNING: each partition 
must fit in RAM of the local machine.
  private val grouped = iterator.grouped(chunkSize)

  override def start(): Unit = {}

  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[T]] = {
if (grouped.hasNext) {
  Some(rdd.sparkContext.parallelize(grouped.next()))
} else {
  None
}
  }

  override def slideDuration = {
slideDurationMilli.map(duration => new Duration(duration)).
  getOrElse(super.slideDuration)
  }
}
}

This aims to stream chunkSize elements every slideDurationMilli milliseconds 
(defaults to batch size in streaming context). It's still not perfect (for 
example, the streaming is not precise) but given that this will only be used 
for testing purposes, I don't look for ways to further optimize it.

Thanks,
Aniket


On 2 August 2014 04:07, Mayur Rustagi 
mayur.rust...@gmail.commailto:mayur.rust...@gmail.com wrote:
Nice question :)
Ideally you should use a queuestream interface to push 

RE: RDD to DStream

2014-10-27 Thread Shao, Saisai
I think your solution may not be scalable if the data size keeps increasing, 
since you have to collect all your data back to the driver node, so driver memory 
usage will be a problem.

Why not filter out each specific time range as an RDD? After filtering over the whole 
time range you will get a series of RDDs divided by timestamp, which you can then feed 
into the queue. It is still not an efficient way, but it is not limited by driver 
memory.
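
A rough sketch of that filter-per-window idea (all names are illustrative, not from 
this thread; it runs one filter job per window but never collects the records back 
to the driver):

  import scala.collection.mutable
  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.StreamingContext

  def rddToDStreamByFilter[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long,
      ssc: StreamingContext) = {
    val minTs = data.keys.reduce(math.min)
    val maxTs = data.keys.reduce(math.max)
    val queue = new mutable.Queue[RDD[T]]()
    for (start <- minTs to maxTs by timeWindow) {
      // Each time window becomes its own RDD; only a filter closure is shipped out.
      queue += data.filter { case (ts, _) => ts >= start && ts < start + timeWindow }.values
    }
    ssc.queueStream(queue, oneAtATime = true)
  }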

Also, there may be other solutions, like using a shuffle to arrange the data, but you 
cannot avoid scanning the whole data set. Basically we need to avoid fetching a large 
amount of data back to the driver.


Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 2:39 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com)
Subject: Re: RDD to DStream

Hi Saisai,

I understand it's non-trivial, but the requirement of simulating offline data 
as stream is also fair. :)

I just wrote a prototype, however, I need to do a collect and a bunch of 
parallelize...

  // RDD of (timestamp, value)
  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: 
StreamingContext): DStream[T] = {
val sc = ssc.sparkContext
val d = data.groupBy(_._1 / timeWindow)
.map(e => (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
.collect()
.map(e => (e._1, sc.parallelize(e._2)))
.sortBy(_._1)
val queue = new mutable.SynchronizedQueue[RDD[T]]

queue ++= d.map(_._2)

ssc.queueStream(queue)
  }

Any way to get a list of RDDs sorted by group key just after groupBy?

Jianshi

On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi Jianshi,

For simulation purpose, I think you can try ConstantInputDStream and 
QueueInputDStream to convert one RDD or series of RDD into DStream, the first 
one output the same RDD in each batch duration, and the second one just output 
a RDD in a queue in each batch duration. You can take a look at it.

For your case, I think TD’s comment are quite meaningful, it’s not trivial to 
do so, often requires a job to scan all the records, it’s also not the design 
purpose of Spark Streaming, I guess it’s hard to achieve what you want.


Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 1:42 PM
To: Tathagata Das
Cc: Aniket Bhatnagar; user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: RDD to DStream

I have a similar requirement. But instead of grouping it by chunkSize, I would 
have the timeStamp be part of the data. So the function I want has the 
following signature:

  // RDD of (timestamp, value)
  def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: 
StreamingContext): DStream[T]

And DStream should respect the timestamp part. This is important for 
simulation, right?

Do you have any good solution for this?

Jianshi


On Thu, Aug 7, 2014 at 9:32 AM, Tathagata Das 
tathagata.das1...@gmail.commailto:tathagata.das1...@gmail.com wrote:
Hey Aniket,

Great thoughts! I understand the usecase. But as you have realized yourself it 
is not trivial to cleanly stream a RDD as a DStream. Since RDD operations are 
defined to be scan based, it is not efficient to define RDD based on slices of 
data within a partition of another RDD, using pure RDD transformations. What 
you have done is a decent, and probably the only feasible solution, with its 
limitations.

Also the requirements of converting a batch of data to a stream of data can be 
pretty diverse. What rate, what # of events per batch, how many batches, is it 
efficient? Hence, it is not trivial to define a good, clean public API for 
that. If any one has any thoughts, ideas, etc on this, you are more than 
welcome to share them.

TD

On Mon, Aug 4, 2014 at 12:43 AM, Aniket Bhatnagar 
aniket.bhatna...@gmail.commailto:aniket.bhatna...@gmail.com wrote:
The use case for converting RDD into DStream is that I want to simulate a 
stream from an already persisted data for testing analytics. It is trivial to 
create a RDD from any persisted data but not so much for DStream. Therefore, my 
idea to create DStream from RDD. For example, lets say you are trying to 
implement analytics on time series data using Lambda architecture. This means 
you would have to implement the same analytics on streaming data (in streaming 
mode) as well as persisted data (in batch mode). The workflow for implementing 
the anlytics would be to first implement it in batch mode using RDD operations 
and then simulate stream to test the analytics in stream mode. The simulated 
stream should produce the elements at a specified rate. So the solution maybe 
to read data in a RDD, split (chunk) it into multiple RDDs with each RDD having 
the size of elements that need to be streamed per time unit and then finally 
stream each RDD using the compute function

RE: RDD to DStream

2014-10-27 Thread Shao, Saisai
I think what you want is to make each bucket a new RDD, as in what you described 
with the Pig syntax.

gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded timestamp 
for each bucket

From my understanding, currently there's no such API in Spark to achieve this; you 
may have to create a customized RDD yourself.

As for why the code cannot be executed:

  .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...

This “sc.parallelize(_._2.sortBy(_._1))” will be serialized as a closure and executed 
on the remote side, which obviously does not have a SparkContext; Spark does not 
support nested RDDs inside closures.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 3:30 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com)
Subject: Re: RDD to DStream

Ok, back to Scala code, I'm wondering why I cannot do this:

data.groupBy(timestamp / window)
  .sortByKey()  // no sort method available here
  .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
  .collect() // returns Seq[RDD[(Timestamp, T)]]


Jianshi

On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang 
jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote:
You're absolutely right, it's not 'scalable' as I'm using collect().

However, it's important to have the RDDs ordered by the timestamp of the time 
window (groupBy puts data to corresponding timewindow).

It's fairly easy to do in Pig, but somehow I have no idea how to express it in 
RDD...

Something like (in Pig, pseudo code :):

g = GROUP data BY (timestamp / windowSize)  // group data into buckets in the 
same time window
gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded timestamp 
for each bucket
stream = FOREACH gs GENERATE toRDD(g)

No idea how to do the order by part in RDD.

Jianshi


On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
I think you solution may not  be extendable if the data size is increasing, 
since you have to collect all your data back to driver node, so the memory 
usage of driver will be a problem.

why not filter out specific time-range data as a rdd, after filtering the whole 
time-range, you will get a series of RDD with timestamp divided, and then feed 
into queue. Still it is not an efficient way, but  it is not limited by driver 
memory.

Also there may have some other solutions like shuffle to arrange data, but you 
cannot avoid scanning the whole data. Basically we need to avoid fetching large 
amount of data back to driver.


Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 2:39 PM
To: Shao, Saisai
Cc: user@spark.apache.orgmailto:user@spark.apache.org; Tathagata Das 
(t...@databricks.commailto:t...@databricks.com)

Subject: Re: RDD to DStream

Hi Saisai,

I understand it's non-trivial, but the requirement of simulating offline data 
as stream is also fair. :)

I just wrote a prototype, however, I need to do a collect and a bunch of 
parallelize...

  // RDD of (timestamp, value)
  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: 
StreamingContext): DStream[T] = {
val sc = ssc.sparkContext
val d = data.groupBy(_._1 / timeWindow)
.map(e = (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
.collect()
.map(e = (e._1, sc.parallelize(e._2)))
.sortBy(_._1)
val queue = new mutable.SynchronizedQueue[RDD[T]]

queue ++= d.map(_._2)

ssc.queueStream(queue)
  }

Any way to get a list of RDDs sorted by group key just after groupBy?

Jianshi

On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi Jianshi,

For simulation purpose, I think you can try ConstantInputDStream and 
QueueInputDStream to convert one RDD or series of RDD into DStream, the first 
one output the same RDD in each batch duration, and the second one just output 
a RDD in a queue in each batch duration. You can take a look at it.

For your case, I think TD’s comment are quite meaningful, it’s not trivial to 
do so, often requires a job to scan all the records, it’s also not the design 
purpose of Spark Streaming, I guess it’s hard to achieve what you want.


Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 1:42 PM
To: Tathagata Das
Cc: Aniket Bhatnagar; user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: RDD to DStream

I have a similar requirement. But instead of grouping it by chunkSize, I would 
have the timeStamp be part of the data. So the function I want has the 
following signature:

  // RDD of (timestamp, value)
  def rddToDStream[T](data: RDD[(Long, T)], timeWindow: Long)(implicit ssc: 
StreamingContext): DStream[T]

And DStream should respect the timestamp part. This is important for 
simulation, right?

Do

RE: RDD to DStream

2014-10-27 Thread Shao, Saisai
Yes, I understand what you want, but it may be hard to achieve without collecting 
back to the driver node.

Besides, can we think of another way to do it?

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 4:07 PM
To: Shao, Saisai
Cc: user@spark.apache.org; Tathagata Das (t...@databricks.com)
Subject: Re: RDD to DStream

Yeah, you're absolutely right Saisai.

My point is we should allow this kind of logic in RDD, let's say transforming 
type RDD[(Key, Iterable[T])] to Seq[(Key, RDD[T])].

Make sense?

Jianshi

On Mon, Oct 27, 2014 at 3:56 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
I think what you want is to make each bucket as a new RDD as what you mentioned 
in Pig syntax.

gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded timestamp 
for each bucket

From my understanding, currently in Spark there’s no such kind of API to 
achieve this, maybe you have to create a customized RDD by yourself.

For the code why cannot executed,

  .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...

This “sc.parallelize(_._2.sortBy(_._1))”will be serialized as a closure to 
execute in remote side, which obviously do not has SparkContext, I think Spark 
cannot support nested RDD in closure.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 3:30 PM

To: Shao, Saisai
Cc: user@spark.apache.orgmailto:user@spark.apache.org; Tathagata Das 
(t...@databricks.commailto:t...@databricks.com)
Subject: Re: RDD to DStream

Ok, back to Scala code, I'm wondering why I cannot do this:

data.groupBy(timestamp / window)
  .sortByKey()  // no sort method available here
  .map(sc.parallelize(_._2.sortBy(_._1)))  // nested RDD, hmm...
  .collect() // returns Seq[RDD[(Timestamp, T)]]


Jianshi

On Mon, Oct 27, 2014 at 3:24 PM, Jianshi Huang 
jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com wrote:
You're absolutely right, it's not 'scalable' as I'm using collect().

However, it's important to have the RDDs ordered by the timestamp of the time 
window (groupBy puts data to corresponding timewindow).

It's fairly easy to do in Pig, but somehow I have no idea how to express it in 
RDD...

Something like (in Pig, pseudo code :):

g = GROUP data BY (timestamp / windowSize)  // group data into buckets in the 
same time window
gs = ORDER g BY group ASC, g.timestamp ASC  // 'group' is the rounded timestamp 
for each bucket
stream = FOREACH gs GENERATE toRDD(g)

No idea how to do the order by part in RDD.

Jianshi


On Mon, Oct 27, 2014 at 3:07 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
I think you solution may not  be extendable if the data size is increasing, 
since you have to collect all your data back to driver node, so the memory 
usage of driver will be a problem.

why not filter out specific time-range data as a rdd, after filtering the whole 
time-range, you will get a series of RDD with timestamp divided, and then feed 
into queue. Still it is not an efficient way, but  it is not limited by driver 
memory.

Also there may have some other solutions like shuffle to arrange data, but you 
cannot avoid scanning the whole data. Basically we need to avoid fetching large 
amount of data back to driver.


Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.commailto:jianshi.hu...@gmail.com]
Sent: Monday, October 27, 2014 2:39 PM
To: Shao, Saisai
Cc: user@spark.apache.orgmailto:user@spark.apache.org; Tathagata Das 
(t...@databricks.commailto:t...@databricks.com)

Subject: Re: RDD to DStream

Hi Saisai,

I understand it's non-trivial, but the requirement of simulating offline data 
as stream is also fair. :)

I just wrote a prototype, however, I need to do a collect and a bunch of 
parallelize...

  // RDD of (timestamp, value)
  def rddToDStream[T: ClassTag](data: RDD[(Long, T)], timeWindow: Long, ssc: 
StreamingContext): DStream[T] = {
val sc = ssc.sparkContext
val d = data.groupBy(_._1 / timeWindow)
.map(e = (e._1, e._2.toSeq.sortBy(_._1).map(_._2)))
.collect()
.map(e = (e._1, sc.parallelize(e._2)))
.sortBy(_._1)
val queue = new mutable.SynchronizedQueue[RDD[T]]

queue ++= d.map(_._2)

ssc.queueStream(queue)
  }

Any way to get a list of RDDs sorted by group key just after groupBy?

Jianshi

On Mon, Oct 27, 2014 at 2:00 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi Jianshi,

For simulation purpose, I think you can try ConstantInputDStream and 
QueueInputDStream to convert one RDD or series of RDD into DStream, the first 
one output the same RDD in each batch duration, and the second one just output 
a RDD in a queue in each batch duration. You can take a look at it.

For your case, I think TD’s comment are quite meaningful, it’s not trivial to 
do so, often requires a job to scan all the records, it’s also

RE: Sort-based shuffle did not work as expected

2014-10-27 Thread Shao, Saisai
Hi,

Probably the problem you met is related to this JIRA ticket 
(https://issues.apache.org/jira/browse/SPARK-3948). It's potentially a kernel 
2.6.32 bug which makes sort-based shuffle fail. I'm not sure your problem 
is the same as this one; would you mind checking your kernel version?

Thanks
Jerry

From: su...@certusnet.com.cn [mailto:su...@certusnet.com.cn]
Sent: Monday, October 27, 2014 5:41 PM
To: user
Subject: Sort-based shuffle did not work as expected

Hi, all
   We would expect to utilize sort-based shuffle in our Spark application but 
have encountered unhandled problems.
   It seems that the data file and index file are not in a consistent state, and we 
got wrong result sets when trying to use Spark to bulk load data into HBase.
   There are many fetch failures like the following:
  FetchFailed(BlockManagerId(0, work5.msa.certusnet, 44544, 0), 
shuffleId=0, mapId=42, reduceId=3)
Referring to the executor log, we caught the following exception:
14/10/27 11:20:36 ERROR BlockFetcherIterator$BasicBlockFetcherIterator: Could 
not get block(s) from ConnectionManagerId(work4.msa.certusnet,53616)
java.io.IOException: sendMessageReliably failed with ACK that signalled a 
remote error
at 
org.apache.spark.network.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:869)
at 
org.apache.spark.network.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:861)
at 
org.apache.spark.network.ConnectionManager$MessageStatus.markDone(ConnectionManager.scala:66)
at 
org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:660)
at 
org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


14/10/27 11:00:50 ERROR BlockManagerWorker: Exception handling buffer message
java.io.IOException: Channel not open for writing - cannot extend file to 
required size
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:851)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:104)
at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:379)
at 
org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100)
at 
org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79)
at 
org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
at 
org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at 
org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
at 
org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)




 Any suggestion?
Thanks
Sun





CertusNet




RE: Spark Hive Snappy Error

2014-10-22 Thread Shao, Saisai
Thanks a lot, I will try to reproduce this in my local settings and dig into 
the details, thanks for your information.


BR
Jerry

From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com]
Sent: Wednesday, October 22, 2014 8:35 PM
To: Shao, Saisai
Cc: arthur.hk.c...@gmail.com; user
Subject: Re: Spark Hive Snappy Error

Hi,

Yes, I can always reproduce the issue:

about you workload, Spark configuration, JDK version and OS version?

I ran SparkPI 1000

java -version
java version 1.7.0_67
Java(TM) SE Runtime Environment (build 1.7.0_67-b01)
Java HotSpot(TM) 64-Bit Server VM (build 24.65-b04, mixed mode)

cat /etc/centos-release
CentOS release 6.5 (Final)

My Spark’s hive-site.xml contains the following:
 <property>
  <name>hive.exec.compress.output</name>
  <value>true</value>
 </property>

 <property>
  <name>mapred.output.compression.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
 </property>

 <property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
 </property>

e.g.
MASTER=spark://m1:7077,m2:7077 ./bin/run-example SparkPi 1000
2014-10-22 20:23:17,033 ERROR [sparkDriver-akka.actor.default-dispatcher-18] 
actor.ActorSystemImpl (Slf4jLogger.scala:apply$mcV$sp(66)) - Uncaught fatal 
error from thread [sparkDriver-akka.actor.default-dispatcher-2] shutting down 
ActorSystem [sparkDriver]
java.lang.UnsatisfiedLinkError: 
org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
 at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
 at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
 at 
org.xerial.snappy.SnappyOutputStream.init(SnappyOutputStream.java:79)
 at 
org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125)
 at 
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207)
 at 
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
 at 
org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:68)
 at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
 at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
 at 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
 at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
 at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:829)
 at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
 at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2014-10-22 20:23:17,036 INFO  [main] scheduler.DAGScheduler 
(Logging.scala:logInfo(59)) - Failed to run reduce at SparkPi.scala:35
Exception in thread main org.apache.spark.SparkException: Job cancelled 
because SparkContext was shut down
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
 at 
org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
 at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
 at 
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
 at 
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
 at akka.actor.ActorCell.terminate(ActorCell.scala:338)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
 at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:240)
 at akka.dispatch.Mailbox.run

RE: Spark Hive Snappy Error

2014-10-22 Thread Shao, Saisai
It seems you just added the snappy library into your classpath:

export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar

But Spark itself depends on snappy-0.2.jar. Is there any possibility that this problem is caused by a different version of snappy?

Thanks
Jerry

From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com]
Sent: Thursday, October 23, 2014 11:32 AM
To: Shao, Saisai
Cc: arthur.hk.c...@gmail.com; user
Subject: Re: Spark Hive Snappy Error

Hi,

Please find the attached file.



my spark-default.xml
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
#
# Example:
# spark.masterspark://master:7077
# spark.eventLog.enabled  true
# spark.eventLog.dir
  hdfs://namenode:8021/directory
# spark.serializerorg.apache.spark.serializer.KryoSerializer
#
spark.executor.memory   2048m
spark.shuffle.spill.compress    false
spark.io.compression.codec      org.apache.spark.io.SnappyCompressionCodec



my spark-env.sh
#!/usr/bin/env bash
export CLASSPATH=$HBASE_HOME/lib/hadoop-snappy-0.0.1-SNAPSHOT.jar
export CLASSPATH=$CLASSPATH:$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar
export JAVA_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop}
export SPARK_WORKER_DIR=/edh/hadoop_data/spark_work/
export SPARK_LOG_DIR=/edh/hadoop_logs/spark
export SPARK_LIBRARY_PATH=$HADOOP_HOME/lib/native/Linux-amd64-64
export 
SPARK_CLASSPATH=$SPARK_HOME/lib_managed/jars/mysql-connector-java-5.1.31-bin.jar
export 
SPARK_CLASSPATH=$SPARK_CLASSPATH:$HBASE_HOME/lib/*:$HIVE_HOME/csv-serde-1.1.2-0.11.0-all.jar:
export SPARK_WORKER_MEMORY=2g
export HADOOP_HEAPSIZE=2000
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=m35:2181,m33:2181,m37:2181"
export SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC"


ll $HADOOP_HOME/lib/native/Linux-amd64-64
-rw-rw-r--. 1 tester tester50523 Aug 27 14:12 hadoop-auth-2.4.1.jar
-rw-rw-r--. 1 tester tester  1062640 Aug 27 12:19 libhadoop.a
-rw-rw-r--. 1 tester tester  1487564 Aug 27 11:14 libhadooppipes.a
lrwxrwxrwx. 1 tester tester   24 Aug 27 07:08 libhadoopsnappy.so -> libhadoopsnappy.so.0.0.1
lrwxrwxrwx. 1 tester tester   24 Aug 27 07:08 libhadoopsnappy.so.0 -> libhadoopsnappy.so.0.0.1
-rwxr-xr-x. 1 tester tester    54961 Aug 27 07:08 libhadoopsnappy.so.0.0.1
-rwxrwxr-x. 1 tester tester   630328 Aug 27 12:19 libhadoop.so
-rwxrwxr-x. 1 tester tester   630328 Aug 27 12:19 libhadoop.so.1.0.0
-rw-rw-r--. 1 tester tester   582472 Aug 27 11:14 libhadooputils.a
-rw-rw-r--. 1 tester tester   298626 Aug 27 11:14 libhdfs.a
-rwxrwxr-x. 1 tester tester   200370 Aug 27 11:14 libhdfs.so
-rwxrwxr-x. 1 tester tester   200370 Aug 27 11:14 libhdfs.so.0.0.0
lrwxrwxrwx. 1 tester tester 55 Aug 27 07:08 libjvm.so -> /usr/lib/jvm/jdk1.6.0_45/jre/lib/amd64/server/libjvm.so
lrwxrwxrwx. 1 tester tester   25 Aug 27 07:08 libprotobuf-lite.so -> libprotobuf-lite.so.8.0.0
lrwxrwxrwx. 1 tester tester   25 Aug 27 07:08 libprotobuf-lite.so.8 -> libprotobuf-lite.so.8.0.0
-rwxr-xr-x. 1 tester tester   964689 Aug 27 07:08 libprotobuf-lite.so.8.0.0
lrwxrwxrwx. 1 tester tester   20 Aug 27 07:08 libprotobuf.so -> libprotobuf.so.8.0.0
lrwxrwxrwx. 1 tester tester   20 Aug 27 07:08 libprotobuf.so.8 -> libprotobuf.so.8.0.0
-rwxr-xr-x. 1 tester tester  8300050 Aug 27 07:08 libprotobuf.so.8.0.0
lrwxrwxrwx. 1 tester tester   18 Aug 27 07:08 libprotoc.so -> libprotoc.so.8.0.0
lrwxrwxrwx. 1 tester tester   18 Aug 27 07:08 libprotoc.so.8 -> libprotoc.so.8.0.0
-rwxr-xr-x. 1 tester tester  9935810 Aug 27 07:08 libprotoc.so.8.0.0
-rw-r--r--. 1 tester tester   233554 Aug 27 15:19 libsnappy.a
lrwxrwxrwx. 1 tester tester   23 Aug 27 11:32 libsnappy.so -> /usr/lib64/libsnappy.so
lrwxrwxrwx. 1 tester tester   23 Aug 27 11:33 libsnappy.so.1 -> /usr/lib64/libsnappy.so
-rwxr-xr-x. 1 tester tester   147726 Aug 27 07:08 libsnappy.so.1.2.0
drwxr-xr-x. 2 tester tester 4096 Aug 27 07:08 pkgconfig


Regards
Arthur


On 23 Oct, 2014, at 10:57 am, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:


Hi Arthur,

I think your problem might be different from what 
SPARK-3958(https://issues.apache.org/jira/browse/SPARK-3958) mentioned, seems 
your problem is more likely to be a library link problem, would you mind 
checking your Spark runtime to see if the snappy.so is loaded or not? (through 
lsof -p).

I guess your problem is more likely to be a library not found problem.


Thanks
Jerry




RE: Shuffle files

2014-10-20 Thread Shao, Saisai
Hi Song,

Here is what I know about sort-based shuffle.

Normally the number of files opened in parallel for sort-based shuffle is much smaller than for hash-based shuffle.

In hash-based shuffle, the number of files opened in parallel is C * R (where C is the number of cores used and R is the reducer number). As you can see, the file count is related to the reducer number, no matter how large the shuffle size is.

In sort-based shuffle, there is only 1 final map output file. To achieve this we need to do by-partition sorting, which generates some intermediate spill files; the number of spilled files is related to the shuffle size and the memory available for shuffle, not to the reducer number.

So if you met "too many open files" in sort-based shuffle, I guess you have many spilled files during shuffle write. One possible way to alleviate this is to increase the shuffle memory usage; changing the ulimit is another possible way.

I guess in YARN you have to do the system configuration manually; Spark cannot set the ulimit automatically for you, and I don't think it's an issue Spark should take care of.

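As a rough sketch of the Spark-side knobs mentioned above (Spark 1.x property names; the values are only illustrative, tune them for your job):

import org.apache.spark.SparkConf

// Give the shuffle more memory so it spills less often; consolidateFiles only
// matters for hash-based shuffle.
val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.4")
  .set("spark.shuffle.consolidateFiles", "true")
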
Thanks
Jerry

From: Chen Song [mailto:chen.song...@gmail.com]
Sent: Tuesday, October 21, 2014 9:10 AM
To: Andrew Ash
Cc: Sunny Khatri; Lisonbee, Todd; u...@spark.incubator.apache.org
Subject: Re: Shuffle files

My observation is opposite. When my job runs under default 
spark.shuffle.manager, I don't see this exception. However, when it runs with 
SORT based, I start seeing this error? How would that be possible?

I am running my job in YARN, and I noticed that the YARN process limits (cat /proc/$PID/limits) are not consistent with the system-wide limits (shown by ulimit -a); I don't know how that happened. Is there a way to let the Spark driver propagate this setting (ulimit -n number) to the Spark executors before startup?




On Tue, Oct 7, 2014 at 11:53 PM, Andrew Ash 
and...@andrewash.commailto:and...@andrewash.com wrote:
You will need to restart your Mesos workers to pick up the new limits as well.

On Tue, Oct 7, 2014 at 4:02 PM, Sunny Khatri 
sunny.k...@gmail.commailto:sunny.k...@gmail.com wrote:
@SK:
Make sure ulimit has taken effect as Todd mentioned. You can verify via ulimit 
-a. Also make sure you have proper kernel parameters set in /etc/sysctl.conf 
(MacOSX)

On Tue, Oct 7, 2014 at 3:57 PM, Lisonbee, Todd 
todd.lison...@intel.commailto:todd.lison...@intel.com wrote:

Are you sure the new ulimit has taken effect?

How many cores are you using?  How many reducers?

In general if a node in your cluster has C assigned cores and you run
a job with X reducers then Spark will open C*X files in parallel and
start writing. Shuffle consolidation will help decrease the total
number of files created but the number of file handles open at any
time doesn't change so it won't help the ulimit problem.

Quoted from Patrick at:
http://apache-spark-user-list.1001560.n3.nabble.com/quot-Too-many-open-files-quot-exception-on-reduceByKey-td2462.html

Thanks,

Todd

-Original Message-
From: SK [mailto:skrishna...@gmail.commailto:skrishna...@gmail.com]
Sent: Tuesday, October 7, 2014 2:12 PM
To: u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org
Subject: Re: Shuffle files

- We set ulimit to 50. But I still get the same too many open files
warning.

- I tried setting consolidateFiles to True, but that did not help either.

I am using a Mesos cluster.   Does Mesos have any limit on the number of
open files?

thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-files-tp15185p15869.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.orgmailto:user-h...@spark.apache.org


-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.orgmailto:user-h...@spark.apache.org





--
Chen Song



RE: Spark Hive Snappy Error

2014-10-16 Thread Shao, Saisai
Hi Arthur,

I think this is a known issue in Spark, you can check 
(https://issues.apache.org/jira/browse/SPARK-3958). I'm curious about it: can you always reproduce this issue? Is this issue related to some specific data sets? Would you mind giving me some information about your workload, Spark configuration, JDK version and OS version?

Thanks
Jerry

From: arthur.hk.c...@gmail.com [mailto:arthur.hk.c...@gmail.com]
Sent: Friday, October 17, 2014 7:13 AM
To: user
Cc: arthur.hk.c...@gmail.com
Subject: Spark Hive Snappy Error

Hi,

When trying Spark with Hive table, I got the “java.lang.UnsatisfiedLinkError: 
org.xerial.snappy.SnappyNative.maxCompressedLength(I)I” error,


val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("select count(1) from q8_national_market_share")
sqlContext.sql("select count(1) from q8_national_market_share").collect().foreach(println)
java.lang.UnsatisfiedLinkError: 
org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
 at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
 at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
 at 
org.xerial.snappy.SnappyOutputStream.init(SnappyOutputStream.java:79)
 at 
org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125)
 at 
org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207)
 at 
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
 at 
org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:68)
 at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
 at 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
 at 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
 at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
 at 
org.apache.spark.sql.hive.HadoopTableReader.init(TableReader.scala:68)
 at 
org.apache.spark.sql.hive.execution.HiveTableScan.init(HiveTableScan.scala:68)
 at 
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188)
 at 
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$14.apply(HiveStrategies.scala:188)
 at 
org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:364)
 at 
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:184)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
 at 
org.apache.spark.sql.execution.SparkStrategies$HashAggregation$.apply(SparkStrategies.scala:146)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
 at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
 at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
 at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
 at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
 at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
 at $iwC$$iwC$$iwC$$iwC.init(console:15)
 at $iwC$$iwC$$iwC.init(console:20)
 at $iwC$$iwC.init(console:22)
 at $iwC.init(console:24)
 at init(console:26)
 at .init(console:30)
 at .clinit(console)
 at .init(console:7)
 at .clinit(console)
 at $print(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
 at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
 at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
 at 

RE: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Shao, Saisai
Hi Abraham,

You are correct, the “auto.offset.reset“ behavior in KafkaReceiver is different from original Kafka semantics: if you set this configuration, KafkaReceiver will clean the related ZK metadata immediately, but for Kafka this configuration is just a hint which is effective only when the offset is out of range. So you will always read data from the beginning if you set it to “smallest”, otherwise if you set it to “largest”, you will always get data from the end immediately.

There’s a JIRA and PR to follow this, but still not merged to the master, you 
can check to see it (https://issues.apache.org/jira/browse/SPARK-2492).

Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 6:57 AM
To: Sean McNamara
Cc: user@spark.apache.org
Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue -

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/


·Spark’s usage of the Kafka consumer parameter auto.offset.reset (http://kafka.apache.org/documentation.html#consumerconfigs) is different from Kafka’s semantics. In Kafka, the behavior of setting auto.offset.reset to “smallest” is that the consumer will automatically reset the offset to the smallest offset when a) there is no existing offset stored in ZooKeeper or b) there is an existing offset but it is out of range. Spark however will always remove existing offsets and then start all the way from zero again. This means whenever you restart your application with auto.offset.reset = smallest, your application will completely re-process all available Kafka data. Doh! See this discussion (http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) and that discussion (http://markmail.org/message/257a5l3oqyftsjxj).

Hmm interesting... Wondering what happens if I set it as largest...?


On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob 
abe.jac...@gmail.commailto:abe.jac...@gmail.com wrote:
Sure... I do set the group.id for all the consumers to be the same. Here is the code ---

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
      DefaultDecoder.class, PayloadDeSerializer.class,
      kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(
        new PairFunction<Tuple2<byte[], String>, byte[], String>() {

          private static final long serialVersionUID = -1936810126415608167L;

          public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
            return tuple2;
          }
        }
      )
  );
}

JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
  unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
  unifiedStream = kafkaStreams.get(0);
}
unifiedStream.print();
jssc.start();
jssc.awaitTermination();


-abe


On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara 
sean.mcnam...@webtrends.commailto:sean.mcnam...@webtrends.com wrote:
Would you mind sharing the code leading to your createStream?  Are you also setting group.id?

Thanks,

Sean


On Oct 10, 2014, at 4:31 PM, Abraham Jacob 
abe.jac...@gmail.commailto:abe.jac...@gmail.com wrote:

 Hi Folks,

 I am seeing some strange behavior when using the Spark Kafka connector in 
 Spark streaming.

 I have a Kafka topic which has 8 partitions. I have a kafka producer that 
 pumps some messages into this topic.

 On the consumer side I have a spark streaming application that that has 8 
 executors on 8 worker nodes and 8 ReceiverInputDStream with the same kafka 
 group id connected to the 8 partitions I have for the topic. Also the kafka 
 consumer property auto.offset.reset is set to smallest.


 Now here is the sequence of steps -

 (1) I Start the the spark streaming app.
 (2) Start the producer.

 As this point I see the messages that are being pumped from the producer in 
 Spark Streaming.  Then I -

 (1) Stopped the producer
 (2) Wait for all the message to be consumed.
 (2) Stopped the spark streaming app.

 Now when I restart the spark streaming app (note - the 

RE: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Shao, Saisai
Hi abe,

You can see the details in KafkaInputDStream.scala, here is the snippet

// When auto.offset.reset is defined, it is our responsibility to try and 
whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
  tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}

KafkaReceiver will check your kafkaParams: if “auto.offset.reset” is set, it will clean the ZK metadata immediately, so you will always read data from the beginning (when set to “smallest”) or from the end (when set to “largest”) immediately, because the ZK metadata is deleted beforehand.

If you do not set this parameter, this code path will not be triggered, so data will be read from the last commit point. And if the last commit point is not yet available, Kafka will move the offset to the end of the partition (Kafka’s “auto.offset.reset” defaults to “largest”).

If you want to keep the same semantics as Kafka, you need to remove the above 
code path manually and recompile the Spark.

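A rough Scala sketch of the "do not set it" case described above (the addresses, topic and group names are placeholders, and ssc is assumed to be an existing StreamingContext):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// No "auto.offset.reset" entry, so the ZK cleanup path shown above is not triggered
// and the receiver resumes from the last committed offset.
val kafkaParams = Map(
  "zookeeper.connect" -> "zk-host:2181",
  "group.id"          -> "my-consumer-group")
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_ONLY_SER)
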
Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 8:49 AM
To: Shao, Saisai
Cc: user@spark.apache.org; Sean McNamara
Subject: Re: Spark Streaming KafkaUtils Issue

Thanks Jerry, So, from what I can understand from the code, if I leave out 
auto.offset.reset, it should theoretically read from the last commit point... 
Correct?

-abe

On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi Abraham,

You are correct, the “auto.offset.reset“ behavior in KafkaReceiver is different from original Kafka semantics: if you set this configuration, KafkaReceiver will clean the related ZK metadata immediately, but for Kafka this configuration is just a hint which is effective only when the offset is out of range. So you will always read data from the beginning if you set it to “smallest”, otherwise if you set it to “largest”, you will always get data from the end immediately.

There’s a JIRA and PR to follow this, but still not merged to the master, you 
can check to see it (https://issues.apache.org/jira/browse/SPARK-2492).

Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.commailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 6:57 AM
To: Sean McNamara
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue -

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/


•Spark’s usage of the Kafka consumer parameter auto.offset.reset (http://kafka.apache.org/documentation.html#consumerconfigs) is different from Kafka’s semantics. In Kafka, the behavior of setting auto.offset.reset to “smallest” is that the consumer will automatically reset the offset to the smallest offset when a) there is no existing offset stored in ZooKeeper or b) there is an existing offset but it is out of range. Spark however will always remove existing offsets and then start all the way from zero again. This means whenever you restart your application with auto.offset.reset = smallest, your application will completely re-process all available Kafka data. Doh! See this discussion (http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) and that discussion (http://markmail.org/message/257a5l3oqyftsjxj).

Hmm interesting... Wondering what happens if I set it as largest...?


On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob 
abe.jac...@gmail.commailto:abe.jac...@gmail.com wrote:
Sure... I do set the group.id for all the consumers to be the same. Here is the code ---

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.conection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
      DefaultDecoder.class, PayloadDeSerializer.class,
      kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(
        new PairFunction<Tuple2<byte[], String>, byte[], String>() {

          private static final long serialVersionUID = -1936810126415608167L;

          public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
            return tuple2

RE: Error reading from Kafka

2014-10-08 Thread Shao, Saisai
Hi, I think you have to change the code like this to specify the type info, 
like this:

  val kafkaStream: ReceiverInputDStream[(String, String)] = 
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)

You can take a try, actually Kafka unit test also use this API and worked fine.

Besides, the fixed issue you mentioned below only occurs in Java code calling the related API.

Thanks
Jerry


From: Antonio Jesus Navarro [mailto:ajnava...@stratio.com]
Sent: Wednesday, October 08, 2014 10:04 PM
To: user@spark.apache.org
Subject: Error reading from Kafka

Hi, I'm trying to read from Kafka.  I was able to do it correctly with this 
method.

def createStream(
  ssc: StreamingContext,
  zkQuorum: String,
  groupId: String,
  topics: Map[String, Int],
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[(String, String)]


But now I have to add some params to kafka consumer so I've changed to other 
createStream method but I'm getting an error:

14/10/08 15:34:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
14/10/08 15:34:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for 
stream 0: Error starting receiver 0 - java.lang.NoSuchMethodException: 
scala.runtime.Nothing$.init(kafka.utils.VerifiableProperties)
at java.lang.Class.getConstructor0(Class.java:2849)
at java.lang.Class.getConstructor(Class.java:1718)
at 
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:106)
at 
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at 
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

This is my code. It seems that createStream returns ReceiverInputDStream[(Nothing, Nothing)] (forced by me to (String, String)), so I think it tries getConstructor(kafka.utils.VerifiableProperties) by reflection on the Nothing object and doesn't find the method.

  val topics = config.getString("nessus.kafka.topics")
  val numThreads = config.getInt("nessus.kafka.numThreads")
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  val kafkaParams = Map(
    "zookeeper.connect" -> "localhost:2181",
    "group.id" -> "my-grp")

  val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,
    kafkaParams,
    topicMap, StorageLevel.MEMORY_AND_DISK_SER_2)


I found the issue https://issues.apache.org/jira/browse/SPARK-2103, but it was solved, and I'm using Spark 1.1.0 and Scala 2.10, so I don't know what happens.

Any thoughts?

--
http://www.stratio.com/
Avenida de Europa, 26. Ática 5. 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // @stratiobd (https://twitter.com/StratioBD)


RE: problem with data locality api

2014-09-28 Thread Shao, Saisai
Hi

The first conf is used by Hadoop to determine the locality distribution of the HDFS file. The second conf is used by Spark; although they have the same name, they are actually two different classes.

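A small sketch of the distinction (illustrative only):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

val hadoopConf = new Configuration()  // passed to InputFormatInfo so Hadoop can look up HDFS block locations
val sparkConf  = new SparkConf()      // passed to the SparkContext constructor to configure Spark itself
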
Thanks
Jerry

From: qinwei [mailto:wei@dewmobile.net]
Sent: Sunday, September 28, 2014 2:05 PM
To: user
Subject: problem with data locality api

Hi, everyone
    I come across a problem about data locality. I found this example code in 《Spark-on-YARN-A-Deep-Dive-Sandy-Ryza.pdf》:
    val locData = InputFormatInfo.computePreferredLocations(Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))
    val sc = new SparkContext(conf, locData)
    But I found the two confs above are of different types: conf in the first line is of type org.apache.hadoop.conf.Configuration, and conf in the second line is of type SparkConf. Can anyone explain that to me or give me some example code?


qinwei


RE: sortByKey trouble

2014-09-24 Thread Shao, Saisai
Hi,

sortByKey is only for RDD[(K, V)]; each tuple can only have two members, and Spark will sort by the first member. If you want to use sortByKey, you have to change your RDD[(String, String, String, String)] into RDD[(String, (String, String, String))].

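For example, a rough sketch based on the RDD in the question below (untested; `file` is the RDD[String] from the original code):

val keyed = file.map(l => l.split("\\|"))
  .map(r => (r(34) + "-" + r(3), (r(4), r(10), r(12))))  // key = first element, value = the rest
val sorted = keyed.sortByKey()
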
Thanks
Jerry

-Original Message-
From: david [mailto:david...@free.fr] 
Sent: Wednesday, September 24, 2014 4:30 PM
To: u...@spark.incubator.apache.org
Subject: sortByKey trouble

Hi,

  Does anybody know how to use sortByKey in Scala on an RDD like:

  val rddToSave = file.map(l => l.split("\\|")).map(r => (r(34) + "-" + r(3), r(4), r(10), r(12)))

  because I received an error "sortByKey is not a member of org.apache.spark.rdd.RDD[(String, String, String, String)]".

What I try to do is sort on the first element.


Thank's

  





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sortByKey-trouble-tp14989.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: spark.local.dir and spark.worker.dir not used

2014-09-23 Thread Shao, Saisai
Hi,

spark.local.dir is the one used to write map output data and persisted RDD blocks, but the file paths are hashed, so you cannot directly find the persisted RDD block files; they will definitely be under these folders on your worker node.

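A small sketch (the paths are only illustrative): spark.local.dir has to be set before the SparkContext is created, and on a cluster the manager may override it via SPARK_LOCAL_DIRS (standalone/Mesos) or LOCAL_DIRS (YARN):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf().setAppName("local-dir-example")
  .set("spark.local.dir", "/data/spark-local")   // example path
val sc = new SparkContext(conf)
val rdd = sc.textFile("/data/input").persist(StorageLevel.DISK_ONLY)
rdd.count()  // blocks end up under /data/spark-local/spark-local-*/ with hashed file names
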
Thanks
Jerry

From: Priya Ch [mailto:learnings.chitt...@gmail.com]
Sent: Tuesday, September 23, 2014 6:31 PM
To: user@spark.apache.org; d...@spark.apache.org
Subject: spark.local.dir and spark.worker.dir not used

Hi,

I am using spark 1.0.0. In my spark code I am trying to persist an rdd to disk as rdd.persist(DISK_ONLY). But unfortunately I couldn't find the location where the rdd has been written to disk. I specified SPARK_LOCAL_DIRS and SPARK_WORKER_DIR to some other location rather than using the default /tmp directory, but still couldn't see anything in the worker directory and spark local directory.

I also tried specifying the local dir and worker dir from the spark code while defining the SparkConf as conf.set("spark.local.dir", "/home/padma/sparkdir"), but the directories are not used.


In general which directories spark would be using for map output files, 
intermediate writes and persisting rdd to disk ?


Thanks,
Padma Ch


RE: spark.local.dir and spark.worker.dir not used

2014-09-23 Thread Shao, Saisai
This folder will be created when you start your Spark application under your spark.local.dir, with the name “spark-local-xxx” as a prefix. It’s quite strange that you don’t see this folder; maybe you missed something. Besides, if Spark cannot create this folder on start, persisting an RDD to disk will fail.

Also I think there’s no way to persist an RDD to HDFS, even in YARN; only an RDD’s checkpoint can save data on HDFS.

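For saving RDD data on HDFS, a rough sketch of the checkpoint route mentioned above (the paths are examples, and sc is an existing SparkContext):

sc.setCheckpointDir("hdfs://namenode:8020/spark/checkpoints")
val rdd = sc.textFile("hdfs://namenode:8020/data/input")
rdd.checkpoint()   // marked for checkpointing; materialized to HDFS on the next action
rdd.count()
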
Thanks
Jerry

From: Chitturi Padma [mailto:learnings.chitt...@gmail.com]
Sent: Tuesday, September 23, 2014 8:33 PM
To: u...@spark.incubator.apache.org
Subject: Re: spark.local.dir and spark.worker.dir not used

I couldnt even see the spark-id folder in the default /tmp directory of 
local.dir.

On Tue, Sep 23, 2014 at 6:01 PM, Priya Ch [hidden email] wrote:
Is it possible to view the persisted RDD blocks ?
If I use YARN, RDD blocks would be persisted to hdfs then will i be able to 
read the hdfs blocks as i could do in hadoop ?

On Tue, Sep 23, 2014 at 5:56 PM, Shao, Saisai [via Apache Spark User List] [hidden email] wrote:
Hi,

Spark.local.dir is the one used to write map output data and persistent RDD 
blocks, but the path of  file has been hashed, so you cannot directly find the 
persistent rdd block files, but definitely it will be in this folders on your 
worker node.

Thanks
Jerry

From: Priya Ch [mailto:[hidden email]]
Sent: Tuesday, September 23, 2014 6:31 PM
To: [hidden email]; [hidden email]
Subject: spark.local.dir and spark.worker.dir not used

Hi,

I am using spark 1.0.0. In my spark code i m trying to persist an rdd to disk 
as rrd.persist(DISK_ONLY). But unfortunately couldn't find the location where 
the rdd has been written to disk. I specified SPARK_LOCAL_DIRS and 
SPARK_WORKER_DIR to some other location rather than using the default /tmp 
directory, but still couldn't see anything in the worker directory and spark local
directory.

I also tried specifying the local dir and worker dir from the spark code while 
defining the SparkConf as conf.set("spark.local.dir", "/home/padma/sparkdir"), but the directories are not used.


In general which directories spark would be using for map output files, 
intermediate writes and persisting rdd to disk ?


Thanks,
Padma Ch






View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-local-dir-and-spark-worker-dir-not-used-tp14881p14887.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: how long does it take executing ./sbt/sbt assembly

2014-09-23 Thread Shao, Saisai
If you have enough memory, the speed will be faster, within one minute, since most of the files are cached. Also you can build your Spark project on a mounted ramfs in Linux, which will also speed up the process.

Thanks
Jerry

-Original Message-
From: Zhan Zhang [mailto:zzh...@hortonworks.com] 
Sent: Wednesday, September 24, 2014 1:11 PM
To: christy
Cc: u...@spark.incubator.apache.org
Subject: Re: how long does it take executing ./sbt/sbt assembly

Definitely something wrong. For me, 10 to 30 minutes.

Thanks.

Zhan Zhang
On Sep 23, 2014, at 10:02 PM, christy 760948...@qq.com wrote:

 This process began yesterday and it has already run for more than 20 hours.
 Is it normal? Any one has the same problem? No error throw out yet.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/how-long-does-it-t
 ake-executing-sbt-sbt-assembly-tp14975.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
 additional commands, e-mail: user-h...@spark.apache.org
 


--
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader of 
this message is not the intended recipient, you are hereby notified that any 
printing, copying, dissemination, distribution, disclosure or forwarding of 
this communication is strictly prohibited. If you have received this 
communication in error, please contact the sender immediately and delete it 
from your system. Thank You.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Issues with partitionBy: FetchFailed

2014-09-21 Thread Shao, Saisai
Hi,

I’ve also met this problem before, I think you can try to set 
“spark.core.connection.ack.wait.timeout” to a large value to avoid ack timeout, 
default is 60 seconds.

Sometimes because of GC pauses or some other reasons, the acknowledgement message will time out, which will lead to this exception; you can try setting a larger value for this configuration.

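For example (the value is in seconds and purely illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.core.connection.ack.wait.timeout", "120")  // default is 60
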
Thanks
Jerry

From: Julien Carme [mailto:julien.ca...@gmail.com]
Sent: Sunday, September 21, 2014 7:43 PM
To: user@spark.apache.org
Subject: Issues with partitionBy: FetchFailed

Hello,
I am facing an issue with partitionBy, it is not clear whether it is a problem 
with my code or with my spark setup. I am using Spark 1.1, standalone, and my 
other spark projects work fine.
So I have to repartition a relatively large file (about 70 million lines). Here 
is a minimal version of what is not working fine:
myRDD = sc.textFile(...).map { line => (extractKey(line), line) }
myRepartitionedRDD = myRDD.partitionBy(new HashPartitioner(100))
myRepartitionedRDD.saveAsTextFile(...)
It runs quite some time, until I get some errors and it retries. Errors are:
FetchFailed(BlockManagerId(3,myWorker2, 52082,0), 
shuffleId=1,mapId=1,reduceId=5)
Logs are not much more informative. I get:

Java.io.IOException : sendMessageReliability failed because ack was not 
received within 60 sec

I get similar errors with all my workers.
Do you have some kind of explanation for this behaviour? What could be wrong?
Thanks,




RE: Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs have Failed..

2014-09-18 Thread Shao, Saisai
Hi Rafeeq,

I think this situation always occurs when your Spark Streaming application is 
running in an abnormal situation. Would you mind checking your job processing 
time in WebUI or log, is the total latency of job processing + job scheduling 
time larger than batch duration? If your Spark Streaming application is in this 
situation, you will meet this exception.

Normally the reason this happens is that Spark Streaming jobs are processed one by one by default; if one job is blocked for a long time, the next job has to wait until the previous one is finished, but input blocks will be deleted after a timeout, so when that job finally starts, it cannot find the right block and will throw this exception.

You can also meet this exception in other abnormal situations. Anyway, this exception means your application is abnormal, and you should pay attention to your job execution time.

You can check the spark streaming 
dochttp://spark.apache.org/docs/latest/streaming-programming-guide.html 
“Monitoring Applications” section to see the details.

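A rough sketch of one way to watch those numbers from code (a StreamingListener that logs the per-batch delays; the names are illustrative, and ssc is your StreamingContext):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class LatencyLogger extends StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"scheduling delay = ${info.schedulingDelay.getOrElse(-1L)} ms, " +
            s"processing time = ${info.processingDelay.getOrElse(-1L)} ms")
  }
}

// ssc.addStreamingListener(new LatencyLogger)

If the scheduling delay keeps growing, the total latency is exceeding the batch duration.
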
Thanks
Jerry

From: Rafeeq S [mailto:rafeeq.ec...@gmail.com]
Sent: Thursday, September 18, 2014 2:43 PM
To: u...@spark.incubator.apache.org
Subject: Serious Issue with Spark Streaming ? Blocks Getting Removed and Jobs 
have Failed..

Hi,

I am testing kafka-spark streaming application which throws below error after 
few seconds and below configuration is used for spark streaming test 
environment.

kafka version- 0.8.1
spark version- 1.0.1

SPARK_MASTER_MEMORY=1G
SPARK_DRIVER_MEMORY=1G
SPARK_WORKER_INSTANCES=1
SPARK_EXECUTOR_INSTANCES=1
SPARK_WORKER_MEMORY=1G
SPARK_EXECUTOR_MEMORY=1G
SPARK_WORKER_CORES=2
SPARK_EXECUTOR_CORES=1

ERROR:

14/09/12 17:30:23 WARN TaskSetManager: Loss was due to java.lang.Exception
java.lang.Exception: Could not compute split, block
input-4-1410542878200 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.UnionPartition.iterator(UnionRDD.scala:33)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:74)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Please suggest your answer

Regards,
Rafeeq S
(“What you do is what matters, not what you think or say or plan.” )



RE: JMXSink for YARN deployment

2014-09-11 Thread Shao, Saisai
Hi,

I’m guessing the problem is that the driver or executors cannot get the metrics.properties configuration file in the YARN container, so the metrics system cannot load the right sinks.

Thanks
Jerry

From: Vladimir Tretyakov [mailto:vladimir.tretya...@sematext.com]
Sent: Thursday, September 11, 2014 7:30 PM
To: user@spark.apache.org
Subject: JMXSink for YARN deployment

Hello, we at Sematext (https://apps.sematext.com/) are writing a monitoring tool for Spark and we came across one question:

How to enable JMX metrics for YARN deployment?

We put *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
to file $SPARK_HOME/conf/metrics.properties but it doesn't work.

Everything works in Standalone mode, but not in YARN mode.

Can somebody help?

Thx!

PS: I've found also 
https://stackoverflow.com/questions/23529404/spark-on-yarn-how-to-send-metrics-to-graphite-sink/25786112
 without answer.


RE: JMXSink for YARN deployment

2014-09-11 Thread Shao, Saisai
I think you can try to use “spark.metrics.conf” to manually specify the path of metrics.properties, but the prerequisite is that each container can find this file on its local FS, because the file is loaded locally.

Besides, I think this is a kind of workaround; a better solution would be to fix this in some other way.

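A small sketch of that workaround (the path is only an example and must exist on every node/container's local filesystem):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.metrics.conf", "/etc/spark/metrics.properties")  // example local path
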
Thanks
Jerry

From: Vladimir Tretyakov [mailto:vladimir.tretya...@sematext.com]
Sent: Thursday, September 11, 2014 10:08 PM
Cc: user@spark.apache.org
Subject: Re: JMXSink for YARN deployment

Hi Shao, thx for explanation, any ideas how to fix it? Where should I put 
metrics.properties file?

On Thu, Sep 11, 2014 at 4:18 PM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi,

I’m guessing the problem is that driver or executor cannot get the 
metrics.properties configuration file in the yarn container, so metrics system 
cannot load the right sinks.

Thanks
Jerry

From: Vladimir Tretyakov 
[mailto:vladimir.tretya...@sematext.commailto:vladimir.tretya...@sematext.com]
Sent: Thursday, September 11, 2014 7:30 PM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: JMXSink for YARN deployment

Hello, we are in Sematext (https://apps.sematext.com/) are writing Monitoring 
tool for Spark and we came across one question:

How to enable JMX metrics for YARN deployment?

We put *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
to file $SPARK_HOME/conf/metrics.properties but it doesn't work.

Everything works in Standalone mode, but not in YARN mode.

Can somebody help?

Thx!

PS: I've found also 
https://stackoverflow.com/questions/23529404/spark-on-yarn-how-to-send-metrics-to-graphite-sink/25786112
 without answer.



RE: Spark streaming: size of DStream

2014-09-09 Thread Shao, Saisai
Hi, 

Is there any specific scenario that needs to know the number of RDDs in the DStream? To my knowledge a DStream will generate one RDD in each batchDuration; some old RDDs will be remembered for windowing-like functions and will be removed when useless. The hashmap generatedRDDs in DStream.scala contains the RDDs you want, though you cannot access it from the app.

Besides, the count() API returns the number of records in each of this DStream's RDDs, not the number of RDDs; the number of RDDs should always be 1 as I understand it.

Thanks
Jerry

-Original Message-
From: julyfire [mailto:hellowe...@gmail.com] 
Sent: Tuesday, September 09, 2014 2:42 PM
To: u...@spark.incubator.apache.org
Subject: Spark streaming: size of DStream

I want to implement the following logic:

val stream = getFlumeStream() // a DStream

if (size_of_stream > 0)  // if the DStream contains some RDDs

  stream.someTransformation

stream.count() can figure out the number of RDDs in a DStream, but it returns a DStream[Long] and can't be compared with a number.

does anyone know how to get the number of RDDs in a DStream?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark streaming: size of DStream

2014-09-09 Thread Shao, Saisai
Hi,

I think every received stream will generate an RDD in each batch duration even if there is no data fed in (an empty RDD will be generated). So you cannot use the number of RDDs to judge whether any data was received.

One way is to do this in DStream/foreachRDD(), like:

a.foreachRDD { r =>
  if (r.count() == 0) {
    // ... do something
  } else {
    // ... do some other things
  }
}

You can try it.

Thanks
Jerry


-Original Message-
From: julyfire [mailto:hellowe...@gmail.com] 
Sent: Tuesday, September 09, 2014 3:42 PM
To: u...@spark.incubator.apache.org
Subject: RE: Spark streaming: size of DStream

Hi Jerry,

Thanks for your reply.
I use spark streaming to receive the flume stream, then I need to make a judgement: in each batchDuration, if the received stream has data, then I should do something; if there is no data, do the other thing. I thought count() could give me the measure, but it returns a DStream, not a number. So is there a way to achieve this?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13775.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark streaming: size of DStream

2014-09-09 Thread Shao, Saisai
I think you should clarify some things in Spark Streaming:

1. The closure in map runs on the remote side, so modifying the count var only takes effect remotely. You will always get -1 on the driver side.
2. Some code in the closure in foreachRDD is lazily executed in each batch duration, while the if (...) code outside the closure is executed once immediately and will never be executed again, so your code logic does not work as expected.
3. I don't think you need to judge whether there is data fed in before doing transformations; you can directly transform the DStream even if no data was injected in this batch duration. It's only an empty transformation, with no extra overhead.

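A rough sketch of points 1 and 2, using the `currentBatch` DStream from the quoted code below: keep the per-batch decision inside foreachRDD so it runs every batch, instead of once at start-up (though, as point 3 says, you can usually just transform unconditionally):

currentBatch.foreachRDD { rdd =>
  if (rdd.count() > 0) {
    // act on this batch's data
  } else {
    // nothing received in this batch
  }
}
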
Thanks
Jerry

-Original Message-
From: julyfire [mailto:hellowe...@gmail.com] 
Sent: Tuesday, September 09, 2014 4:20 PM
To: u...@spark.incubator.apache.org
Subject: RE: Spark streaming: size of DStream

I'm sorry, I had some errors in my code; here is the update:

var count = -1L // a global variable in the main object

val currentBatch = some_DStream
val countDStream = currentBatch.map(o => {
  count = 0L  // reset the count variable in each batch
  o
})
countDStream.foreachRDD(rdd => count += rdd.count())

if (count > 0) {
  currentBatch.map(...).someOtherTransformation
}

two problems:
1. the variable count just goes on accumulating and is not reset in each batch
2. if (count > 0) is only evaluated at the beginning of the program, so the next statement will never run

Can you all give me some suggestion? thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Setting Kafka parameters in Spark Streaming

2014-09-08 Thread Shao, Saisai
Hi Hemanth,

I think there is a bug in this API in Spark 0.8.1, so you will meet this exception when using Java code with this API. The bug is fixed in the latest version, as you can see in the patch (https://github.com/apache/spark/pull/1508). But it's only for Kafka 0.8+; as you still use Kafka 0.7, you can modify the Spark code according to this patch and rebuild. I still highly recommend using the latest versions of Spark and Kafka; there are lots of improvements in the streaming field.

Thanks
Jerry

From: Hemanth Yamijala [mailto:yhema...@gmail.com]
Sent: Tuesday, September 09, 2014 12:49 AM
To: user@spark.apache.org
Subject: Setting Kafka parameters in Spark Streaming

Hi,

I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the parameter 
fetch.message.max.bytes when creating the Kafka DStream. The only API that 
seems to allow this is the following:

kafkaStream[T, D <: kafka.serializer.Decoder[_]](typeClass: Class[T], decoderClass: Class[D], kafkaParams: Map[String, String], topics: Map[String, Integer], storageLevel: StorageLevel)
I tried to call this as so:
context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, 
StorageLevel.MEMORY_AND_DISK())
However, this is causing an exception like:
java.lang.ClassCastException: java.lang.Object cannot be cast to 
kafka.serializer.Decoder
at 
org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105)
at 
org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125)
at 
org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158)
at 
org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154)
Can anyone provide help on how to set these parameters ?
Thanks
Hemanth


RE: Setting Kafka parameters in Spark Streaming

2014-09-08 Thread Shao, Saisai
As you mentioned you hope to transplant latest version of Spark into Kafka 0.7 
in another mail, there are some notes you should take care:


1.  Kafka 0.7+ can only be compiled with Scala 2.8, while now Spark is 
compiled with Scala 2.10, there is no binary compatible between these two Scala 
versions. So you have to modify Kafka code as previously Spark did to fix Scala 
problem.

2.  High Level Consumer API changes between Kafka 0.7 and 0.8, so you have 
to modify KafkaInputDStream in Spark Streaming.

Thanks
Jerry

From: Hemanth Yamijala [mailto:yhema...@gmail.com]
Sent: Tuesday, September 09, 2014 1:19 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: Setting Kafka parameters in Spark Streaming

Thanks, Shao, for providing the necessary information.

Hemanth

On Tue, Sep 9, 2014 at 8:21 AM, Shao, Saisai 
saisai.s...@intel.commailto:saisai.s...@intel.com wrote:
Hi Hemanth,

I think there is a bug in this API in Spark 0.8.1, so you will meet this 
exception when using Java code with this API, this bug is fixed in latest 
version, as you can see the patch (https://github.com/apache/spark/pull/1508). 
But it’s only for Kafka 0.8+, as you still use kafka 0.7, you can modify the 
Spark code according to this patch and rebuild. Still highly recommend to use 
latest version of Spark and Kafka, there are lots of improvements in streaming 
field.

Thanks
Jerry

From: Hemanth Yamijala [mailto:yhema...@gmail.commailto:yhema...@gmail.com]
Sent: Tuesday, September 09, 2014 12:49 AM
To: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Setting Kafka parameters in Spark Streaming

Hi,

I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the parameter 
fetch.message.max.bytes when creating the Kafka DStream. The only API that 
seems to allow this is the following:

kafkaStream[T, D <: kafka.serializer.Decoder[_]](typeClass: Class[T], decoderClass: Class[D], kafkaParams: Map[String, String], topics: Map[String, Integer], storageLevel: StorageLevel)
I tried to call this as so:
context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, 
StorageLevel.MEMORY_AND_DISK())
However, this is causing an exception like:
java.lang.ClassCastException: java.lang.Object cannot be cast to 
kafka.serializer.Decoder
at 
org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105)
at 
org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125)
at 
org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158)
at 
org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154)
Can anyone provide help on how to set these parameters ?
Thanks
Hemanth



RE: Trying to run SparkSQL over Spark Streaming

2014-08-21 Thread Shao, Saisai
Hi,

StreamSQL (https://github.com/thunderain-project/StreamSQL) is a POC project
based on Spark that combines the power of Catalyst and Spark Streaming, offering
the ability to run SQL on top of DStreams as you wanted. It keeps the same
semantics as Spark SQL by offering a SchemaDStream on top of DStream, so you
don't need to do tricky things like extracting an RDD and registering it as a
table. Other parts are the same as Spark.

Thanks
Jerry

-Original Message-
From: praveshjain1991 [mailto:praveshjain1...@gmail.com] 
Sent: Thursday, August 21, 2014 2:25 PM
To: u...@spark.incubator.apache.org
Subject: Re: Trying to run SparkSQL over Spark Streaming

Oh right. Got it. Thanks

Also found this link on that discussion:
https://github.com/thunderain-project/StreamSQL

Does this provide more features than Spark?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-run-SparkSQL-over-Spark-Streaming-tp12530p12538.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: NullPointerException from '.count.foreachRDD'

2014-08-20 Thread Shao, Saisai
Hi,

I don't think there's an NPE issue when using DStream.count(), even when no
data is fed into Spark Streaming. I tested with Kafka in my local setup, and
both cases are fine, with and without data being consumed.

Actually you can see the details in ReceiverInputDStream: even when there is no
data in a batch duration, it generates an empty BlockRDD, so the map() and
transform() inside the count() operator will never hit an NPE. I think the
problem may lie in your customized InputDStream; you should make sure it
generates an empty RDD even when no data comes in.
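
A minimal sketch of what this would look like for a custom InputDStream (class
and helper names here are made up, not from the thread):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

class MyInputDStream(ssc_ : StreamingContext) extends InputDStream[String](ssc_) {
  override def start(): Unit = {}
  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[String]] = {
    val records = fetchPendingRecords()   // hypothetical helper returning this batch's records
    // Always return an RDD, even an empty one, so that downstream
    // count()/map() never see a null RDD.
    Some(ssc_.sparkContext.makeRDD(records, 1))
  }

  private def fetchPendingRecords(): Seq[String] = Seq.empty   // placeholder
}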

Thanks
Jerry

-Original Message-
From: anoldbrain [mailto:anoldbr...@gmail.com] 
Sent: Wednesday, August 20, 2014 4:13 PM
To: u...@spark.incubator.apache.org
Subject: Re: NullPointerException from '.count.foreachRDD'

Looking at the source codes of DStream.scala


   /**
    * Return a new DStream in which each RDD has a single element
    * generated by counting each RDD of this DStream.
    */
   def count(): DStream[Long] = {
     this.map(_ => (null, 1L))
         .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
         .reduceByKey(_ + _)
         .map(_._2)
   }

transform is the line throwing the NullPointerException. Can anyone give some
hints as to what would cause _ to be null (it is indeed null)? This only happens
when there is no data to process.

When there's data, no NullPointerException is thrown, and all the 
processing/counting proceeds correctly.

I am using my custom InputDStream. Could it be that this is the source of the 
problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-from-count-foreachRDD-tp2066p12461.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: OutOfMemory Error

2014-08-20 Thread Shao, Saisai
Hi Meethu,

The spark.executor.memory setting is the Java heap size of the forked executor
process. Increasing spark.executor.memory actually increases the runtime heap
size of the executor process.

For the details of Spark configurations, you can check: 
http://spark.apache.org/docs/latest/configuration.html
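
For example, a minimal sketch (the application name and value are illustrative,
not from the thread):

import org.apache.spark.{SparkConf, SparkContext}

// Programmatic form; roughly equivalent to: spark-submit --executor-memory 4g ...
val conf = new SparkConf()
  .setAppName("memory-demo")                // hypothetical application name
  .set("spark.executor.memory", "4g")       // heap size of each executor JVM
val sc = new SparkContext(conf)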

Thanks
Jerry

From: MEETHU MATHEW [mailto:meethu2...@yahoo.co.in]
Sent: Wednesday, August 20, 2014 4:48 PM
To: Akhil Das; Ghousia
Cc: user@spark.apache.org
Subject: Re: OutOfMemory Error


 Hi ,

How do I increase the heap size?

What is the difference between spark.executor.memory and the heap size?

Thanks & Regards,
Meethu M

On Monday, 18 August 2014 12:35 PM, Akhil Das 
ak...@sigmoidanalytics.com wrote:

I believe spark.shuffle.memoryFraction is the one you are looking for.

spark.shuffle.memoryFraction : Fraction of Java heap to use for aggregation and 
cogroups during shuffles, if spark.shuffle.spill is true. At any given time, 
the collective size of all in-memory maps used for shuffles is bounded by this 
limit, beyond which the contents will begin to spill to disk. If spills are 
often, consider increasing this value at the expense of 
spark.storage.memoryFraction.

You can give it a try.
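
A short sketch of what that would look like (the values are illustrative, not
recommendations):

import org.apache.spark.SparkConf

// Give shuffles a larger share of the heap at the expense of the storage cache.
val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.4")
  .set("spark.storage.memoryFraction", "0.5")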


Thanks
Best Regards

On Mon, Aug 18, 2014 at 12:21 PM, Ghousia 
ghousia.ath...@gmail.com wrote:
Thanks for the answer Akhil. We are right now getting rid of this issue by
increasing the number of partitions, and we are persisting RDDs to DISK_ONLY.
But the issue is with heavy computations within an RDD. It would be better if
we had the option of spilling the intermediate transformation results to local
disk (only in case memory consumption is high). Do we have any such option
available in Spark? If increasing the partitions is the only way, one might
still end up with OutOfMemory errors when working with certain algorithms where
the intermediate result is huge.

On Mon, Aug 18, 2014 at 12:02 PM, Akhil Das 
ak...@sigmoidanalytics.com wrote:
Hi Ghousia,

You can try the following:

1. Increase the heap size: https://spark.apache.org/docs/0.9.0/configuration.html
2. Increase the number of partitions:
http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine
3. You could try persisting the RDD with DISK_ONLY (see the short sketch below):
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
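
A rough sketch of points 2 and 3 (the RDD name and partition count are
placeholders):

import org.apache.spark.storage.StorageLevel

// bigRdd: the large intermediate RDD from the computation (hypothetical)
val smaller = bigRdd.repartition(200)          // more, smaller partitions
smaller.persist(StorageLevel.DISK_ONLY)        // keep intermediate data on disk rather than on the heap
smaller.count()                                // materialize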


Thanks
Best Regards

On Mon, Aug 18, 2014 at 10:40 AM, Ghousia Taj 
ghousia.ath...@gmail.com wrote:
Hi,

I am trying to implement machine learning algorithms on Spark. I am working
on a 3 node cluster, with each node having 5GB of memory. Whenever I am
working with a slightly larger number of records, I end up with an OutOfMemory
error. The problem is, even if the number of records is only slightly higher,
the intermediate result of a transformation is huge and this results in an
OutOfMemory error. To overcome this, we are partitioning the data such that
each partition has only a few records.

Is there a better way to fix this issue? Something like spilling the
intermediate data to local disk?

Thanks,
Ghousia.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-Error-tp12275.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.







RE: Hi

2014-08-20 Thread Shao, Saisai
Hi,

Actually, several Java task threads run in a single executor, not separate
processes, so each executor has only one JVM runtime, which is shared by the
different task threads.

Thanks
Jerry

From: rapelly kartheek [mailto:kartheek.m...@gmail.com]
Sent: Wednesday, August 20, 2014 5:29 PM
To: user@spark.apache.org
Subject: Hi

Hi
I have this doubt:

I understand that each Java process runs in its own JVM instance. Now, if I
have a single executor on my machine and run several Java processes, then there
will be several JVM instances running.

Now, PROCESS_LOCAL means the data is located on the same JVM as the task that
is launched. But the memory associated with the entire executor is the same. So
how does this memory get distributed across the JVMs? I mean, how does this
memory get associated with multiple JVMs?
Thank you!
-karthik


RE: Data loss - Spark streaming and network receiver

2014-08-18 Thread Shao, Saisai
I think Spark Streaming currently lacks a data acknowledging mechanism for when
data is stored and replicated in the BlockManager, so data can potentially be
lost even after it has been pulled from Kafka. Say the data is stored only in
the BlockGenerator and not yet in the BlockManager, while in the meantime Kafka
has already committed the consumer offset, and at that point the node fails:
from Kafka's point of view this data has been fed into Spark Streaming, but it
has not actually been processed, so it may never be processed again unless you
re-read the whole partition.

To solve this potential data loss problem, Spark Streaming needs to offer a data
acknowledging mechanism, so that a custom Receiver can use this acknowledgement
to do checkpointing or recovery, like Storm does.

Besides, driver failure is another story that needs to be considered carefully.
So currently it is hard to guarantee no data loss in Spark Streaming; it still
needs to improve at some points ☺.

Thanks
Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Tuesday, August 19, 2014 10:47 AM
To: Wei Liu
Cc: user
Subject: Re: Data loss - Spark streaming and network receiver

Hi Wei,

On Tue, Aug 19, 2014 at 10:18 AM, Wei Liu 
wei@stellarloyalty.com wrote:
Since our application cannot tolerate losing customer data, I am wondering what 
is the best way for us to address this issue.
1) We are thinking of writing application-specific logic to address the data
loss. To us, the problem seems to be caused by the Kinesis receivers advancing
their checkpoint before we know for sure the data is replicated. For example, we
can do another checkpoint ourselves to remember the Kinesis sequence number for
data that has been processed by Spark Streaming. When the Kinesis receiver is
restarted due to worker failures, we restart it from the checkpoint we tracked.

This sounds pretty much like the way Kafka does it. So, I am not saying that the
stock KafkaReceiver does what you want (it may or may not), but it should be
possible to update the offset (which corresponds to the sequence number) in
Zookeeper only after data has been replicated successfully. I guess replacing
Kinesis with Kafka is not an option for you, but you might consider pulling the
Kinesis data into Kafka before processing it with Spark?

Tobias



RE: spark streaming - lamda architecture

2014-08-14 Thread Shao, Saisai
Hi Ali,

Maybe you can take a look at Twitter's Summingbird project
(https://github.com/twitter/summingbird), which is currently one of the few
open source choices for a lambda architecture. There's an ongoing sub-project
called summingbird-spark that might be the one you want; hopefully it can help
you.

Thanks
Jerry

-Original Message-
From: salemi [mailto:alireza.sal...@udo.edu] 
Sent: Friday, August 15, 2014 11:25 AM
To: u...@spark.incubator.apache.org
Subject: Re: spark streaming - lamda architecture

Below is what I understand by lambda architecture: the batch layer provides the
historical data and the speed layer provides the real-time view.

All data entering the system is dispatched to both the batch layer and the 
speed layer for processing.
The batch layer has two functions:
(i) managing the master dataset (an immutable, append-only set of raw data), and
(ii) pre-computing the batch views.

The speed layer compensates for the high latency of updates to the serving 
layer and deals with recent data only.

The serving layer indexes the batch views so that they can be queried in a
low-latency, ad-hoc way.

Any incoming query can be answered by merging results from batch views and 
real-time views.

In my system I have events coming in from Kafka sources; currently we need to
process 10,000 messages per second, write them out to HDFS, and make them
available to be queried by a serving layer.

What would be your suggestion for solving this architecturally, and
approximately what would be needed for the proposed architecture?

Thanks,
Ali


Tathagata Das wrote
 Can you be a bit more specific about what you mean by lambda architecture?
 
 
 On Thu, Aug 14, 2014 at 2:27 PM, salemi <alireza.salemi@> wrote:
 
 Hi,

 How would you implement the batch layer of lamda architecture with 
 spark/spark streaming?

 Thanks,
 Ali



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-l
 amda-architecture-tp12142.html Sent from the Apache Spark User List 
 mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: 

 user-unsubscribe@.apache

 For additional commands, e-mail: 

 user-help@.apache












--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-lamda-architecture-tp12142p12163.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Spark stream data from kafka topics and output as parquet file on HDFS

2014-08-05 Thread Shao, Saisai
Hi Rafeeq,

I think the current Spark Streaming API can offer you the ability to fetch data
from Kafka and store it in another external store. If you do not care about
managing the consumer offsets manually, there's no need to use a low-level API
like SimpleConsumer.

For Kafka 0.8.1 compatibility, you can try modifying the pom file and rebuilding
Spark; I think it will mostly work.

For Parquet files: if Parquet offers its own OutputFormat extending Hadoop's
OutputFormat, Spark can write data into Parquet files just like sequence files
or text files. You can do this as:

dstream.foreach { rdd => rdd.saveAsHadoopFile(…) }

specifying the OutputFormat you want.
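
As a rough sketch of that pattern (paths and types are illustrative; a Parquet
OutputFormat and its write support would be substituted for the TextOutputFormat
used here):

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.SparkContext._   // pair-RDD implicits for older Spark versions

// stream: a hypothetical DStream[String] built from the Kafka input
stream.foreachRDD { (rdd, time) =>
  rdd.map(record => (NullWritable.get(), new Text(record)))
     .saveAsNewAPIHadoopFile(
       s"hdfs:///data/out-${time.milliseconds}",   // one output directory per batch (hypothetical path)
       classOf[NullWritable],
       classOf[Text],
       classOf[TextOutputFormat[NullWritable, Text]])
}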

Thanks
Jerry

From: rafeeq s [mailto:rafeeq.ec...@gmail.com]
Sent: Tuesday, August 05, 2014 5:37 PM
To: Dibyendu Bhattacharya
Cc: u...@spark.incubator.apache.org
Subject: Re: Spark stream data from kafka topics and output as parquet file on 
HDFS

Thanks Dibyendu.
1. Spark itself has an API jar for Kafka; do we still require manual offset
management (using the simple consumer concept) and a manual consumer?
2. Kafka Spark Consumer is implemented against Kafka 0.8.0; can we use it for
Kafka 0.8.1?
3. How can we use Kafka Spark Consumer to produce output as Parquet files on HDFS?
Please give your suggestions.

Regards,
Rafeeq S
(“What you do is what matters, not what you think or say or plan.” )


On Tue, Aug 5, 2014 at 11:55 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:
You can try this Kafka Spark Consumer which I recently wrote. This uses the Low 
Level Kafka Consumer

https://github.com/dibbhatt/kafka-spark-consumer

Dibyendu



On Tue, Aug 5, 2014 at 12:52 PM, rafeeq s 
rafeeq.ec...@gmail.com wrote:
Hi,

I am new to Apache Spark and trying to develop a Spark Streaming program to
stream data from Kafka topics and output it as Parquet files on HDFS.
Please share a sample reference program that streams data from Kafka topics and
outputs it as Parquet files on HDFS.
Thanks in advance.

Regards,
Rafeeq S
(“What you do is what matters, not what you think or say or plan.” )





RE: spark.streaming.unpersist and spark.cleaner.ttl

2014-07-23 Thread Shao, Saisai
Hi Haopu, 

Please see the inline comments.

Thanks
Jerry

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Wednesday, July 23, 2014 3:00 PM
To: user@spark.apache.org
Subject: spark.streaming.unpersist and spark.cleaner.ttl

I have a DStream receiving data from a socket. I'm using local mode.
I set spark.streaming.unpersist to false and leave 
spark.cleaner.ttl to be infinite.
I can see files for input and shuffle blocks under the spark.local.dir
folder, and the size of the folder keeps increasing, although the JVM's memory
usage seems to be stable.

[question] In this case, the input RDDs are persisted but they don't fit into
memory, so they are written to disk, right? And where can I see the details
about these RDDs? I don't see them in the web UI.

[answer] Yes, if there is not enough memory to hold the input RDDs, the data
will be flushed to disk, because the default storage level is
MEMORY_AND_DISK_SER_2, as you can see in StreamingContext.scala. Actually you
cannot see the input RDDs in the web UI; you can only see cached RDDs there.

Then I set spark.streaming.unpersist to true, and the size of the
spark.local.dir folder and the JVM's used heap size are reduced regularly.

[question] In this case, because I didn't change spark.cleaner.ttl, which 
component is doing the cleanup? And what's the difference if I set 
spark.cleaner.ttl to some duration in this case?

[answer] If you set spark.streaming.unpersist to true, old unused RDDs will be
deleted, as you can see in DStream.scala. spark.cleaner.ttl, on the other hand,
is the timer-based Spark cleaner, which cleans not only streaming data but also
broadcast, shuffle and other data.

Thank you!
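
For reference, a minimal sketch of how these two settings would be applied (the
values are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.unpersist", "true")   // let Spark Streaming drop old, unused RDDs
  .set("spark.cleaner.ttl", "3600")           // periodic TTL-based cleaner, in seconds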



RE: spark.streaming.unpersist and spark.cleaner.ttl

2014-07-23 Thread Shao, Saisai
Yeah, the documentation may not be precisely aligned with the latest code, so
the best way is to check the code.

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Wednesday, July 23, 2014 5:56 PM
To: user@spark.apache.org
Subject: RE: spark.streaming.unpersist and spark.cleaner.ttl

Jerry, thanks for the response.

For the default storage level of DStreams, it looks like Spark's documentation
is wrong. This link:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning
mentions:
Default persistence level of DStreams: Unlike RDDs, the default persistence 
level of DStreams serializes the data in memory (that is, 
StorageLevel.MEMORY_ONLY_SER for DStream compared to StorageLevel.MEMORY_ONLY 
for RDDs). Even though keeping the data serialized incurs higher 
serialization/deserialization overheads, it significantly reduces GC pauses.

I will take a look at DStream.scala although I have no Scala experience.




RE: Executor metrics in spark application

2014-07-22 Thread Shao, Saisai
Hi Denes,

I think you can register your customized metrics source into the metrics system
through metrics.properties; you can take metrics.properties.template as a
reference.

Basically you can do the following if you want to monitor on the executor side:

executor.source.accumulator.class=xx.xx.xx.your-customized-metrics-source

I think the code below can only register the metrics source on the client side:

SparkEnv.get.metricsSystem.registerSource(accumulatorMetrics);

BTW, it's not a good choice to register through MetricsSystem directly; it would
be nicer to register through configuration. Also, you can enable the console
sink to verify whether the source is registered or not.
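
For example, adding a console sink in metrics.properties (lines modeled on
metrics.properties.template) prints all registered sources periodically, which
makes it easy to confirm the custom source shows up:

*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds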

Thanks
Jerry


-Original Message-
From: Denes [mailto:te...@outlook.com] 
Sent: Tuesday, July 22, 2014 2:02 PM
To: u...@spark.incubator.apache.org
Subject: Re: Executor metrics in spark application

I'm also pretty interested in how to create custom Sinks in Spark. I'm using it
with Ganglia, and the normal metrics from the JVM source do show up. I tried to
create my own metric based on Issac's code, but it does not show up in Ganglia.
Does anyone know where the problem is?
Here's the code snippet: 

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

class AccumulatorSource(accumulator: Accumulator[Long], name: String) extends Source {

  val sourceName = "accumulator.metrics"
  val metricRegistry = new MetricRegistry()

  metricRegistry.register(MetricRegistry.name("accumulator", name),
    new Gauge[Long] {
      override def getValue: Long = accumulator.value
    })
}

and then in the main:

val longAccumulator = sc.accumulator[Long](0)
val accumulatorMetrics = new AccumulatorSource(longAccumulator, "counters.accumulator")
SparkEnv.get.metricsSystem.registerSource(accumulatorMetrics)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10385.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: number of Cached Partitions v.s. Total Partitions

2014-07-22 Thread Shao, Saisai
Yes, it's normal when there is not enough memory to hold the third partition, as
you can see in your attached picture.
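
A small sketch of the alternative (the input path is a placeholder): with plain
cache() (MEMORY_ONLY), partitions that don't fit are simply not cached, whereas
MEMORY_AND_DISK spills them to disk so every partition counts as cached:

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///some/big/file")   // sc: an existing SparkContext; hypothetical path
rdd.persist(StorageLevel.MEMORY_AND_DISK)        // overflow partitions go to disk instead of being dropped
rdd.count()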

Thanks
Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, July 22, 2014 3:09 PM
To: user@spark.apache.org
Subject: number of Cached Partitions v.s. Total Partitions


Hi, I'm using local mode and reading a text file as an RDD using the
JavaSparkContext.textFile() API.

I then call the cache() method on the resulting RDD.



I look at the Storage information and find the RDD has 3 partitions but 2 of 
them have been cached.

Is this normal behavior? I assume either all of the partitions should be cached
or none of them.

If I'm wrong, in what cases is the number of cached partitions less than the
total number of partitions?



[attached image: image001.jpg]


RE: Executor metrics in spark application

2014-07-22 Thread Shao, Saisai
Yeah, now I see your purpose. The original design of customized metrics sources
focused on self-contained sources; since you need to rely on an outer variable,
the way you mentioned may be the only way to register it.

Besides, as you cannot see the source in Ganglia, I think you can enable the
console sink to verify the output. Also, it seems you want to register this
source on the driver, so you need to enable the Ganglia sink on the driver side
and make sure the Ganglia client can connect to your driver.

Thanks
Jerry

-Original Message-
From: Denes [mailto:te...@outlook.com] 
Sent: Tuesday, July 22, 2014 6:38 PM
To: u...@spark.incubator.apache.org
Subject: RE: Executor metrics in spark application

Hi Jerry,

I know that way of registering a metrics source, but it seems to defeat the
whole purpose. I'd like to define a source whose value is set within the
application, for example the number of parsed messages.
If I register it in metrics.properties, how can I obtain the instance (or
instances)? How can I set the property? Is there a way to read an accumulator's
value from a Source?


--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10397.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Some question about SQL and streaming

2014-07-10 Thread Shao, Saisai
Actually we have a POC project which shows the power of combining Spark
Streaming and Catalyst: it can run SQL on top of Spark Streaming and get a
SchemaDStream. You can take a look at it here:
https://github.com/thunderain-project/StreamSQL

Thanks
Jerry

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Friday, July 11, 2014 10:17 AM
To: user@spark.apache.org
Subject: Re: Some question about SQL and streaming

Yeah, the right solution is to have something like SchemaDStream, where the
schema of all the SchemaRDDs generated by it can be stored. Something I would
really like to see happen in the future :)

TD

On Thu, Jul 10, 2014 at 6:37 PM, Tobias Pfeiffer 
t...@preferred.jp wrote:
Hi,

I think it would be great if we could do the string parsing only once and then 
just apply the transformation for each interval (reducing the processing 
overhead for short intervals).

Also, one issue with the approach above is that transform() has the following 
signature:

  def transform(transformFunc: RDD[T] => RDD[U]): DStream[U]

and therefore, in my example

val result = lines.transform((rdd, time) => {
  // execute statement
  rdd.registerAsTable(data)
  sqlc.sql(query)
})

the variable `result` is of type DStream[Row]. That is, the meta-information
from the SchemaRDD is lost and, from what I understand, there is then no way to
learn about the column names of the returned data, as this information is only
encoded in the SchemaRDD. I would love to see a fix for this.

Thanks
Tobias





RE: Some question about SQL and streaming

2014-07-10 Thread Shao, Saisai
No specific plans to do so, since there would be some functional loss, like the
time-based windowing functions that are important for streaming SQL. Also,
keeping compatible with the fast-growing Spark SQL is quite hard. So there are
no clear plans to submit it upstream.

-Jerry

From: Tobias Pfeiffer [mailto:t...@preferred.jp]
Sent: Friday, July 11, 2014 10:47 AM
To: user@spark.apache.org
Subject: Re: Some question about SQL and streaming

Hi,

On Fri, Jul 11, 2014 at 11:38 AM, Shao, Saisai 
saisai.s...@intel.com wrote:
Actually we have a POC project which shows the power of combining Spark
Streaming and Catalyst: it can run SQL on top of Spark Streaming and get a
SchemaDStream. You can take a look at it here:
https://github.com/thunderain-project/StreamSQL

Wow, that looks great! Any plans to get this code (or functionality) merged 
into Spark?

Tobias


