CVE-2019-10099: Apache Spark unencrypted data on local disk

2019-08-06 Thread Imran Rashid
 Severity: Important

Vendor: The Apache Software Foundation

Versions affected:
All Spark 1.x, Spark 2.0.x, Spark 2.1.x, and 2.2.x versions
Spark 2.3.0 to 2.3.2


Description:
Prior to Spark 2.3.3, in certain situations Spark would write user data to
local disk unencrypted, even if spark.io.encryption.enabled=true.  This
includes cached blocks that are fetched to disk (controlled by
spark.maxRemoteBlockSizeFetchToMem); in SparkR, when using parallelize; in
PySpark, when using broadcast and parallelize; and when using Python UDFs.
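
For reference, an editorial illustration only of the two settings named above,
shown as they might be set programmatically; the values are examples, not
recommendations from the advisory:

  val conf = new org.apache.spark.SparkConf()
    .set("spark.io.encryption.enabled", "true")         // encrypt data Spark writes to local disk
    .set("spark.maxRemoteBlockSizeFetchToMem", "200m")  // remote blocks above this size are fetched to disk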


Mitigation:
1.x, 2.0.x, 2.1.x, 2.2.x, 2.3.x  users should upgrade to 2.3.3 or newer,
including 2.4.x.

Credit:
This issue was reported by Thomas Graves of NVIDIA.

References:
https://spark.apache.org/security.html
https://issues.apache.org/jira/browse/SPARK-28626


Re: [SHUFFLE]FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle

2019-03-12 Thread Imran Rashid
We haven't seen many of these, but we have seen it a couple of times --
there is ongoing work under SPARK-26089 to address the issue we know about,
namely that we don't detect corruption in large shuffle blocks.

Do you believe the cases you have match that -- does it appear to be
corruption in large shuffle blocks?
Or do you not have compression or encryption enabled?  Both the prior
solution and the work under SPARK-26089 only work if either one of those is
enabled.
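
For reference, a minimal sketch of enabling the two options referred to above;
the config names are standard Spark settings, and the values are only
illustrative:

  val conf = new org.apache.spark.SparkConf()
    .set("spark.shuffle.compress", "true")       // compress shuffle output (on by default)
    .set("spark.io.encryption.enabled", "true")  // encrypt shuffle/spill data written to local disk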

On Tue, Mar 12, 2019 at 9:36 AM Vadim Semenov  wrote:

> I/We have seen this error before on 1.6 but ever since we upgraded to 2.1
> two years ago we haven't seen it
>
> On Tue, Mar 12, 2019 at 2:19 AM wangfei  wrote:
>
>> Hi all,
>>  Non-deterministic FAILED_TO_UNCOMPRESS(5) or 'Stream is corrupted'
>> errors may occur during shuffle read, as described in this JIRA (
>> https://issues.apache.org/jira/browse/SPARK-4105).
>>  There have been no new comments on this JIRA for a long time. So, has
>> anyone seen these errors in a recent version, such as Spark 2.3?
>>  Can anyone provide a reproducible case or analyze the cause of
>> these errors?
>>  Thanks.
>>
>
>
> --
> Sent from my iPhone
>


Re: CVE-2018-11760: Apache Spark local privilege escalation vulnerability

2019-01-31 Thread Imran Rashid
I received some questions about what the exact change was that fixed the
issue, and the PMC decided to post the info in JIRA to make it easier for the
community to track.  The relevant details are all on

https://issues.apache.org/jira/browse/SPARK-26802

On Mon, Jan 28, 2019 at 1:08 PM Imran Rashid  wrote:

> Severity: Important
>
> Vendor: The Apache Software Foundation
>
> Versions affected:
> All Spark 1.x, Spark 2.0.x, and Spark 2.1.x versions
> Spark 2.2.0 to 2.2.2
> Spark 2.3.0 to 2.3.1
>
> Description:
> When using PySpark, it's possible for a different local user to connect
> to the Spark application and impersonate the user running the Spark
> application.  This affects versions 1.x, 2.0.x, 2.1.x, 2.2.0 to 2.2.2, and
> 2.3.0 to 2.3.1.
>
> Mitigation:
> 1.x, 2.0.x, 2.1.x, and 2.2.x users should upgrade to 2.2.3 or newer
> 2.3.x users should upgrade to 2.3.2 or newer
> Otherwise, affected users should avoid using PySpark in multi-user
> environments.
>
> Credit:
> This issue was reported by Luca Canali and Jose Carlos Luna Duran from
> CERN.
>
> References:
> https://spark.apache.org/security.html
>


CVE-2018-11760: Apache Spark local privilege escalation vulnerability

2019-01-28 Thread Imran Rashid
Severity: Important

Vendor: The Apache Software Foundation

Versions affected:
All Spark 1.x, Spark 2.0.x, and Spark 2.1.x versions
Spark 2.2.0 to 2.2.2
Spark 2.3.0 to 2.3.1

Description:
When using PySpark, it's possible for a different local user to connect to
the Spark application and impersonate the user running the Spark
application.  This affects versions 1.x, 2.0.x, 2.1.x, 2.2.0 to 2.2.2, and
2.3.0 to 2.3.1.

Mitigation:
1.x, 2.0.x, 2.1.x, and 2.2.x users should upgrade to 2.2.3 or newer
2.3.x users should upgrade to 2.3.2 or newer
Otherwise, affected users should avoid using PySpark in multi-user
environments.

Credit:
This issue was reported by Luca Canali and Jose Carlos Luna Duran from CERN.

References:
https://spark.apache.org/security.html


Re: Spark on Yarn, is it possible to manually blacklist nodes before running spark job?

2019-01-23 Thread Imran Rashid
Serega, can you explain a bit more why you want this ability?
If the node is really bad, wouldn't you want to decommission the NM entirely?
If you've got heterogeneous resources, then node labels seem like they would
be more appropriate -- and I don't feel great about adding workarounds for
the node-label limitations into blacklisting.

I don't want to be stuck supporting a configuration with too limited a use
case.

(It may be better to move this discussion to
https://issues.apache.org/jira/browse/SPARK-26688 so it's better archived;
I'm responding here in case you aren't watching that issue.)

On Tue, Jan 22, 2019 at 6:09 AM Jörn Franke  wrote:

> You can try with Yarn node labels:
>
> https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeLabel.html
>
> Then you can whitelist nodes.
>
> Am 19.01.2019 um 00:20 schrieb Serega Sheypak :
>
> Hi, is there any possibility to tell Scheduler to blacklist specific nodes
> in advance?
>
>
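
For reference, a hedged sketch of the node-label approach Jörn mentions above;
the label name is hypothetical, and on YARN these are normally supplied at
submit time (e.g. via --conf) rather than set in code:

  val conf = new org.apache.spark.SparkConf()
    .set("spark.yarn.am.nodeLabelExpression", "good_nodes")
    .set("spark.yarn.executor.nodeLabelExpression", "good_nodes")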


Re: Heap Memory in Spark 2.3.0

2018-07-17 Thread Imran Rashid
perhaps this is https://issues.apache.org/jira/browse/SPARK-24578?

that was reported as a performance issue, not OOMs, but it's in the exact
same part of the code and the change was to reduce the memory pressure
significantly.
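
For reference, a minimal sketch of the overhead settings discussed in the
quoted message below; the values (in MiB) are illustrative, not
recommendations, and in cluster mode the driver setting has to be supplied at
submit time:

  val conf = new org.apache.spark.SparkConf()
    .set("spark.executor.memoryOverhead", "2048")  // off-heap overhead per executor container
    .set("spark.driver.memoryOverhead", "2048")    // off-heap overhead for the driver container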

On Mon, Jul 16, 2018 at 1:43 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> I am working to move our system from Spark 2.1.0 to Spark 2.3.0.  Our
> system is running on Spark managed via Yarn.  During the course of the move
> I mirrored the settings to our new cluster.  However, on the Spark 2.3.0
> cluster with the same resource allocation I am seeing a number of executors
> die due to OOM:
>
> 18/07/16 17:23:06 ERROR YarnClusterScheduler: Lost executor 5 on wn80:
> Container killed by YARN for exceeding memory limits. 22.0 GB of 22 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>
> I increased spark.driver.memoryOverhead and spark.executor.memoryOverhead
> from the default (384) to 2048.  I went ahead and disabled vmem and pmem
> Yarn checks on the cluster.  With that disabled I see the following error:
>
> Caused by: java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>   at 
> io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1466)
>   at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1203)
>   at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:140)
>   at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
>   at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
>   at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
>   at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
>   at 
> io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831)
>   at 
> io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041)
>   at 
> io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:222)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:146)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
>
>
>
> Looking at GC:
>
>[Eden: 16.0M(8512.0M)->0.0B(8484.0M) Survivors: 4096.0K->4096.0K Heap: 
> 8996.7M(20.0G)->8650.3M(20.0G)]
>  [Times: user=0.03 sys=0.01, real=0.01 secs]
>  794.949: [G1Ergonomics (Heap Sizing) attempt heap expansion, reason: 
> allocation request failed, allocation request: 401255000 bytes]
>  794.949: [G1Ergonomics (Heap Sizing) expand the heap, requested expansion 
> amount: 401255000 bytes, attempted expansion amount: 402653184 bytes]
>  794.949: [G1Ergonomics (Heap Sizing) did not expand the heap, reason: heap 
> already fully expanded]
> 

Re: Spark Job Hangs on our production cluster

2015-08-18 Thread Imran Rashid
just looking at the thread dump from your original email, the 3 executor
threads are all trying to load classes.  (One thread is actually loading
some class, and the others are blocked waiting to load a class, most likely
trying to load the same thing.)  That is really weird, definitely not
something which should keep things blocked for 30 min.  It suggests
something wrong w/ the jvm, or classpath configuration, or a combination.
Looks like you are trying to run in the repl, and for whatever reason the
http server for the repl to serve classes is not responsive.  I'd try
running outside of the repl and see if that works.

sorry not a full diagnosis but maybe this'll help a bit.

On Tue, Aug 11, 2015 at 3:19 PM, java8964 java8...@hotmail.com wrote:

 Currently we have an IBM BigInsight cluster with 1 namenode + 1 JobTracker
 + 42 data/task nodes, which runs BigInsight V3.0.0.2, corresponding
 to Hadoop 2.2.0 with MR1.

 Since IBM BigInsight doesn't come with Spark, we built Spark 1.2.2 with
 Hadoop 2.2.0 + Hive 0.12 ourselves and deployed it on the same cluster.

 IBM BigInsight comes with the IBM JDK 1.7, but in our experience on the
 staging environment, Spark works better with the Oracle JVM, so we
 run Spark under Oracle JDK 1.7.0_79.

 Now, in production, we are facing an issue we have never faced before and
 cannot reproduce on our staging cluster.

 We are using a Spark standalone cluster, and here are the basic
 configurations:

 more spark-env.sh
 export JAVA_HOME=/opt/java
 export PATH=$JAVA_HOME/bin:$PATH
 export HADOOP_CONF_DIR=/opt/ibm/biginsights/hadoop-conf/
 export
 SPARK_CLASSPATH=/opt/ibm/biginsights/IHC/lib/ibm-compression.jar:/opt/ibm/biginsights/hive/lib
 /db2jcc4-10.6.jar
 export
 SPARK_LOCAL_DIRS=/data1/spark/local,/data2/spark/local,/data3/spark/local
 export SPARK_MASTER_WEBUI_PORT=8081
 export SPARK_MASTER_IP=host1
 export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=42
 export SPARK_WORKER_MEMORY=24g
 export SPARK_WORKER_CORES=6
 export SPARK_WORKER_DIR=/tmp/spark/work
 export SPARK_DRIVER_MEMORY=2g
 export SPARK_EXECUTOR_MEMORY=2g

 more spark-defaults.conf
 spark.master spark://host1:7077
 spark.eventLog.enabled true
 spark.eventLog.dir hdfs://host1:9000/spark/eventLog
 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails
 -XX:+PrintGCTimeStamps

 We use the Avro file format a lot, and we have these 2 datasets: one is
 about 96G, and the other is a little over 1T. Since we are using Avro,
 we also built spark-avro at commit
 a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8
 https://github.com/databricks/spark-avro/tree/a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8,
 which is the latest version supporting Spark 1.2.x.

 Here is the problem we are facing on our production cluster: even the
 following simple spark-shell commands hang:

 import org.apache.spark.sql.SQLContext
 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
 import com.databricks.spark.avro._
 val bigData = sqlContext.avroFile("hdfs://namenode:9000/bigData/")
 bigData.registerTempTable("bigData")
 bigData.count()

 From the console, we saw the following:
 [Stage 0:
 (44 + 42) / 7800]

 with no update for more than 30 minutes.

 The big 1T dataset should generate 7800 HDFS blocks, but Spark's HDFS
 client seems to have problems reading them. Since we are running Spark
 on the data nodes, all the Spark tasks run at the NODE_LOCAL
 locality level.

 If I go to the data/task node where the Spark tasks hang and use jstack
 to dump the threads, I get the following at the top:

 2015-08-11 15:38:38
 Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.79-b02 mixed mode):

 Attach Listener daemon prio=10 tid=0x7f0660589000 nid=0x1584d
 waiting on condition [0x]
java.lang.Thread.State: RUNNABLE

 org.apache.hadoop.hdfs.PeerCache@4a88ec00 daemon prio=10
 tid=0x7f06508b7800 nid=0x13302 waiting on condition [0x7f060be94000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
 at java.lang.Thread.sleep(Native Method)
 at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:252)
 at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:39)
 at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:135)
 at java.lang.Thread.run(Thread.java:745)

 shuffle-client-1 daemon prio=10 tid=0x7f0650687000 nid=0x132fc
 runnable [0x7f060d198000]
java.lang.Thread.State: RUNNABLE
 at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
 at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
 at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
 - locked 0x00067bf47710 (a
 io.netty.channel.nio.SelectedSelectionKeySet)
 - locked 0x00067bf374e8 (a
 java.util.Collections$UnmodifiableSet)
  

Re: Spark Job Hangs on our production cluster

2015-08-18 Thread Imran Rashid
sorry, by repl I mean spark-shell, I guess I'm used to them being used
interchangeably.  From that thread dump, the one thread that isn't stuck is
trying to get classes specifically related to the shell / repl:

   java.lang.Thread.State: RUNNABLE
 at java.net.SocketInputStream.socketRead0(Native Method)
 at java.net.SocketInputStream.read(SocketInputStream.java:152)
 at java.net.SocketInputStream.read(SocketInputStream.java:122)
 at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
 at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
 at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
 - locked 0x00072477d530 (a java.io.BufferedInputStream)
 at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:689)
 at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)
 at
 sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1324)
 - locked 0x000724772bf8 (a
 sun.net.www.protocol.http.HttpURLConnection)
 at java.net.URL.openStream(URL.java:1037)
 at
 org.apache.spark.repl.ExecutorClassLoader.findClassLocally(ExecutorClassLoader.scala:86)
 at
 org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:63)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

...

that's because the repl needs to package up the code for every single line,
and it serves those compiled classes to each executor over http.  This
particular executor seems to be stuck pulling one of those lines compiled
in the repl.  (This is all assuming that the thread dump is the same over
the entire 30 minutes that spark seems to be stuck.)

Yes, the classes should be loaded for the first partition that is
processed. (there certainly could be cases where different classes are
needed for each partition, but it doesn't sound like you are doing anything
that would trigger this.)  But to be clear, in repl mode, there will be
additional classes to be sent with every single job.

Hope that helps a little more ... maybe there was some issue w/ 1.2.2,
though I didn't see anything with a quick search, hopefully you'll have
more luck w/ 1.3.1

On Tue, Aug 18, 2015 at 2:23 PM, java8964 java8...@hotmail.com wrote:

 Hi, Imran:

 Thanks for your reply. I am not sure what you mean by repl. Can you give
 more detail about that?

 This only happens when Spark 1.2.2 tries to scan the big dataset; we
 cannot reproduce it when it scans the smaller dataset.

 FYI, I have built and deployed Spark 1.3.1 on our production cluster.
 Right now, I cannot reproduce this hang problem on the same cluster for the
 same big dataset. On this point, we will continue trying Spark 1.3.1 and hope
 to have a more positive experience with it.

 But just wondering, what classes does Spark need to load at this time?
 From my understanding, the executor has already scanned the first block of
 data from HDFS, and hangs while starting the 2nd block. All the classes should
 already be loaded in the JVM in this case.

 Thanks

 Yong

 --
 From: iras...@cloudera.com
 Date: Tue, 18 Aug 2015 12:17:56 -0500
 Subject: Re: Spark Job Hangs on our production cluster
 To: java8...@hotmail.com
 CC: user@spark.apache.org


 just looking at the thread dump from your original email, the 3 executor
 threads are all trying to load classes.  (One thread is actually loading
 some class, and the others are blocked waiting to load a class, most likely
 trying to load the same thing.)  That is really weird, definitely not
 something which should keep things blocked for 30 min.  It suggests
 something wrong w/ the jvm, or classpath configuration, or a combination.
 Looks like you are trying to run in the repl, and for whatever reason the
 http server for the repl to serve classes is not responsive.  I'd try
 running outside of the repl and see if that works.

 sorry not a full diagnosis but maybe this'll help a bit.

 On Tue, Aug 11, 2015 at 3:19 PM, java8964 java8...@hotmail.com wrote:

 Currently we have an IBM BigInsight cluster with 1 namenode + 1 JobTracker
 + 42 data/task nodes, which runs BigInsight V3.0.0.2, corresponding
 to Hadoop 2.2.0 with MR1.

 Since IBM BigInsight doesn't come with Spark, we built Spark 1.2.2 with
 Hadoop 2.2.0 + Hive 0.12 ourselves and deployed it on the same cluster.

 IBM BigInsight comes with the IBM JDK 1.7, but in our experience on the
 staging environment, Spark works better with the Oracle JVM, so we
 run Spark under Oracle JDK 1.7.0_79.

 Now, in production, we are facing an issue we have never faced before and
 cannot reproduce on our staging cluster.

 We are using a Spark standalone cluster, and here are the basic
 configurations:

 more spark-env.sh
 export JAVA_HOME=/opt/java
 export PATH=$JAVA_HOME/bin:$PATH
 export HADOOP_CONF_DIR=/opt/ibm/biginsights/hadoop-conf/
 export
 

Re: Spark runs into an Infinite loop even if the tasks are completed successfully

2015-08-13 Thread Imran Rashid
oh I see, you are defining your own RDD and Partition types, and you had a
bug where partition.index did not line up with the partition's slot in
rdd.getPartitions.  Is that correct?
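
For reference, a minimal sketch (not from this thread) of the contract being
described: each Partition's index must equal its position in the array returned
by getPartitions. The class names and payload are made up for illustration:

  import org.apache.spark.{Partition, SparkContext, TaskContext}
  import org.apache.spark.rdd.RDD

  class MyPartition(override val index: Int) extends Partition

  class MyRDD(sc: SparkContext, numPartitions: Int) extends RDD[Int](sc, Nil) {
    // indices run 0 .. numPartitions - 1 and line up with the array slots
    override protected def getPartitions: Array[Partition] =
      (0 until numPartitions).map(i => new MyPartition(i): Partition).toArray

    override def compute(split: Partition, context: TaskContext): Iterator[Int] =
      Iterator(split.index)  // placeholder payload
  }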

On Thu, Aug 13, 2015 at 2:40 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 I figured that out, And these are my findings:

 - It just enters an infinite loop when there's a duplicate partition
 id.

 - It enters an infinite loop when the partition id starts from 1
 rather than 0


 Something like this piece of code can reproduce it: (in getPartitions())

 val total_partitions = 4
 val partitionsArray: Array[Partition] =
   Array.ofDim[Partition](total_partitions)

 var i = 0

 for(outer <- 0 to 1){
   for(partition <- 1 to total_partitions){
     partitionsArray(i) = new DeadLockPartitions(partition)
     i = i + 1
   }
 }

 partitionsArray




 Thanks
 Best Regards

 On Wed, Aug 12, 2015 at 10:57 PM, Imran Rashid iras...@cloudera.com
 wrote:

 yikes.

 Was this a one-time thing?  Or does it happen consistently?  can you turn
 on debug logging for o.a.s.scheduler (dunno if it will help, but maybe ...)

 On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Hi

 My Spark job (running in local[*] with Spark 1.4.1) reads data from a
 thrift server (I created an RDD; it computes the partitions in the
 getPartitions() call, and in compute() the hasNext returns records from these
 partitions). count() and foreach() work fine and return the correct
 number of records. But whenever there is a shuffle map stage (like reduceByKey
 etc.), all the tasks execute properly but then it enters an
 infinite loop saying:


1. 15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage
1 (map at FilterMain.scala:59) because some of its tasks had failed:
0, 3


 Here's the complete stack-trace http://pastebin.com/hyK7cG8S

 What could be the root cause of this problem? I looked up and bumped
 into this closed JIRA https://issues.apache.org/jira/browse/SPARK-583
 (which is very very old)




 Thanks
 Best Regards






Re: Spark runs into an Infinite loop even if the tasks are completed successfully

2015-08-12 Thread Imran Rashid
yikes.

Was this a one-time thing?  Or does it happen consistently?  can you turn
on debug logging for o.a.s.scheduler (dunno if it will help, but maybe ...)

On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Hi

 My Spark job (running in local[*] with Spark 1.4.1) reads data from a
 thrift server (I created an RDD; it computes the partitions in the
 getPartitions() call, and in compute() the hasNext returns records from these
 partitions). count() and foreach() work fine and return the correct
 number of records. But whenever there is a shuffle map stage (like reduceByKey
 etc.), all the tasks execute properly but then it enters an
 infinite loop saying:


1. 15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage 1
(map at FilterMain.scala:59) because some of its tasks had failed: 0, 3


 Here's the complete stack-trace http://pastebin.com/hyK7cG8S

 What could be the root cause of this problem? I looked up and bumped into
 this closed JIRA https://issues.apache.org/jira/browse/SPARK-583 (which
 is very very old)




 Thanks
 Best Regards



Re: takeSample() results in two stages

2015-06-12 Thread Imran Rashid
It launches two jobs because it doesn't know ahead of time how big your RDD
is, so it doesn't know what the sampling rate should be.  After counting
all the records, it can determine what the sampling rate should be -- then
it does another pass through the data, sampling by the rate it's just
determined.

Note that this suggests: (a) if you know the size of your RDD ahead of
time, you could eliminate that first pass and (b) since you end up
computing the input RDD twice, it may make sense to cache it.
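
For reference, a rough sketch of both suggestions (the path and numbers are
made up): caching avoids recomputing the input across the two passes, and if
you already know the size you can use sample() with a precomputed fraction and
skip the counting job entirely.

  val data = sc.textFile("hdfs://host/input").cache()

  // (b) takeSample still runs two jobs, but the second pass reads cached data
  val sampled = data.takeSample(withReplacement = false, num = 1000)

  // (a) with a known (or estimated) record count, a single sample() pass is enough
  val knownCount = 10000000L
  val fraction = 1000.0 / knownCount
  val approxSample = data.sample(withReplacement = false, fraction = fraction).collect()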

On Thu, Jun 11, 2015 at 11:43 AM, barmaley o...@solver.com wrote:

 I've observed interesting behavior in Spark 1.3.1, the reason for which is
 not clear.

 Doing something as simple as sc.textFile(...).takeSample(...) always
 results in two stages (screenshot: Spark's takeSample() results in two stages,
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n23280/Capture.jpg).



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/takeSample-results-in-two-stages-tp23280.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: flatMap output on disk / flatMap memory overhead

2015-06-09 Thread Imran Rashid
I agree with Richard.  It looks like the issue here is shuffling, and
shuffle data is always written to disk, so the issue is definitely not that
all the output of flatMap has to be stored in memory.

If at all possible, I'd first suggest upgrading to a new version of spark
-- even in 1.2, there were big improvements to shuffle with sort based
shuffle as the default.

On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher rmarsc...@localytics.com
wrote:

 Are you sure it's memory related? What is the disk utilization and IO
 performance on the workers? The error you posted looks to be related to
 shuffle trying to obtain block data from another worker node and failing to
 do so in reasonable amount of time. It may still be memory related, but I'm
 not sure that other resources are ruled out yet.

 On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea octavian.ga...@inf.ethz.ch
  wrote:

 I tried using reduceByKey, without success.

 I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
 However, I got the same error as before, namely the error described here:

 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

 My task is to count the frequencies of pairs of words that occur in a set
 of
 documents at least 5 times. I know that this final output is sparse and
 should comfortably fit in memory. However, the intermediate pairs that are
 spilled by flatMap might need to be stored on the disk, but I don't
 understand why the persist option does not work and my job fails.

 My code:

 rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type
  // ((word1,word2) , 1)
  .reduceByKey((a,b) => (a + b).toShort)
  .filter({case((x,y),count) => count >= 5})


 My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One
 node I keep for the master, 7 nodes for the workers.

 my conf:

 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "115g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")

 my spark-env.sh:
  ulimit -n 20
  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
  SPARK_DRIVER_MEMORY=129G

 spark version: 1.1.1

 Thank you a lot for your help!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Question about Serialization in Storage Level

2015-05-27 Thread Imran Rashid
Hi Zhipeng,

yes, your understanding is correct.  the SER portion just refers to how
it's stored in memory.  On disk, the data always has to be serialized.
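
For reference, a minimal illustration of the distinction (the variable names
are made up): the _SER levels change only the in-memory representation, while
anything that spills to disk is written in serialized form under either level.

  import org.apache.spark.storage.StorageLevel

  val a = sc.parallelize(1 to 1000000)
  val b = sc.parallelize(1 to 1000000)

  a.persist(StorageLevel.MEMORY_AND_DISK)      // deserialized objects in memory; serialized on disk
  b.persist(StorageLevel.MEMORY_AND_DISK_SER)  // serialized both in memory and on disk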


On Fri, May 22, 2015 at 10:40 PM, Jiang, Zhipeng zhipeng.ji...@intel.com
wrote:

  Hi Todd, Howard,



 Thanks for your reply, I might not have presented my question clearly.



 What I mean is, if I call *rdd.persist(StorageLevel.MEMORY_AND_DISK)*,
 the BlockManager will cache the rdd to MemoryStore. RDD will be migrated to
 DiskStore when it cannot fit in memory. I think this migration does require
 data serialization and compression (if spark.rdd.compress is set to be
 true). So the data on disk is serialized, even if I didn’t choose a
 serialized storage level, am I right?



 Thanks,

 Zhipeng





 *From:* Todd Nist [mailto:tsind...@gmail.com]
 *Sent:* Thursday, May 21, 2015 8:49 PM
 *To:* Jiang, Zhipeng
 *Cc:* user@spark.apache.org
 *Subject:* Re: Question about Serialization in Storage Level



 From the docs,
 https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
 :



 Storage Level -- Meaning

 MEMORY_ONLY -- Store RDD as deserialized Java objects in the JVM. If the RDD
 does not fit in memory, some partitions will not be cached and will be
 recomputed on the fly each time they're needed. This is the default level.

 MEMORY_AND_DISK -- Store RDD as *deserialized* Java objects in the JVM. If the
 RDD does not fit in memory, store the partitions that don't fit on disk, and
 read them from there when they're needed.

 MEMORY_ONLY_SER -- Store RDD as *serialized* Java objects (one byte array per
 partition). This is generally more space-efficient than deserialized objects,
 especially when using a fast serializer
 (https://spark.apache.org/docs/latest/tuning.html), but more CPU-intensive to
 read.

 MEMORY_AND_DISK_SER -- Similar to *MEMORY_ONLY_SER*, but spill partitions that
 don't fit in memory to disk instead of recomputing them on the fly each time
 they're needed.



 On Thu, May 21, 2015 at 3:52 AM, Jiang, Zhipeng zhipeng.ji...@intel.com
 wrote:

  Hi there,



 This question may seem to be kind of naïve, but what’s the difference
 between *MEMORY_AND_DISK* and *MEMORY_AND_DISK_SER*?



 If I call *rdd.persist(StorageLevel.MEMORY_AND_DISK)*, the BlockManager
 won’t serialize the *rdd*?



 Thanks,

 Zhipeng





Re: Spark and logging

2015-05-27 Thread Imran Rashid
only an answer to one of your questions:


What about log statements in the
 partition processing functions?  Will their log statements get logged into
 a
 file residing on a given 'slave' machine, or will Spark capture this log
 output and divert it into the log file of the driver's machine?


they get logged to files on the remote nodes.  You can view the logs for
each executor through the UI.  If you are using spark on yarn, you can grab
all the logs with yarn logs.


Re: FetchFailedException and MetadataFetchFailedException

2015-05-22 Thread Imran Rashid
 at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:933)
 at
 org.apache.spark.storage.DiskBlockManager$$anonfun$org$apache$spark$storage$DiskBlockManager$$doStop$1.apply(DiskBlockManager.scala:165)
 at
 org.apache.spark.storage.DiskBlockManager$$anonfun$org$apache$spark$storage$DiskBlockManager$$doStop$1.apply(DiskBlockManager.scala:162)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at org.apache.spark.storage.DiskBlockManager.org
 $apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:162)
 at
 org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:156)
 at
 org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1208)
 at org.apache.spark.SparkEnv.stop(SparkEnv.scala:88)
 at org.apache.spark.executor.Executor.stop(Executor.scala:146)
 at
 org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:105)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
 at
 scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
 at
 org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
 at
 org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
 at
 scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
 at
 org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at
 org.apache.spark.executor.CoarseGrainedExecutorBackend.aroundReceive(CoarseGrainedExecutorBackend.scala:38)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

 On Tue, May 19, 2015 at 3:38 AM, Imran Rashid iras...@cloudera.com
 wrote:

 Hi,

 can you take a look at the logs and see what the first error you are
 getting is?  It's possible that the file doesn't exist when that error is
 produced, but it shows up later -- I've seen similar things happen, but
 only after there have already been some errors.  But, if you see that in
 the very first error, then I'm not sure what the cause is.  It would be
 helpful for you to send the logs.

 Imran

 On Fri, May 15, 2015 at 10:07 AM, rok rokros...@gmail.com wrote:

 I am trying to sort a collection of key,value pairs (between several hundred
 million and a few billion) and have recently been getting lots of
 FetchFailedException errors that seem to originate when one of the
 executors doesn't seem to find a temporary shuffle file on disk. E.g.:

 org.apache.spark.shuffle.FetchFailedException:

 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
 (No such file or directory)

 This file actually exists:

  ls -l
 
 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

 -rw-r--r-- 1 hadoop hadoop 11936 May 15 16:52

 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

 This error repeats on several executors and is followed by a number of

 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0

 This results on most tasks being lost and executors dying.

 There is plenty of space on all of the appropriate filesystems, so none
 of
 the executors are running out of disk space. Any idea what might be
 causing
 this? I am running this via YARN on approximately 100 nodes with 2 cores
 per
 node. Any thoughts on what might be causing these errors? Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/FetchFailedException-and-MetadataFetchFailedException-tp22901.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: EOFException using KryoSerializer

2015-05-19 Thread Imran Rashid
Hi Jim,

this is definitely strange.  It sure sounds like a bug, but it also is a
very commonly used code path, so at the very least you must be hitting a
corner case.  Could you share a little more info with us?  What version of
spark are you using?  How big is the object you are trying to broadcast?
Can you share more of the logs from before the exception?

It is not too surprising this shows up in mesos but not in local mode.
Local mode never exercises the part of the code that needs to deserialize
the blocks of a broadcast variable (though it actually does serialize the
data into blocks).  So I doubt it's mesos specific; more likely it would
happen in any cluster mode -- yarn, standalone, or even local-cluster (a
pseudo-cluster just for testing).

Imran

On Tue, May 19, 2015 at 3:56 PM, Jim Carroll jimfcarr...@gmail.com wrote:

 I'm seeing the following exception ONLY when I run on a Mesos cluster. If I
 run the exact same code with master set to local[N] I have no problem:

  2015-05-19 16:45:43,484 [task-result-getter-0] WARN  TaskSetManager - Lost
 task 0.0 in stage 0.0 (TID 0, 10.253.1.101): java.io.EOFException
 at

 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
 at

 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
 at

 org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
 at

 org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
 at

 org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
 at

 org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
 at

 org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
 at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 KryoSerializer explicitly throws an EOFException. The comment says:

 // DeserializationStream uses the EOF exception to indicate stopping
 condition.

 Apparently this isn't what TorrentBroadcast expects.

 Any suggestions? Thanks.

 Jim





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Broadcast variables can be rebroadcast?

2015-05-19 Thread Imran Rashid
hmm, I guess it depends on the way you look at it.  In a way, I'm saying
that spark does *not* have any built in auto-re-broadcast if you try to
mutate a broadcast variable.  Instead, you should create something new, and
just broadcast it separately.  Then just have all the code you have
operating on your RDDs look at the new broadcast variable.

But I guess there is another way to look at it -- you are creating new
broadcast variables each time, but they all point to the same underlying
mutable data structure.  So in a way, you are rebroadcasting the same
underlying data structure.

Let me expand my example from earlier a little bit more:


def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
 ...
}

// this is a val, because the data structure itself is mutable
val myMutableDataStructure = ...
// this is a var, because you will create new broadcasts
var myBroadcast = sc.broadcast(myMutableDataStructure)
(0 to 20).foreach { iteration =>
  oneIteration(myRDD, myBroadcast)
  // update your mutable data structure in place
  myMutableDataStructure.update(...)
  // ... but that doesn't affect the broadcast variables living out on the
  // cluster, so we need to create a new one

  // this line is not required -- the broadcast var will automatically get
  // unpersisted when a gc cleans up the old broadcast on the driver, but I'm
  // including this here for completeness, in case you want to more proactively
  // clean up old blocks if you are low on space
  myBroadcast.unpersist()

  // now we create a new broadcast which has the updated data in our
  // mutable data structure
  myBroadcast = sc.broadcast(myMutableDataStructure)
}


hope this clarifies things!

Imran

On Tue, May 19, 2015 at 3:06 AM, N B nb.nos...@gmail.com wrote:

 Hi Imran,

 If I understood you correctly, you are suggesting to simply call broadcast
 again from the driver program. This is exactly what I am hoping will work
 as I have the Broadcast data wrapped up and I am indeed (re)broadcasting
 the wrapper over again when the underlying data changes. However,
 documentation seems to suggest that one cannot re-broadcast. Is my
 understanding accurate?

 Thanks
 NB


 On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com
 wrote:

 Rather than updating the broadcast variable, can't you simply create a
 new one?  When the old one can be gc'ed in your program, it will also get
 gc'ed from spark's cache (and all executors).

 I think this will make your code *slightly* more complicated, as you need
 to add in another layer of indirection for which broadcast variable to use,
 but not too bad.  Eg., from

 var myBroadcast = sc.broadcast( ...)
 (0 to 20).foreach{ iteration =>
   //  ... some rdd operations that involve myBroadcast ...
   myBroadcast.update(...) // wrong! don't update a broadcast variable
 }

 instead do something like:

 def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit =
 {
  ...
 }

 var myBroadcast = sc.broadcast(...)
 (0 to 20).foreach { iteration =>
   oneIteration(myRDD, myBroadcast)
   myBroadcast = sc.broadcast(...) // create a NEW broadcast here,
   // with whatever you need to update it
 }

 On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ayan. Can we rebroadcast after updating in the driver?

 Thanks
 NB.


 On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 A broadcast variable is shipped the first time it is accessed in a
 transformation to the executors used by the transformation. It will NOT be
 updated subsequently, even if the value has changed. However, a new value
 will be shipped to any new executor that comes into play after the value has
 changed. This way, changing the value of a broadcast variable is not a good idea
 as it can create inconsistency within the cluster. From the documentation:

  In addition, the object v should not be modified after it is
 broadcast in order to ensure that all nodes get the same value of the
 broadcast variable


 On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ilya. Does one have to call broadcast again once the underlying
 data is updated in order to get the changes visible on all nodes?

 Thanks
 NB


 On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com
 wrote:

 The broadcast variable is like a pointer. If the underlying data
 changes then the changes will be visible throughout the cluster.
 On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote:

 Hello,

 Once a broadcast variable is created using sparkContext.broadcast(),
 can it
 ever be updated again? The use case is for something like the
 underlying
 lookup data changing over time.

 Thanks
 NB




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe

Re: Error communicating with MapOutputTracker

2015-05-18 Thread Imran Rashid
On Fri, May 15, 2015 at 5:09 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 Now, we noticed that we get java heap OOM exceptions on the output tracker
 when we have too many tasks. I wonder:
 1. where does the map output tracker live? The driver? The master (when
 those are not the same)?
 2. how can we increase the heap for it? Especially when using spark-submit?


It does not live on the master -- that is only in a standalone cluster, and
it does very little work.  (Though there are *Master and *Worker variants
of the class, it's really running on the driver and the executors.)  If you
are getting OOMs in the MapOutputTrackerMaster (which lives on the driver),
then you can increase the memory for the driver via the normal args for
controlling driver memory, with --driver-memory 10G or whatever.

Just to be clear, if you hit an OOM from somewhere in the MapOutputTracker
code, it just means that code is what pushed things over the top.  Of
course you could have 99% of your memory used by something else, perhaps
your own data structures, which perhaps could be trimmed down.  You could
get a heap dump on the driver to see where the memory is really getting
used.

Do you mind sharing the details of how you hit these OOMs?  How much
memory, how many partitions on each side of the shuffle?  Sort based
shuffle I assume?

thanks,
Imran


Re: applications are still in progress?

2015-05-18 Thread Imran Rashid
Most likely, you never call sc.stop().

Note that in 1.4, this will happen for you automatically in a shutdown
hook, taken care of by https://issues.apache.org/jira/browse/SPARK-3090
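
For reference, a minimal sketch of stopping the context so the event log is
finalized (the app name and job are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("myApp"))
  try {
    sc.parallelize(1 to 100).count()  // the actual work
  } finally {
    sc.stop()  // without this, the event log file keeps its .inprogress suffix
  }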

On Wed, May 13, 2015 at 8:04 AM, Yifan LI iamyifa...@gmail.com wrote:

 Hi,

 I have some applications that finished (but actually failed), and the
 WebUI shows:
 Application myApp is still in progress.

 And, in the event log folder, there are several log files like this:

 app-20150512***.inprogress

 So, I am wondering what the “inprogress” means…

 Thanks! :)


 Best,
 Yifan LI








Re: parallelism on binary file

2015-05-18 Thread Imran Rashid
You can use sc.hadoopFile (or any of the variants) to do what you want.
They even let you reuse your existing HadoopInputFormats.  You should be
able to mimic your old use with MR just fine.  sc.textFile is just a
convenience method which sits on top.

imran
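
For reference, a hedged sketch of reusing an existing old-API
(org.apache.hadoop.mapred) input format; MyBinaryInputFormat and
parseBusinessObject are hypothetical stand-ins for the proprietary format and
the parsing logic:

  import org.apache.hadoop.io.{LongWritable, BytesWritable}

  val records = sc.hadoopFile[LongWritable, BytesWritable, MyBinaryInputFormat](
    "hdfs://namenode:9000/binaryData/")

  // one task per input split, just like the map tasks of the old MR job
  val businessObjects = records.map { case (_, bytes) => parseBusinessObject(bytes) }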

On Fri, May 8, 2015 at 12:03 PM, tog guillaume.all...@gmail.com wrote:

 Hi

 I have an application that currently runs using MR. It starts by
 extracting information from a proprietary binary file that is copied to
 HDFS. The application starts by creating business objects from information
 extracted from the binary files. Later those objects are used for further
 processing, again using MR jobs.

 I am planning to move towards Spark and I clearly see that I could use
 JavaRDD<businessObjects> for parallel processing. However, it is not yet
 obvious what the process could be to generate this RDD from my binary file
 in parallel.

 Today I use parallelism based on the split assigned to each of the map
 tasks of a job. Can I mimic such a thing using Spark? All the examples I
 have seen so far use text files, for which I guess the partitions are
 based on a given number of contiguous lines.

 Any help or pointer would be appreciated

 Cheers
 Guillaume



 --
 PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net



Re: spark log field clarification

2015-05-18 Thread Imran Rashid
depends what you mean by output data.  Do you mean:

* the data that is sent back to the driver? that is result size
* the shuffle output?  that is in Shuffle Write Metrics
* the data written to a hadoop output format?  that is in Output Metrics

On Thu, May 14, 2015 at 2:22 PM, yanwei echo@gmail.com wrote:

 I am trying to extract the *output data size* information for *each task*.
 What *field(s)* should I look for, given the json-format log?

 Also, what does Result Size stand for?

 Thanks a lot in advance!
 -Yanwei



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-log-field-clarification-tp22892.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: LogisticRegressionWithLBFGS with large feature set

2015-05-18 Thread Imran Rashid
I'm not super familiar with this part of the code, but from taking a quick
look:

a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles
per feature (mean, max, min, etc. etc.)
b) The limit is on the result size from *all* tasks, not from one task.
You start with 3072 tasks
c) treeAggregate should first merge things down to a smaller number of
partitions before bringing results back to the driver, which is how you end up
with 54 tasks at your failure.

this means you should have about 30 MB per task per measure * 54 tasks * 7
measures, which comes to about 11 GB, in the ballpark of what you found.

In principle, you could get this working by adding more levels to the
treeAggregate (the depth parameter), but it looks like that isn't exposed.
You could also try coalescing your data down to a smaller set of partitions
first, but that comes with other downsides.
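
For reference, a hedged sketch of that coalescing workaround (the partition
count is illustrative, and fewer partitions also means less parallelism in the
rest of the job):

  import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.rdd.RDD

  def train(training: RDD[LabeledPoint]) = {
    val coalesced = training.coalesce(256).cache()  // 256 instead of 3072 per-task results
    new LogisticRegressionWithLBFGS().setNumClasses(2).run(coalesced)
  }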

Perhaps an MLLib expert could chime in on an alternate approach.  My
feeling (from a very quick look) is that there is room for some
optimization in the internals

Imran

On Thu, May 14, 2015 at 5:44 PM, Pala M Muthaia mchett...@rocketfuelinc.com
 wrote:

 Hi,

 I am trying to validate our modeling data pipeline by running
 LogisticRegressionWithLBFGS on a dataset with ~3.7 million features,
 basically to compute AUC. This is on Spark 1.3.0.

 I am using 128 executors with 4 GB each + driver with 8 GB. The number of
 data partitions is 3072

 The execution fails with the following messages:

 *Total size of serialized results of 54 tasks (10.4 GB) is bigger than
 spark.driver.maxResultSize (3.0 GB)*

 The associated stage in the job is treeAggregate at
 StandardScaler.scala:52
 http://lsv-10.rfiserve.net:18080/history/application_1426202183036_633264/stages/stage?id=3attempt=0
  :
 The call stack looks as below:

 org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
 org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)


 I am trying to both understand why such large amount of data needs to be
 passed back to driver as well as figure out a way around this. I also want
 to understand how much memory is required, as a function of dataset size,
 feature set size, and number of iterations performed, for future
 experiments.

 From looking at the MLlib code, the largest data structure seems to be a
 dense vector of the same size as the feature set. I am not familiar with the
 algorithm or its implementation, but I would guess 3.7 million features would
 lead to a constant multiple of ~3.7 million * 8 bytes ~ 30 MB. So how does the
 dataset size become so large?

 I looked into the treeAggregate and it looks like hierarchical
 aggregation. If the data being sent to the driver is basically the
 aggregated coefficients (i.e. dense vectors) for the final aggregation,
 can't the dense vectors from executors be pulled in one at a time and
 merged in memory, rather than pulling all of them in together? (This is a
 totally uneducated guess, so I may be completely off here.)

 Is there a way to get this running?

 Thanks,
 pala



Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: Stream is corrupted

2015-05-18 Thread Imran Rashid
Looks like this exception is after many more failures have occurred.  It is
already on attempt 6 for stage 7 -- I'd try to find out why attempt 0
failed.

This particular exception is probably a result of corruption that can
happen when stages are retried, that I'm working on addressing in
https://issues.apache.org/jira/browse/SPARK-7308.  But your real problem is
figuring out why the stage failed in the first place.


On Wed, May 13, 2015 at 6:01 AM, Yifan LI iamyifa...@gmail.com wrote:

 Hi,

 I was running our GraphX application (which worked fine on Spark 1.2.0), but
 it failed on Spark 1.3.1 with the exception below.

 Does anyone have an idea about this issue? I guess it was caused by using the LZ4 codec?

 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 54 in stage 7.6 failed 128 times, most recent
 failure: Lost task 54.127 in stage 7.6 (TID 5311,
 small15-tap1.common.lip6.fr): com.esotericsoftware.kryo.KryoException:
 java.io.IOException: Stream is corrupted
 at com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
 at com.esotericsoftware.kryo.io.Input.require(Input.java:155)
 at com.esotericsoftware.kryo.io.Input.readInt(Input.java:337)
 at
 com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
 at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
 at
 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
 at
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.graphx.impl.ShippableVertexPartition$.apply(ShippableVertexPartition.scala:60)
 at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:300)
 at org.apache.spark.graphx.VertexRDD$$anonfun$2.apply(VertexRDD.scala:297)
 at
 org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at
 org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.IOException: Stream is corrupted
 at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:152)
 at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:116)
 at com.esotericsoftware.kryo.io.Input.fill(Input.java:140)
 ... 35 more

 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
 at
 

Re: Spark on Yarn : Map outputs lifetime ?

2015-05-18 Thread Imran Rashid
Neither of those two.  Instead, the shuffle data is cleaned up when the
stages they are from get GC'ed by the jvm.  that is, when you are no longer
holding any references to anything which points to the old stages, and
there is an appropriate gc event.

The data is not cleaned up right after the stage completes, because it
might get used again later (e.g., if the stage is retried).
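
For reference, a rough sketch (not from this thread) of what dropping those
references looks like in practice; System.gc() is only a hint, and the cleanup
timing is not guaranteed:

  val input = sc.parallelize(1 to 1000000)
  var shuffled = input.map(x => (x % 10, x)).reduceByKey(_ + _)  // writes shuffle files
  shuffled.collect()

  shuffled = null  // drop the last reference to the shuffled RDD and its stage
  System.gc()      // a later GC on the driver lets the cleaner remove the shuffle files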

On Tue, May 12, 2015 at 6:50 PM, Ashwin Shankar ashwinshanka...@gmail.com
wrote:

 Hi,
 In Spark on YARN, when running spark_shuffle as an auxiliary service on the
 node manager, do the map spills of a stage get cleaned up once the next
 stage completes, OR
 are they preserved until the app completes (i.e., until all the stages
 complete)?

 --
 Thanks,
 Ashwin






Re: Broadcast variables can be rebroadcast?

2015-05-18 Thread Imran Rashid
Rather than updating the broadcast variable, can't you simply create a
new one?  When the old one can be gc'ed in your program, it will also get
gc'ed from spark's cache (and all executors).

I think this will make your code *slightly* more complicated, as you need
to add in another layer of indirection for which broadcast variable to use,
but not too bad.  Eg., from

var myBroadcast = sc.broadcast( ...)
(0 to 20).foreach{ iteration =>
  //  ... some rdd operations that involve myBroadcast ...
  myBroadcast.update(...) // wrong! don't update a broadcast variable
}

instead do something like:

def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
 ...
}

var myBroadcast = sc.broadcast(...)
(0 to 20).foreach { iteration =>
  oneIteration(myRDD, myBroadcast)
  myBroadcast = sc.broadcast(...) // create a NEW broadcast here, with
                                  // whatever you need to update it
}

On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ayan. Can we rebroadcast after updating in the driver?

 Thanks
 NB.


 On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 broadcast variables are shipped for the first time it is accessed in a
 transformation to the executors used by the transformation. It will NOT
 updated subsequently, even if the value has changed. However, a new value
 will be shipped to any new executor comes into play after the value has
 changed. This way, changing value of broadcast variable is not a good idea
 as it can create inconsistency within cluster. From documentatins:

  In addition, the object v should not be modified after it is broadcast
 in order to ensure that all nodes get the same value of the broadcast
 variable


 On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ilya. Does one have to call broadcast again once the underlying
 data is updated in order to get the changes visible on all nodes?

 Thanks
 NB


 On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com
 wrote:

 The broadcast variable is like a pointer. If the underlying data
 changes then the changes will be visible throughout the cluster.
 On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote:

 Hello,

 Once a broadcast variable is created using sparkContext.broadcast(),
 can it
 ever be updated again? The use case is for something like the
 underlying
 lookup data changing over time.

 Thanks
 NB




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Best Regards,
 Ayan Guha





Re: FetchFailedException and MetadataFetchFailedException

2015-05-18 Thread Imran Rashid
Hi,

Can you take a look at the logs and see what the first error you are
getting is?  It's possible that the file doesn't exist when that error is
produced, but it shows up later -- I've seen similar things happen, but
only after there have already been some errors.  But if you see that in
the very first error, then I'm not sure what the cause is.  It would be
helpful for you to send the logs.

Imran

On Fri, May 15, 2015 at 10:07 AM, rok rokros...@gmail.com wrote:

 I am trying to sort a collection of key,value pairs (between several
 hundred
 million to a few billion) and have recently been getting lots of
 FetchFailedException errors that seem to originate when one of the
 executors doesn't seem to find a temporary shuffle file on disk. E.g.:

 org.apache.spark.shuffle.FetchFailedException:

 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index
 (No such file or directory)

 This file actually exists:

  ls -l
 
 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

 -rw-r--r-- 1 hadoop hadoop 11936 May 15 16:52

 /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1044/blockmgr-453473e7-76c2-4a94-85d0-d0b75b515ad6/10/shuffle_0_264_0.index

 This error repeats on several executors and is followed by a number of

 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0

 This results on most tasks being lost and executors dying.

 There is plenty of space on all of the appropriate filesystems, so none of
 the executors are running out of disk space. Any idea what might be causing
 this? I am running this via YARN on approximately 100 nodes with 2 cores
 per
 node. Any thoughts on what might be causing these errors? Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/FetchFailedException-and-MetadataFetchFailedException-tp22901.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index:

2015-05-06 Thread Imran Rashid
oh yeah, I think I remember we discussed this a while back ... sorry I
forgot the details.  If you know you don't have a graph, did you try
setting spark.kryo.referenceTracking to false?  I'm also confused on how
you could hit this with a few million objects.  Are you serializing them
one at a time, or is there one big container which holds them all?

Was there ever any follow up from kryo?
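
For reference, flipping that setting is just a conf entry -- a sketch,
assuming you build the SparkConf yourself:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // skip tracking of back-references; only safe when the serialized object
  // graph has no cycles and you don't need shared references preserved
  .set("spark.kryo.referenceTracking", "false")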

On Wed, May 6, 2015 at 2:29 AM, Tristan Blakers tris...@blackfrog.org
wrote:

 Hi Imran,

 I had tried setting a really huge kryo buffer size (GB), but it didn’t
 make any difference.

 In my data sets, objects are no more than 1KB each, and don’t form a
 graph, so I don’t think the buffer size should need to be larger than a few
 MB, except perhaps for reasons of efficiency?

 The exception usually occurs in 
 “com.esotericsoftware.kryo.util.IdentityObjectIntMap”
 when it is resizing (or a similar operation), implying there are too many
 object references, though it’s hard to see how I could get to 2b references
 from a few million objects...

 T

 On 6 May 2015 at 00:58, Imran Rashid iras...@cloudera.com wrote:

 Are you setting a really large max buffer size for kryo?
 Was this fixed by https://issues.apache.org/jira/browse/SPARK-6405 ?


 If not, we should open up another issue to get a better warning in these
 cases.

 On Tue, May 5, 2015 at 2:47 AM, shahab shahab.mok...@gmail.com wrote:

 Thanks Tristan for sharing this. Actually this happens when I am reading
 a csv file of 3.5 GB.

 best,
 /Shahab



 On Tue, May 5, 2015 at 9:15 AM, Tristan Blakers tris...@blackfrog.org
 wrote:

 Hi Shahab,

 I’ve seen exceptions very similar to this (it also manifests as
 negative array size exception), and I believe it’s a real bug in Kryo.

 See this thread:

 http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3ccag02ijuw3oqbi2t8acb5nlrvxso2xmas1qrqd_4fq1tgvvj...@mail.gmail.com%3E

 Manifests in all of the following situations when working with an
 object graph in excess of a few GB: Joins, Broadcasts, and when using the
 hadoop save APIs.

 Tristan


 On 3 May 2015 at 07:26, Olivier Girardot ssab...@gmail.com wrote:

 Can you post your code, otherwise there's not much we can do.

 Regards,

 Olivier.

 Le sam. 2 mai 2015 à 21:15, shahab shahab.mok...@gmail.com a écrit :

 Hi,

 I am using sprak-1.2.0 and I used Kryo serialization but I get the
 following excepton.

 java.io.IOException: com.esotericsoftware.kryo.KryoException:
 java.lang.IndexOutOfBoundsException: Index: 3448, Size: 1

 I do apprecciate if anyone could tell me how I can resolve this?

 best,
 /Shahab








Re: How to skip corrupted avro files

2015-05-05 Thread Imran Rashid
You might be interested in https://issues.apache.org/jira/browse/SPARK-6593
and the discussion around the PRs.

This is probably more complicated than what you are looking for, but you
could copy the code for HadoopReliableRDD in the PR into your own code and
use it, without having to wait for the issue to get resolved.
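
If you just need something in the meantime, one blunt workaround is to probe
each file separately and skip the ones that blow up -- a rough sketch, using
the sqlContext.avroFile call from your snippet (note it scans the data twice,
so it is only reasonable for a modest number of files):

import org.apache.hadoop.fs.{FileSystem, Path}
import scala.util.Try

val fs = FileSystem.get(sc.hadoopConfiguration)
val avroFiles = fs.listStatus(new Path(path))
  .map(_.getPath.toString)
  .filter(_.endsWith(".avro"))   // adjust to however your files are named

// force a full scan of each file so "Invalid sync!" surfaces here, where we
// can catch it, instead of in the middle of the real query
val goodFiles = avroFiles.filter { f =>
  Try(sqlContext.avroFile(f).count()).isSuccess
}

val data = goodFiles.map(f => sqlContext.avroFile(f)).reduce(_ unionAll _)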

On Sun, May 3, 2015 at 12:57 PM, Shing Hing Man mat...@yahoo.com.invalid
wrote:


 Hi,
  I am using Spark 1.3.1 to read a directory of about 2000 avro files.
 The avro files are from a third party and a few of them are corrupted.

   val path = {myDirecotry of avro files}

  val sparkConf = new SparkConf().setAppName(avroDemo).setMaster(local)
   val sc = new SparkContext(sparkConf)

  val sqlContext = new SQLContext(sc)

  val data = sqlContext.avroFile(path);
  data.select(.)

  When I run the above code, I get the following exception.
  org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync!
  at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:222)
 ~[classes/:1.7.7]
  at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
 ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
  at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
 ~[avro-mapred-1.7.7-hadoop2.jar:1.7.7]
  at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library.jar:na]
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 ~[scala-library.jar:na]
  at
 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:129)
 ~[spark-sql_2.10-1.3.1.jar:1.3.1]
  at
 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:126)
 ~[spark-sql_2.10-1.3.1.jar:1.3.1]
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 ~[spark-core_2.10-1.3.1.jar:1.3.1]
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 [na:1.7.0_71]
  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 [na:1.7.0_71]
  at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
 Caused by: java.io.IOException: Invalid sync!
  at
 org.apache.avro.file.DataFileStream.nextRawBlock(DataFileStream.java:314)
 ~[classes/:1.7.7]
  at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:209)
 ~[classes/:1.7.7]
  ... 25 common frames omitted

Is there an easy way to skip a corrupted avro file without reading the
 files one by one using sqlContext.avroFile(file) ?
  At present, my solution (hack)  is to have my own version of
 org.apache.avro.file.DataFileStream with method hasNext returns false (
 to signal the end file), when
  java.io.IOException: Invalid sync!
   is thrown.
   Please see  line 210 in

 https://github.com/apache/avro/blob/branch-1.7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java

   Thanks in advance for any assistance !
   Shing




Re: How to deal with code that runs before foreach block in Apache Spark?

2015-05-05 Thread Imran Rashid
Gerard is totally correct -- to expand a little more, I think what you want
to do is a solrInputDocumentJavaRDD.foreachPartition, instead of
solrInputDocumentJavaRDD.foreach:


solrInputDocumentJavaRDD.foreachPartition(
  new VoidFunction<Iterator<SolrInputDocument>>() {
    @Override
    public void call(Iterator<SolrInputDocument> docItr) {
      List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
      while (docItr.hasNext()) {
        // Add the solrInputDocument to the list of SolrInputDocuments
        docs.add(docItr.next());
      }
      // push things to solr **from the executor, for this partition**
      // so for this to make sense, you need to be sure solr can handle a bunch
      // of executors pushing into it simultaneously.
      addThingsToSolr(docs);
    }
  });

On Mon, May 4, 2015 at 8:44 AM, Gerard Maas gerard.m...@gmail.com wrote:

 I'm not familiar with the Solr API but provided that ' SolrIndexerDriver'
 is a singleton, I guess that what's going on when running on a cluster is
 that the call to:

  SolrIndexerDriver.solrInputDocumentList.add(elem)

 is happening on different singleton instances of the  SolrIndexerDriver
 on different JVMs while

 SolrIndexerDriver.solrServer.commit

 is happening on the driver.

 In practical terms, the lists on the executors are being filled-in but
 they are never committed and on the driver the opposite is happening.

 -kr, Gerard

 On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I'm trying to deal with some code that runs differently on Spark
 stand-alone mode and Spark running on a cluster. Basically, for each item
 in an RDD, I'm trying to add it to a list, and once this is done, I want to
 send this list to Solr.

 This works perfectly fine when I run the following code in stand-alone
 mode of Spark, but does not work when the same code is run on a cluster.
 When I run the same code on a cluster, it is like send to Solr part of
 the code is executed before the list to be sent to Solr is filled with
 items. I try to force the execution by solrInputDocumentJavaRDD.collect();
 after foreach, but it seems like it does not have any effect.

 // For each RDD
 solrInputDocumentJavaDStream.foreachRDD(
 new FunctionJavaRDDSolrInputDocument, Void() {
   @Override
   public Void call(JavaRDDSolrInputDocument
 solrInputDocumentJavaRDD) throws Exception {

 // For each item in a single RDD
 solrInputDocumentJavaRDD.foreach(
 new VoidFunctionSolrInputDocument() {
   @Override
   public void call(SolrInputDocument
 solrInputDocument) {

 // Add the solrInputDocument to the list of
 SolrInputDocuments

 SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
   }
 });

 // Try to force execution
 solrInputDocumentJavaRDD.collect();


 // After having finished adding every SolrInputDocument to
 the list
 // add it to the solrServer, and commit, waiting for the
 commit to be flushed
 try {

   // Seems like when run in cluster mode, the list size is
 zero,
  // therefore the following part is never executed

   if (SolrIndexerDriver.solrInputDocumentList != null
SolrIndexerDriver.solrInputDocumentList.size() 
 0) {

 SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
 SolrIndexerDriver.solrServer.commit(true, true);
 SolrIndexerDriver.solrInputDocumentList.clear();
   }
 } catch (SolrServerException | IOException e) {
   e.printStackTrace();
 }


 return null;
   }
 }
 );


 What should I do, so that sending-to-Solr part executes after the list of
 SolrDocuments are added to solrInputDocumentList (and works also in cluster
 mode)?


 --
 Emre Sevinç





Re: Spark job concurrency problem

2015-05-05 Thread Imran Rashid
Can you give your entire spark-submit command?  Are you missing
--executor-cores <num_cpu>?  Also, if you intend to use all 6 nodes, you
also need --num-executors 6.
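
For example, something along these lines (class, jar, and memory settings are
just placeholders for whatever your app actually uses):

spark-submit --class com.example.MyApp \
  --master yarn-cluster \
  --num-executors 6 \
  --executor-cores 4 \
  --executor-memory 8g \
  my-app.jar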

On Mon, May 4, 2015 at 2:07 AM, Xi Shen davidshe...@gmail.com wrote:

 Hi,

 I have two small RDD, each has about 600 records. In my code, I did

 val rdd1 = sc...cache()
 val rdd2 = sc...cache()

 val result = rdd1.cartesian(rdd2).*repartition*(num_cpu).map { case (a, b) =>
   some_expensive_job(a,b)
 }

 I ran my job in YARN cluster with --master yarn-cluster, I have 6
 executor, and each has a large memory volume.

 However, I noticed my job is very slow. I went to the RM page, and found
 there are two containers, one is the driver, one is the worker. I guess
 this is correct?

 I went to the worker's log, and monitor the log detail. My app print some
 information, so I can use them to estimate the progress of the map
 operation. Looking at the log, it feels like the jobs are done one by one
 sequentially, rather than #cpu batch at a time.

 I checked the worker node, and their CPU are all busy.



 Xi Shen
 http://about.me/davidshen



Re: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index:

2015-05-05 Thread Imran Rashid
Are you setting a really large max buffer size for kryo?
Was this fixed by https://issues.apache.org/jira/browse/SPARK-6405 ?


If not, we should open up another issue to get a better warning in these
cases.

On Tue, May 5, 2015 at 2:47 AM, shahab shahab.mok...@gmail.com wrote:

 Thanks Tristan for sharing this. Actually this happens when I am reading a
 csv file of 3.5 GB.

 best,
 /Shahab



 On Tue, May 5, 2015 at 9:15 AM, Tristan Blakers tris...@blackfrog.org
 wrote:

 Hi Shahab,

 I’ve seen exceptions very similar to this (it also manifests as negative
 array size exception), and I believe it’s a real bug in Kryo.

 See this thread:

 http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3ccag02ijuw3oqbi2t8acb5nlrvxso2xmas1qrqd_4fq1tgvvj...@mail.gmail.com%3E

 Manifests in all of the following situations when working with an object
 graph in excess of a few GB: Joins, Broadcasts, and when using the hadoop
 save APIs.

 Tristan


 On 3 May 2015 at 07:26, Olivier Girardot ssab...@gmail.com wrote:

 Can you post your code, otherwise there's not much we can do.

 Regards,

 Olivier.

 Le sam. 2 mai 2015 à 21:15, shahab shahab.mok...@gmail.com a écrit :

 Hi,

 I am using sprak-1.2.0 and I used Kryo serialization but I get the
 following excepton.

 java.io.IOException: com.esotericsoftware.kryo.KryoException:
 java.lang.IndexOutOfBoundsException: Index: 3448, Size: 1

 I do apprecciate if anyone could tell me how I can resolve this?

 best,
 /Shahab






Re: ReduceByKey and sorting within partitions

2015-05-04 Thread Imran Rashid
Oh wow, that is a really interesting observation, Marco and Jerry.
I wonder if this is worth exposing in combineByKey()?  I think Jerry's
proposed workaround is all you can do for now -- use reflection to
side-step the fact that the methods you need are private.

On Mon, Apr 27, 2015 at 8:07 AM, Saisai Shao sai.sai.s...@gmail.com wrote:

 Hi Marco,

 As I know, current combineByKey() does not expose the related argument
 where you could set keyOrdering on the ShuffledRDD, since ShuffledRDD is
 package private, if you can get the ShuffledRDD through reflection or other
 way, the keyOrdering you set will be pushed down to shuffle. If you use a
 combination of transformations to do it, the result will be same but the
 efficiency may be different, some transformations will separate into
 different stages, which will introduce additional shuffle.

 Thanks
 Jerry


 2015-04-27 19:00 GMT+08:00 Marco marcope...@gmail.com:

 Hi,

 I'm trying, after reducing by key, to get data ordered among partitions
 (like RangePartitioner) and within partitions (like sortByKey or
 repartitionAndSortWithinPartition) pushing the sorting down to the
 shuffles machinery of the reducing phase.

 I think, but maybe I'm wrong, that the correct way to do that is that
 combineByKey call setKeyOrdering function on the ShuflleRDD that it
 returns.

 Am I wrong? Can be done by a combination of other transformations with
 the same efficiency?

 Thanks,
 Marco

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: spark kryo serialization question

2015-05-04 Thread Imran Rashid
yes, you should register all three.

The truth is, you only *need* to register classes that will get serialized
-- either via RDD caching or in a shuffle.  So if a type is only used as an
intermediate inside a stage, you don't need to register it.  But the
overhead of registering extra classes is pretty minimal, so as long as you
do this within reason, I think you're OK.
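
For example, a registration sketch for the map you describe (with C, K, and V
standing in for your actual classes):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // C is the input element type, K and V are the key/value types it maps to
  .registerKryoClasses(Array(classOf[C], classOf[K], classOf[V]))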


Imran

On Thu, Apr 30, 2015 at 12:34 AM, 邓刚[技术中心] triones.d...@vipshop.com wrote:

 Hi all

  We know that spark support Kryo serialization, suppose there is a
 map function which map C  to K,V(here C,K,V are instance of class C,K,V),
 when we register kryo serialization, should I register all of these three
 class?



 Best Wishes



 Triones Deng








Re: Spark partitioning question

2015-05-04 Thread Imran Rashid
Hi Marius,

I am also a little confused -- are you saying that myPartitions is
basically something like:

class MyPartitioner extends Partitioner {
  def numPartitions = 1
  def getPartition(key: Any) = 0
}

??

If so, I don't understand how you'd ever end up with data in two partitions.
Indeed, then everything before the call to partitionBy(myPartitioner) is
somewhat irrelevant.  The important point is that partitionBy should put
all the data in one partition, and then the operations after that do not
move data between partitions.  So if you're really observing data in two
partitions, then it would be good to know more about what version of spark
you are on, your data, etc., as it sounds like a bug.

But, I have a feeling there is some misunderstanding about what your
partitioner is doing.  Eg., I think doing groupByKey followed by sortByKey
doesn't make a lot of sense -- in general one sortByKey is all you need
(it's not exactly the same, but most probably close enough, and avoids doing
another expensive shuffle).  If you can share a bit more information on
your partitioner, and what properties you need for your f, that might
help.

thanks,
Imran


On Tue, Apr 28, 2015 at 7:10 AM, Marius Danciu marius.dan...@gmail.com
wrote:

 Hello all,

 I have the following Spark (pseudo)code:

 rdd = mapPartitionsWithIndex(...)
 .mapPartitionsToPair(...)
 .groupByKey()
 .sortByKey(comparator)
 .partitionBy(myPartitioner)
 .mapPartitionsWithIndex(...)
 .mapPartitionsToPair( *f* )

 The input data has 2 input splits (yarn 2.6.0).
 myPartitioner partitions all the records on partition 0, which is correct,
 so the intuition is that f provided to the last transformation
 (mapPartitionsToPair) would run sequentially inside a single yarn
 container. However from yarn logs I do see that both yarn containers are
 processing records from the same partition ... and *sometimes*  the over
 all job fails (due to the code in f which expects a certain order of
 records) and yarn container 1 receives the records as expected, whereas
 yarn container 2 receives a subset of records ... for a reason I cannot
 explain and f fails.

 The overall behavior of this job is that sometimes it succeeds and
 sometimes it fails ... apparently due to inconsistent propagation of sorted
 records to yarn containers.


 If any of this makes any sense to you, please let me know what I am
 missing.



 Best,
 Marius



Re: Extra stage that executes before triggering computation with an action

2015-05-04 Thread Imran Rashid
sortByKey() runs one job to sample the data, to determine what range of
keys to put in each partition.

There is a jira to change it to defer launching the job until the
subsequent action, but it will still execute another stage:

https://issues.apache.org/jira/browse/SPARK-1021

On Wed, Apr 29, 2015 at 5:57 PM, Tom Hubregtsen thubregt...@gmail.com
wrote:

 I'm not sure, but I wonder if because you are using the Spark REPL that it
 may not be representing what a normal runtime execution would look like and
 is possibly eagerly running a partial DAG once you define an operation that
 would cause a shuffle.

 What happens if you setup your same set of commands [a-e] in a file and use
 the Spark REPL's `load` or `paste` command to load them all at once? From
 Richard

 I have also packaged it in a jar file (without [e], the debug string), and
 still see the extra stage before the other two that I would expect. Even
 when I remove [d], the action, I still see stage 0 being executed (and do
 not see stage 1 and 2).

 Again a shortened log of the Stage 0:
 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[4] at
 sortByKey, which has no missing parents
 INFO DAGScheduler: ResultStage 0 (sortByKey) finished in 0.192 s




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707p22713.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?

2015-04-24 Thread Imran Rashid
Another issue is that HadoopRDD (which sc.textFile uses) might split input
files, and even if it doesn't split, it doesn't guarantee that part file
numbers go to the corresponding partition number in the RDD.  E.g., part-0
could go to partition 27.
On Apr 24, 2015 7:41 AM, Michal Michalski michal.michal...@boxever.com
wrote:

 Of course after you do it, you probably want to call
 repartition(somevalue) on your RDD to get your paralellism back.

 Kind regards,
 Michał Michalski,
 michal.michal...@boxever.com

 On 24 April 2015 at 15:28, Michal Michalski michal.michal...@boxever.com
 wrote:

 I did a quick test as I was curious about it too. I created a file with
 numbers from 0 to 999, in order, line by line. Then I did:

 scala> val numbers = sc.textFile("./numbers.txt")
 scala> val zipped = numbers.zipWithUniqueId
 scala> zipped.foreach(i => println(i))

 Expected result if the order was preserved would be something like: (0,
 0), (1, 1) etc.
 Unfortunately, the output looks like this:

 (126,1)
 (223,2)
 (320,3)
 (1,0)
 (127,11)
 (2,10)
 (...)

 The workaround I found that works for me for my specific use case
 (relatively small input files) is setting explicitly the number of
 partitions to 1 when reading a single *text* file:

 scala> val numbers_sp = sc.textFile("./numbers.txt", 1)

 Than the output is exactly as I would expect.

 I didn't dive into the code too much, but I took a very quick look at it
 and figured out - correct me if I missed something, it's Friday afternoon!
 ;-)  - that this workaround will work fine for all the input formats
 inheriting from org.apache.hadoop.mapred.FileInputFormat including
 TextInputFormat, of course - see the implementation of getSplits() method
 there (
 http://grepcode.com/file/repo1.maven.org/maven2/org.jvnet.hudson.hadoop/hadoop-core/0.19.1-hudson-2/org/apache/hadoop/mapred/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapred.JobConf%2Cint%29
 ).
 The numSplits variable passed there is exactly the same value as you
 provide as a second argument to textFile, which is minPartitions. However,
 while *min* suggests that we can only define a minimal number of
 partitions, while we have no control over the max, from what I can see in
 the code, that value specifies the *exact* number of partitions per the
 FileInputFormat.getSplits implementation. Of course it can differ for other
 input formats, but in this case it should work just fine.


 Kind regards,
 Michał Michalski,
 michal.michal...@boxever.com

 On 24 April 2015 at 14:05, Spico Florin spicoflo...@gmail.com wrote:

 Hello!
   I know that HadoopRDD partitions are built based on the number of
 splits in HDFS. I'm wondering if these partitions preserve the initial
 order of data in file.
 As an example, if I have an HDFS (myTextFile) file that has these splits:

 split 0- line 1, ..., line k
 split 1-line k+1,..., line k+n
 splt 2-line k+n, line k+n+m

 and the code
 val lines=sc.textFile(hdfs://mytextFile)
 lines.zipWithIndex()

 will the order of lines preserved?
 (line 1, zipIndex 1) , .. (line k, zipIndex k), and so one.

 I found this question on stackoverflow (
 http://stackoverflow.com/questions/26046410/how-can-i-obtain-an-element-position-in-sparks-rdd)
 whose answer intrigued me:
 Essentially, RDD's zipWithIndex() method seems to do this, but it won't
 preserve the original ordering of the data the RDD was created from

 Can you please confirm that is this the correct answer?

 Thanks.
  Florin










Re: When are TaskCompletionListeners called?

2015-04-17 Thread Imran Rashid
It's the latter -- after spark gets to the end of the iterator (or if it
hits an exception).

So your example is good; that is exactly what it is intended for.

On Fri, Apr 17, 2015 at 12:23 PM, Akshat Aranya aara...@gmail.com wrote:

 Hi,

 I'm trying to figure out when TaskCompletionListeners are called -- are
 they called at the end of the RDD's compute() method, or after the
 iteration through the iterator of the compute() method is completed.

 To put it another way, is this OK:

 class DatabaseRDD[T] extends RDD[T] {

   def compute(...): Iterator[T] = {
 val session = // acquire a DB session
 TaskContext.get.addTaskCompletionListener { (context) =>
   session.release()
 }

 val iterator = session.query(...)
 iterator
   }
 }



Re: How to persist RDD return from partitionBy() to disk?

2015-04-17 Thread Imran Rashid
https://issues.apache.org/jira/browse/SPARK-1061

Note that the proposed fix isn't to have spark automatically know about the
partitioner when it reloads the data, but at least to make it *possible*
to do that at the application level.
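
Until something like that exists, the simplest application-level workaround is
just to re-apply the partitioner after loading -- it costs one extra shuffle,
but after that joins and lookups can take advantage of it again.  A sketch:

import org.apache.spark.HashPartitioner

val reloaded = sc.objectFile[(String, DocVector)]("c:/temp/partitionedDocVectors.obj")
// the partitioner isn't recorded in the file, so declare it again explicitly
val repartitioned = reloaded.partitionBy(new HashPartitioner(16)).cache()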

On Fri, Apr 17, 2015 at 11:35 AM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  I have a huge RDD[Document] with millions of items. I partitioned it
 using HashPartitioner and save as object file. But when I load the object
 file back into RDD, I lost the HashPartitioner. How do I preserve the
 partitions when loading the object file?



 Here is the code



 *val *docVectors : RDD[DocVector] = computeRdd() // expensive calculation



 *val *partitionedDocVectors : RDD[(String, DocVector)] = docVectors .keyBy(d
 = d.id).partitionBy(*new *HashPartitioner(16))
 partitionedDocVectors.saveAsObjectFile(
 *c:/temp/partitionedDocVectors.obj*)

 // At this point, I check the folder *c:/temp/partitionedDocVectors.obj,
 it contains 16 parts: “part-0, part-1, … part-00015”*



 // Now laod the object file back
 *val *partitionedDocVectors2 : RDD[(String, DocVector)] = sc.objectFile(
 *c:/temp/partitionedDocVectors.obj*)

 // Now partitionedDocVectors2 contains 956 parts and it has no partinier


 *println*(*spartitions: **$*{partitionedDocVectors.partitions.size}**)
 // return 956
 *if *(idAndDocVectors.partitioner.isEmpty) *println*(*No partitioner*)
 // it does print out this line



 So how can I preserve the partitions of partitionedDocVectors on disk so
 I can load it back?



 Ningjun





Re: Can't get SparkListener to work

2015-04-17 Thread Imran Rashid
When you start the spark-shell, it's already too late to get the
ApplicationStart event.  Try listening for StageCompleted or JobEnd instead.
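
For example, something like this should fire on every stage and job (a minimal
sketch to paste into the shell):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

sc.addSparkListener(new SparkListener() {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    println("onStageCompleted: " + stageCompleted.stageInfo.name)
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
    println("onJobEnd: " + jobEnd.jobId + " " + jobEnd.jobResult)
  }
})

sc.parallelize(List(1, 2, 3)).count()  // should now print both callbacks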

On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji 
secondorderpolynom...@gmail.com wrote:

 I'm trying to create a simple SparkListener to get notified of error on
 executors. I do not get any call backs on my SparkListener. Here some
 simple code I'm executing in spark-shell. But I still don't get any
 callbacks on my listener. Am I doing something wrong?

 Thanks for any clue you can send my way.

 Cheers
 Praveen

 ==
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.scheduler.SparkListenerApplicationStart
 import org.apache.spark.scheduler.SparkListenerApplicationEnd
 import org.apache.spark.SparkException

 sc.addSparkListener(new SparkListener() {
   override def onApplicationStart(applicationStart:
 SparkListenerApplicationStart) {
 println(" onApplicationStart: " + applicationStart.appName);
   }

   override def onApplicationEnd(applicationEnd:
 SparkListenerApplicationEnd) {
 println(" onApplicationEnd: " + applicationEnd.time);
   }
 });

 sc.parallelize(List(1, 2, 3)).map(throw new
 SparkException(test)).collect();
 ===

 output:

 scala org.apache.spark.SparkException: hshsh
 at $iwC$$iwC$$iwC$$iwC.init(console:29)
 at $iwC$$iwC$$iwC.init(console:34)
 at $iwC$$iwC.init(console:36)
 at $iwC.init(console:38)




Re: Execption while using kryo with broadcast

2015-04-15 Thread Imran Rashid
this is a really strange exception ... I'm especially surprised that it
doesn't work w/ java serialization.  Do you think you could try to boil it
down to a minimal example?

On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Yes Without Kryo it did work out.when I remove kryo registration it did
 worked out

 On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com wrote:

 its not working with the combination of Broadcast.
 Without Kyro also not working.


 On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com wrote:

 Is it working without kryo?

 Thanks
 Best Regards

 On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele gangele...@gmail.com
  wrote:

 Hi All I am getting below exception while using Kyro serializable with
 broadcast variable. I am broadcating a hasmap with below line.

 Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
 final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
 jsc.broadcast(matchData);






 15/04/15 12:58:51 ERROR executor.Executor: Exception in task 0.3 in
 stage 4.0 (TID 7)
 java.io.IOException: java.lang.UnsupportedOperationException
 at
 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1003)
 at
 org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
 at
 org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
 at
 org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
 at
 org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
 at
 com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:103)
 at
 com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:1)
 at
 org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
 at
 org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:204)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:58)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.UnsupportedOperationException
 at java.util.AbstractMap.put(AbstractMap.java:203)
 at
 com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
 at
 com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
 at
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
 at
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
 at
 org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
 at
 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1000)
 ... 18 more
 15/04/15 12:58:51 INFO executor.CoarseGrainedExecutorBackend: Driver
 commanded a shutdown
 15/04/15 12:58:51 INFO storage.MemoryStore: MemoryStore cleared
 15/04/15 12:58:51 INFO storage.BlockManager: BlockManager stopped
 15/04/15 12:58:51 INFO
 remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote
 daemon.















Re: Execption while using kryo with broadcast

2015-04-15 Thread Imran Rashid
Oh interesting.  The suggested workaround is to wrap the result from
collectAsMap into another HashMap; you should try that:

Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
Map<Long, MatcherReleventData> tmp =
    new HashMap<Long, MatcherReleventData>(matchData);
final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
    jsc.broadcast(tmp);

Can you please clarify:
* Does it work w/ java serialization in the end?  Or is this kryo only?
* Which Spark version are you using? (one of the relevant bugs was fixed in
1.2.1 and 1.3.0)



On Wed, Apr 15, 2015 at 9:06 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 This looks like known issue? check this out

 http://apache-spark-user-list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-td20034.html

 Can you please suggest any work around I am broad casting HashMap return
 from RDD.collectasMap().

 On 15 April 2015 at 19:33, Imran Rashid iras...@cloudera.com wrote:

 this is a really strange exception ... I'm especially surprised that it
 doesn't work w/ java serialization.  Do you think you could try to boil it
 down to a minimal example?

 On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele gangele...@gmail.com
 wrote:

 Yes Without Kryo it did work out.when I remove kryo registration it did
 worked out

 On 15 April 2015 at 19:24, Jeetendra Gangele gangele...@gmail.com
 wrote:

 its not working with the combination of Broadcast.
 Without Kyro also not working.


 On 15 April 2015 at 19:20, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Is it working without kryo?

 Thanks
 Best Regards

 On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele 
 gangele...@gmail.com wrote:

 Hi All I am getting below exception while using Kyro serializable
 with broadcast variable. I am broadcating a hasmap with below line.

 Map<Long, MatcherReleventData> matchData = RddForMarch.collectAsMap();
 final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal =
 jsc.broadcast(matchData);






 15/04/15 12:58:51 ERROR executor.Executor: Exception in task 0.3 in
 stage 4.0 (TID 7)
 java.io.IOException: java.lang.UnsupportedOperationException
 at
 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1003)
 at
 org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
 at
 org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
 at
 org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
 at
 org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
 at
 com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:103)
 at
 com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:1)
 at
 org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
 at
 org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:204)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:58)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.UnsupportedOperationException
 at java.util.AbstractMap.put(AbstractMap.java:203)
 at
 com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
 at
 com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
 at
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
 at
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
 at
 org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
 at
 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1000)
 ... 18 more
 15/04/15 12:58:51 INFO executor.CoarseGrainedExecutorBackend: Driver
 commanded a shutdown
 15/04/15 12:58:51 INFO storage.MemoryStore: MemoryStore cleared
 15/04/15 12:58:51 INFO storage.BlockManager: BlockManager stopped
 15/04/15 12:58:51 INFO
 remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote
 daemon.




















Re: Regarding benefits of using more than one cpu for a task in spark

2015-04-14 Thread Imran Rashid
Hi twinkle,

To be completely honest, I'm not sure; I had never heard of spark.task.cpus
before.  But I could imagine two different use cases:

a) instead of just relying on spark's creation of tasks for parallelism, a
user wants to run multiple threads *within* a task.  This is sort of going
against the programming model of spark, but I guess this feature is meant
to give you the bare minimum support you need in case you really want it.
Eg., maybe you have some existing library you want to use in each task
which is already multi-threaded, or you pipe to some external program.
Or maybe you even do something custom yourself -- eg. you have some
coordination between threads that spark doesn't give you between tasks.

b) as a simple way to tune some resource management.  Eg., you could
initially have your cluster configured to overcount cores for
hyperthreading, but then set spark.task.cpus to 2, if you don't want to
count hyperthreading.  Or perhaps you want to leave some cores open for all
the other work going on -- GC, network IO, etc.  (But then again, this is a
strange setting to use for that -- you'd probably just want some fixed
number of cores to count, not a multiplier.)
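
For what it's worth, the mechanics of the setting itself are simple -- e.g.
(the numbers here are made up):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.cores", "8")  // cores requested per executor
  .set("spark.task.cpus", "2")       // so the scheduler runs at most 8 / 2 = 4
                                     // concurrent tasks per executor, leaving each
                                     // task 2 cores for its own threads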

On Tue, Apr 7, 2015 at 2:01 AM, twinkle sachdeva twinkle.sachd...@gmail.com
 wrote:

 Hi,

 In spark, there are two settings regarding number of cores, one is at task
 level :spark.task.cpus

 and there is another one, which drives number of cores per executors:
 spark.executor.cores

 Apart from using more than one core for a task which has to call some
 other external API etc, is there any other use case / benefit of assigning
 more than one core to a task?

 As per the code, I can only see this being used while scheduling etc , as
 such RDD partitions etc remains untouched from this setting. Does this mean
 that coder needs to take care of coding the application logic to take care
 of this setting? ( which again let me think over this setting ).

 Comments please.

 Thanks,

 Twinkle







Re: Registering classes with KryoSerializer

2015-04-14 Thread Imran Rashid
Hmm, I dunno why IntelliJ is unhappy, but you can always fall back to
getting the class from its name as a String:

Class.forName("scala.reflect.ClassTag$$anon$1")

Perhaps the class is package private or something, and the REPL somehow
subverts it ...
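
i.e., roughly (an untested sketch):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(
    // the class from the error message, looked up by name since the anonymous
    // class can't be referenced directly in source
    Class.forName("scala.reflect.ClassTag$$anon$1")
  ))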

On Tue, Apr 14, 2015 at 5:44 PM, Arun Lists lists.a...@gmail.com wrote:

 Hi Imran,

 Thanks for the response! However, I am still not there yet.

 In the Scala interpreter, I can do:

 scala classOf[scala.reflect.ClassTag$$anon$1]

 but when I try to do this in my program in IntelliJ, it indicates an error:

 Cannot resolve symbol ClassTag$$anon$1

 Hence I am not any closer to making this work. If you have any further
 suggestions, they would be most welcome.

 arun


 On Tue, Apr 14, 2015 at 2:33 PM, Imran Rashid iras...@cloudera.com
 wrote:

 Hi Arun,

 It can be hard to use kryo with required registration because of issues
 like this -- there isn't a good way to register all the classes that you
 need transitively.  In this case, it looks like one of your classes has a
 reference to a ClassTag, which in turn has a reference to some anonymous
 inner class.  I'd suggest

 (a) figuring out whether you really want to be serializing this thing --
 its possible you're serializing an RDD which keeps a ClassTag, but normally
 you wouldn't want to serialize your RDDs
 (b) you might want to bring this up w/ chill -- spark offloads most of
 the kryo setup for all the scala internals to chill, I'm surprised they
 don't handle this already.  Looks like they still handle ClassManifests
 which are from pre-scala 2.10:
 https://github.com/twitter/chill/blob/master/chill-scala/src/main/scala/com/twitter/chill/ScalaKryoInstantiator.scala#L189

 (c) you can always register these classes yourself, despite the crazy
 names, though you'll just need to knock these out one-by-one:

 scala classOf[scala.reflect.ClassTag$$anon$1]

 res0: Class[scala.reflect.ClassTag[T]{def unapply(x$1:
 scala.runtime.BoxedUnit): Option[_]; def arrayClass(x$1: Class[_]):
 Class[_]}] = class scala.reflect.ClassTag$$anon$1

 On Mon, Apr 13, 2015 at 6:09 PM, Arun Lists lists.a...@gmail.com wrote:

 Hi,

 I am trying to register classes with KryoSerializer. This has worked
 with other programs. Usually the error messages are helpful in indicating
 which classes need to be registered. But with my current program, I get the
 following cryptic error message:

 *Caused by: java.lang.IllegalArgumentException: Class is not registered:
 scala.reflect.ClassTag$$anon$1*

 *Note: To register this class use:
 kryo.register(scala.reflect.ClassTag$$anon$1.class);*

 How do I find out which class needs to be registered? I looked at my
 program and registered all classes used in RDDs. But clearly more classes
 remain to be registered if I can figure out which classes.

 Thanks for your help!

 arun







Re: counters in spark

2015-04-14 Thread Imran Rashid
Hi Robert,

A lot of task metrics are already available for individual tasks.  You can
get these programmatically by registering a SparkListener, and you can also
view them in the UI.  Eg., for each task, you can see runtime,
serialization time, amount of shuffle data read, etc.  I'm working on also
exposing the data in the UI as JSON.

In addition, you can also use the metrics system to get a different view of
the data.  It has a different set of information, and also is better for a
timeline view, as opposed to a task-oriented view you get through the UI.

You can read about both options here:

https://spark.apache.org/docs/latest/monitoring.html
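
For the listener route, here is a small sketch of pulling a few per-task
numbers out (the exact set of fields on TaskMetrics varies a bit by version):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val m = taskEnd.taskMetrics
    if (m != null) {  // metrics may be missing for failed tasks
      println("stage " + taskEnd.stageId +
        ": run time " + m.executorRunTime + " ms" +
        ", GC " + m.jvmGCTime + " ms" +
        ", result serialization " + m.resultSerializationTime + " ms")
    }
  }
})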


On Mon, Apr 13, 2015 at 12:48 PM, Grandl Robert rgra...@yahoo.com.invalid
wrote:

 Guys,

 Do you have any thoughts on this ?


 Thanks,
 Robert



   On Sunday, April 12, 2015 5:35 PM, Grandl Robert
 rgra...@yahoo.com.INVALID wrote:


 Hi guys,

 I was trying to figure out some counters in Spark, related to the amount
 of CPU or Memory used (in some metric), used by a task/stage/job, but I
 could not find any.

 Is there any such counter available ?

 Thank you,
 Robert








Re: Registering classes with KryoSerializer

2015-04-14 Thread Imran Rashid
Hi Arun,

It can be hard to use kryo with required registration because of issues
like this -- there isn't a good way to register all the classes that you
need transitively.  In this case, it looks like one of your classes has a
reference to a ClassTag, which in turn has a reference to some anonymous
inner class.  I'd suggest

(a) figuring out whether you really want to be serializing this thing --
its possible you're serializing an RDD which keeps a ClassTag, but normally
you wouldn't want to serialize your RDDs
(b) you might want to bring this up w/ chill -- spark offloads most of the
kryo setup for all the scala internals to chill, I'm surprised they don't
handle this already.  Looks like they still handle ClassManifests which are
from pre-scala 2.10:
https://github.com/twitter/chill/blob/master/chill-scala/src/main/scala/com/twitter/chill/ScalaKryoInstantiator.scala#L189

(c) you can always register these classes yourself, despite the crazy
names, though you'll just need to knock these out one-by-one:

scala> classOf[scala.reflect.ClassTag$$anon$1]

res0: Class[scala.reflect.ClassTag[T]{def unapply(x$1:
scala.runtime.BoxedUnit): Option[_]; def arrayClass(x$1: Class[_]):
Class[_]}] = class scala.reflect.ClassTag$$anon$1

On Mon, Apr 13, 2015 at 6:09 PM, Arun Lists lists.a...@gmail.com wrote:

 Hi,

 I am trying to register classes with KryoSerializer. This has worked with
 other programs. Usually the error messages are helpful in indicating which
 classes need to be registered. But with my current program, I get the
 following cryptic error message:

 *Caused by: java.lang.IllegalArgumentException: Class is not registered:
 scala.reflect.ClassTag$$anon$1*

 *Note: To register this class use:
 kryo.register(scala.reflect.ClassTag$$anon$1.class);*

 How do I find out which class needs to be registered? I looked at my
 program and registered all classes used in RDDs. But clearly more classes
 remain to be registered if I can figure out which classes.

 Thanks for your help!

 arun





Re: [BUG]Broadcast value return empty after turn to org.apache.spark.serializer.KryoSerializer

2015-04-14 Thread Imran Rashid
Hi Shuai,

I don't think this is a bug with kryo, it's just a subtlety in how kryo
works.  I *think* that it would also work if you changed your
PropertiesUtils class to either (a) remove the no-arg constructor or (b)
instead of extending Properties, make the Properties a contained member
variable.  I wish I had a succinct explanation, but I think it really gets
into the nitty-gritty details of how these serializers work (and this is
just a hunch of mine anyway, I'm not 100% sure).  It would be great if you
could confirm either way.

thanks,
Imran

On Tue, Apr 7, 2015 at 9:29 AM, Shuai Zheng szheng.c...@gmail.com wrote:

 I have found the issue, but I think it is bug.



 If I change my class to:



 public class ModelSessionBuilder implements Serializable {

 /**

 *

  */

 …

 private *Properties[] propertiesList*;

 private static final long serialVersionUID =
 -8139500301736028670L;

 }



 The broadcast value has no issue. But in my original form, if I broadcast
 it as array of my custom subclass of Properties, after broadcast, the
 propertiesList array will be an array of  empty PropertiesUtils objects
 there (empty, not NULL), I am not sure why this happen (the code without
 any problem when run with default java serializer). So I think this is a
 bug, but I am not sure it is a bug of spark or a bug of Kryo.



 Regards,



 Shuai




 *From:* Shuai Zheng [mailto:szheng.c...@gmail.com]
 *Sent:* Monday, April 06, 2015 5:34 PM
 *To:* user@spark.apache.org
 *Subject:* Broadcast value return empty after turn to
 org.apache.spark.serializer.KryoSerializer



 Hi All,



 I have tested my code without problem on EMR yarn (spark 1.3.0) with
 default serializer (java).

 But when I switch to org.apache.spark.serializer.KryoSerializer, the
 broadcast value doesn’t give me right result (actually return me empty
 custom class on inner object).



 Basically I broadcast a builder object, which carry an array of
 propertiesUtils object. The code should not have any logical issue because
 it works on default java serializer. But when I turn to the
 org.apache.spark.serializer.KryoSerializer, it looks like the Array doesn’t
 initialize, propertiesList will give a right size, but then all element in
 the array is just a normal empty PropertiesUtils.



 Do I miss anything when I use this KryoSerializer? I just put the two
 lines, do I need to implement some special code to enable KryoSerializer,
 but I search all places but can’t find any places mention it.



 sparkConf.set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer);

 sparkConf.registerKryoClasses(*new* Class[]{ModelSessionBuilder.*class*,
 Constants.*class*, PropertiesUtils.*class*, ModelSession.*class*});



 public class ModelSessionBuilder implements Serializable {

 /**

 *

  */

 …

 private PropertiesUtils[] propertiesList;

 private static final long serialVersionUID =
 -8139500301736028670L;

 }



 *public* *class* PropertiesUtils *extends* Properties {

/**

*

 */

*private* *static* *final* *long* *serialVersionUID* =
 -3684043338580885551L;



*public* PropertiesUtils(Properties prop) {

   *super*(prop);

}



*public* PropertiesUtils() {

   // *TODO* Auto-generated constructor stub

}

 }





 Regards,



 Shuai



Re: Array[T].distinct doesn't work inside RDD

2015-04-14 Thread Imran Rashid
Interesting, my gut instinct is the same as Sean's.  I'd suggest debugging
this in plain old scala first, without involving spark.  Even just in the
scala shell, create one of your Array[T], try calling .toSet and calling
.distinct.  If those aren't the same, then it's got nothing to do with
spark.  If it's still different even after you make hashCode() consistent w/
equals(), then you might have more luck asking on the scala-user list:
https://groups.google.com/forum/#!forum/scala-user

If it works fine in plain scala, but not in spark, then it would be worth
bringing up here again for us to look into.
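
On the hashCode() point: the contract is that objects which are equal must have
equal hash codes, and distinct / toSet / anything hash-based relies on that.  A
tiny sketch (with a made-up class):

class Thing(val id: String, val payload: Double) {
  override def equals(other: Any): Boolean = other match {
    case t: Thing => t.id == id   // "same id" is what we mean by "duplicate"
    case _        => false
  }
  // must agree with equals, otherwise equal objects can land in different
  // hash buckets and never get compared at all
  override def hashCode: Int = id.hashCode
}

// or just use a case class, which generates a consistent equals/hashCode
case class Thing2(id: String, payload: Double)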

On Tue, Apr 7, 2015 at 4:41 PM, Anny Chen anny9...@gmail.com wrote:

 Hi Sean,

 I didn't override hasCode. But the problem is that Array[T].toSet could
 work but Array[T].distinct couldn't. If it is because I didn't override
 hasCode, then toSet shouldn't work either right? I also tried using this
 Array[T].distinct outside RDD, and it is working alright also, returning me
 the same result as Array[T].toSet.

 Thanks!
 Anny

 On Tue, Apr 7, 2015 at 2:31 PM, Sean Owen so...@cloudera.com wrote:

 Did you override hashCode too?
 On Apr 7, 2015 2:39 PM, anny9699 anny9...@gmail.com wrote:

 Hi,

 I have a question about Array[T].distinct on customized class T. My data
 is
 a like RDD[(String, Array[T])] in which T is a class written by my class.
 There are some duplicates in each Array[T] so I want to remove them. I
 override the equals() method in T and use

 val dataNoDuplicates = dataDuplicates.map{case(id, arr) = (id,
 arr.distinct)}

 to remove duplicates inside RDD. However this doesn't work since I did
 some
 further tests by using

 val dataNoDuplicates = dataDuplicates.map{case(id, arr) =
 val uniqArr = arr.distinct
 if(uniqArr.length  1) println(uniqArr.head == uniqArr.last)
 (id, uniqArr)
 }

 And from the worker stdout I could see that it always returns TRUE
 results. I then tried removing duplicates by using Array[T].toSet
 instead of
 Array[T].distinct and it is working!

 Could anybody explain why the Array[T].toSet and Array[T].distinct
 behaves
 differently here? And Why is Array[T].distinct not working?

 Thanks a lot!
 Anny




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Array-T-distinct-doesn-t-work-inside-RDD-tp22412.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Equi Join is taking for ever. 1 Task is Running while other 199 are complete

2015-04-14 Thread Imran Rashid
Shuffle write could be a good indication of skew, but it looks like the
task in question hasn't generated any shuffle write yet, because its still
working on the shuffle-read side.   So I wouldn't read too much into the
fact that the shuffle write is 0 for a task that is still running.

The shuffle read is larger than for the other tasks (3.0GB vs. 2.2 GB, or
more importantly, 55M records vs 1M records).  So it might not be that the
raw data volume is much higher on that task, but its getting a ton more
small records, which will also generate a lot of work.  It also is a little
more evidence to Jonathan's suggestion that there is a null / 0 record that
is getting grouped together.
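
If you want to check for that directly, here's a minimal sketch (names are
taken from the code quoted below, so adjust to your job) of the histogram
Jonathan suggests -- count records per join key on each side and look at the
heaviest keys:

  val topListingKeys = lstgItem.map { case (itemId, _) => (itemId, 1L) }
    .reduceByKey(_ + _)
    .sortBy(_._2, ascending = false)
    .take(20)                          // the 20 most frequent keys
  val topEventKeys = viEvents.map { case (itemId, _) => (itemId, 1L) }
    .reduceByKey(_ + _)
    .sortBy(_._2, ascending = false)
    .take(20)
  topListingKeys.foreach(println)      // a null / 0 "catch all" key will stand out here
  topEventKeys.foreach(println)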


On Mon, Apr 13, 2015 at 12:40 PM, Jonathan Coveney jcove...@gmail.com
wrote:

 I'm not 100% sure of spark's implementation but in the MR frameworks, it
 would have a much larger shuffle write size becasue that node is dealing
 with a lot more data and as a result has a lot  more to shuffle

 2015-04-13 13:20 GMT-04:00 java8964 java8...@hotmail.com:

  If it is really due to data skew, will the hanging task have a much bigger
  Shuffle Write Size in this case?

  In this case, the shuffle write size for that task is 0, and the rest of the
  I/O for this task is not much larger than that of the tasks that finished
  quickly; is that normal?

  I am also interested in this case: from the statistics on the UI, how do they
  indicate that the task could have skewed data?

 Yong

 --
 Date: Mon, 13 Apr 2015 12:58:12 -0400
 Subject: Re: Equi Join is taking for ever. 1 Task is Running while other
 199 are complete
 From: jcove...@gmail.com
 To: deepuj...@gmail.com
 CC: user@spark.apache.org


 I can promise you that this is also a problem in the pig world :) not
 sure why it's not a problem for this data set, though... are you sure that
 the two are doing the exact same code?

 you should inspect your source data. Make a histogram for each and see
 what the data distribution looks like. If there is a value or bucket with a
 disproportionate set of values you know you have an issue

 2015-04-13 12:50 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

  You mean there is a tuple in either RDD that has itemID = 0 or null?
  And what is a "catch all"?

  Does that imply it is a good idea to run a filter on each RDD first? We do
  not do this using Pig on M/R. Is it required in the Spark world?

 On Mon, Apr 13, 2015 at 9:58 PM, Jonathan Coveney jcove...@gmail.com
 wrote:

 My guess would be data skew. Do you know if there is some item id that is
 a catch all? can it be null? item id 0? lots of data sets have this sort of
 value and it always kills joins

 2015-04-13 11:32 GMT-04:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com:

 Code:

  val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary,
  Long))] = lstgItem.join(viEvents).map {
    case (itemId, (listing, viDetail)) =>
      val viSummary = new VISummary
      viSummary.leafCategoryId = listing.getLeafCategId().toInt
      viSummary.itemSiteId = listing.getItemSiteId().toInt
      viSummary.auctionTypeCode = listing.getAuctTypeCode().toInt
      viSummary.sellerCountryId = listing.getSlrCntryId().toInt
      viSummary.buyerSegment = 0
      viSummary.isBin = (if (listing.getBinPriceLstgCurncy.doubleValue() > 0) 1 else 0)
      val sellerId = listing.getSlrId.toLong
      (sellerId, (viDetail, viSummary, itemId))
  }

 Running Tasks:
  Columns: Index, ID, Attempt, Status, Locality Level, Executor ID / Host,
  Launch Time, Duration, GC Time, Shuffle Read Size / Records, Write Time,
  Shuffle Write Size / Records, Shuffle Spill (Memory), Shuffle Spill (Disk)

  0, 216, 0, RUNNING, PROCESS_LOCAL, 181 / phxaishdc9dn0474.phx.ebay.com,
    2015/04/13 06:43:53, 1.7 h, 13 min, 3.0 GB / 56964921, -, 0.0 B / 0,
    21.2 GB, 1902.6 MB
  2, 218, 0, SUCCESS, PROCESS_LOCAL, 582 / phxaishdc9dn0235.phx.ebay.com,
    2015/04/13 06:43:53, 15 min, 31 s, 2.2 GB / 1666851, 0.1 s, 3.0 MB / 2062,
    54.8 GB, 1924.5 MB
  1, 217, 0, SUCCESS, PROCESS_LOCAL, 202 / phxdpehdc9dn2683.stratus.phx.ebay.com,
    2015/04/13 06:43:53, 19 min, 1.3 min, 2.2 GB / 1687086, 75 ms, 3.9 MB / 2692,
    33.7 GB, 1960.4 MB
  4, 220, 0, SUCCESS, PROCESS_LOCAL, 218 / phxaishdc9dn0855.phx.ebay.com,
    2015/04/13 06:43:53, 15 min, 56 s, 2.2 GB / 1675654, 40 ms, 3.3 MB / 2260,
    26.2 GB, 1928.4 MB



 Command:
 ./bin/spark-submit -v --master yarn-cluster --driver-class-path
 /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar
 --jars
 /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar
  --num-executors 3000 --driver-memory 12g --driver-java-options
 -XX:MaxPermSize=6G --executor-memory 12g --executor-cores 1 --queue
 hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp
 

Re: Catching executor exception from executor in driver

2015-04-14 Thread Imran Rashid
(+dev)

Hi Justin,

short answer: no, there is no way to do that.

I'm just guessing here, but I imagine this was done to eliminate
serialization problems (eg., what if we got an error trying to serialize
the user exception to send from the executors back to the driver?).
Though, actually that isn't a great explanation either, since even when the
info gets back to the driver, its broken into a few string fields (eg., we
have the class name of the root exception), but eventually it just gets
converted to one big string.

I've cc'ed dev b/c I think this is an oversight in Spark.  It makes it
really hard to write an app to deal gracefully with various exceptions --
all you can do is look at the string in SparkException (which could change
arbitrarily between versions, in addition to just being a pain to work
with).  We should probably add much more fine-grained subclasses of
SparkException, at the very least distinguishing errors in user code vs.
errors in spark.  I could imagine there might be a few other cases we'd
like to distinguish more carefully as well.
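
For reference, a minimal sketch (hypothetical, and relying on message text
that could change between versions) of the only workaround available today --
matching on the string inside the SparkException:

  try {
    sc.parallelize(Seq(1, 2, 3)).map(e => e / 0).collect()
  } catch {
    case e: org.apache.spark.SparkException
        if e.getMessage.contains("java.lang.ArithmeticException") =>
      println("a task failed with an ArithmeticException (e.g. divide by zero)")
  }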

Any thoughts from other devs?

thanks
Imran





On Tue, Apr 14, 2015 at 4:46 PM, Justin Yip yipjus...@prediction.io wrote:

 Hello,

 I would like to know if there is a way of catching exception throw from
 executor exception from the driver program. Here is an example:

  try {
    val x = sc.parallelize(Seq(1,2,3)).map(e => e / 0).collect
  } catch {
    case e: SparkException => {
      println(s"ERROR: $e")
      println(s"CAUSE: ${e.getCause}")
    }
  }

 Output:
 ERROR: org.apache.spark.SparkException: Job aborted due to stage failure:
 Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in
 stage 1.0 (TID 15, pio1.c.ace-lotus-714.internal):
 java.lang.ArithmeticException: / by zero
 at
 $line71.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(console:51)
 ...
 CAUSE: null

 The exception cause is a null value. Is there any way that I can catch the
 ArithmeticException?

 Thanks

 Justin

 --
 View this message in context: Catching executor exception from executor
 in driver
 http://apache-spark-user-list.1001560.n3.nabble.com/Catching-executor-exception-from-executor-in-driver-tp22495.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Task result in Spark Worker Node

2015-04-13 Thread Imran Rashid
On the worker side, it all happens in Executor.  The task result is
computed here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210

then its serialized along with some other goodies, and finally sent back to
the driver here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255

What happens on the driver is quite a bit more complicated, and involves a
number of spots in the code, but at least to get you started, the results
are received here:

https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L328

though perhaps a more interesting spot is where they are handled in
DagScheduler#handleTaskCompletion:
https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1001


also, I think I know what you mean, but just to make sure: I wouldn't say
the results from the worker are broadcast back to the driver.  (a) in
spark, broadcast tends to refer to a particular api for sharing immutable
data from the driver to the workers (only one direction) and (b) it doesn't
really fit a more general meaning of broadcast either, since the results
are sent only to the driver, not to all nodes.

On Sun, Mar 29, 2015 at 8:34 PM, raggy raghav0110...@gmail.com wrote:

 I am a PhD student working on a research project related to Apache Spark. I
 am trying to modify some of the spark source code such that instead of
 sending the final result RDD from the worker nodes to a master node, I want
 to send the final result RDDs to some different node. In order to do this,
 I
 have been trying to identify at which point the Spark worker nodes
 broadcast
 the results of a job back to the master.

 So far, I understand that in Spark, the master serializes the RDD and the
 functions to be applied on them and sends them over to the worker nodes. In
 the context of reduce, it serializes the RDD partition and the reduce
 function and sends them to the worker nodes. However, my understanding of
 how things happen at the worker node is very limited and I would appreciate
 it if someone could help me identify where the process of broadcasting the
 results of local worker computations back to the master node takes place.

 This is some of the limited knowledge that I have about the worker nodes:

 Each job gets divided into smaller sets of tasks called stages. Each Stage
 is either a Shuffle Map Stage or Result Stage. In a Shuffle Map Stage, the
 task results are used as input for another stage. The result stage uses the
 RDD to compute the action that initiated the job. So, this result stage
 executes the last task for the job on the worker node. I would assume after
 this is done, it gets the result and broadcasts it to the driver
 application(the master).

 In ResultTask.scala (spark-core src/main/scala org.apache.spark.scheduler) it
 states "A task that sends back the output to the driver application."
 However, I don't see when or where this happens in the source code. I would
 very much appreciate it if someone could help me identify where this
 happens
 in the Spark source code.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Task-result-in-Spark-Worker-Node-tp22283.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Registering classes with KryoSerializer

2015-04-13 Thread Imran Rashid
Those funny class names come from scala's specialization -- its compiling a
different version of OpenHashMap for each primitive you stick in the type
parameter.  Here's a super simple example:

➜  ~  more Foo.scala

class Foo[@specialized X]

➜  ~  scalac Foo.scala

➜  ~  ls Foo*.class

Foo$mcB$sp.class Foo$mcC$sp.class Foo$mcD$sp.class Foo$mcF$sp.class
Foo$mcI$sp.class Foo$mcJ$sp.class Foo$mcS$sp.class Foo$mcV$sp.class
Foo$mcZ$sp.class Foo.class

Sadly, I'm not sure of a foolproof way of getting all those specialized
versions registered except for registering with these strange names.
Here's an example of how its done by chill for Tuples (which is what spark
is relying on for its own registration of tuples):

https://github.com/twitter/chill/blob/6d03f6976f33f6e2e16b8e254fead1625720c281/chill-scala/src/main/scala/com/twitter/chill/TupleSerializers.scala#L861
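
So in practice, a minimal sketch (assuming the package name shown in your
error message) is to register the specialized variants by their mangled names
via Class.forName, since they can't be written as normal classOf[...] literals:

  sparkConf.registerKryoClasses(Array(
    Class.forName("com.comp.common.base.OpenHashMap"),
    Class.forName("com.comp.common.base.OpenHashMap$mcI$sp")
  ))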

On Mon, Mar 30, 2015 at 3:59 PM, Arun Lists lists.a...@gmail.com wrote:

 I am trying to register classes with KryoSerializer. I get the following
 error message:

 How do I find out what class is being referred to by OpenHashMap$mcI$sp?

 com.esotericsoftware.kryo.KryoException:
 java.lang.IllegalArgumentException: Class is not registered:
 com.comp.common.base.OpenHashMap$mcI$sp

 Note: To register this class use:
 kryo.register(com.dtex.common.base.OpenHashMap$mcI$sp.class);

 I have registered other classes with it by using:

 sparkConf.registerKryoClasses(Array(

   classOf[MyClass]

 ))


 Thanks,

 arun





Re: How to get rdd count() without double evaluation of the RDD?

2015-04-13 Thread Imran Rashid
yes, it sounds like a good use of an accumulator to me

val counts = sc.accumulator(0L)
rdd.map{ x =>
  counts += 1
  x
}.saveAsObjectFile("file2")


On Mon, Mar 30, 2015 at 12:08 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  Sean



 Yes I know that I can use persist() to persist to disk, but persisting a huge
 RDD to disk is still a big extra cost. I hope that I can do one pass to get
 the count as well as rdd.saveAsObjectFile("file2"), but I don’t know how.



 May be use accumulator to count the total ?



 Ningjun



 *From:* Mark Hamstra [mailto:m...@clearstorydata.com]
 *Sent:* Thursday, March 26, 2015 12:37 PM
 *To:* Sean Owen
 *Cc:* Wang, Ningjun (LNG-NPV); user@spark.apache.org
 *Subject:* Re: How to get rdd count() without double evaluation of the
 RDD?



 You can also always take the more extreme approach of using
 SparkContext#runJob (or submitJob) to write a custom Action that does what
 you want in one pass.  Usually that's not worth the extra effort.



 On Thu, Mar 26, 2015 at 9:27 AM, Sean Owen so...@cloudera.com wrote:

 To avoid computing twice you need to persist the RDD but that need not be
 in memory. You can persist to disk with persist().

 On Mar 26, 2015 4:11 PM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:

 I have a rdd that is expensive to compute. I want to save it as object
 file and also print the count. How can I avoid double computation of the
 RDD?



  val rdd = sc.textFile("someFile").map(line => expensiveCalculation(line))



 val count = rdd.count()  // this force computation of the rdd

 println(count)

  rdd.saveAsObjectFile("file2") // this computes the RDD again



 I can avoid double computation by using cache



  val rdd = sc.textFile("someFile").map(line => expensiveCalculation(line))

 rdd.cache()

 val count = rdd.count()

 println(count)

  rdd.saveAsObjectFile("file2") // this reuses the cached rdd, no recomputation



  This only computes the rdd once. However the rdd has millions of items and
  will cause an out-of-memory error.



 Question: how can I avoid double computation without using cache?





 Ningjun





Re: Understanding Spark Memory distribution

2015-04-13 Thread Imran Rashid
broadcast variables count towards spark.storage.memoryFraction, so they
use the same pool of memory as cached RDDs.

That being said, I'm really not sure why you are running into problems, it
seems like you have plenty of memory available.  Most likely its got
nothing to do with broadcast variables or caching -- its just whatever
logic you are applying in your transformations that are causing lots of GC
to occur during the computation.  Hard to say without knowing more details.

You could try increasing the timeout for the failed askWithReply by
increasing spark.akka.lookupTimeout (defaults to 30), but that would most
likely be treating a symptom, not the root cause.
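
If you do want to try either knob, here's a minimal sketch (the values are
only illustrative, not recommendations) of setting them on the SparkConf
before creating the context:

  val conf = new org.apache.spark.SparkConf()
    .set("spark.storage.memoryFraction", "0.4")  // pool shared by cached RDDs and broadcasts
    .set("spark.akka.lookupTimeout", "120")      // seconds; default is 30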

On Fri, Mar 27, 2015 at 4:52 PM, Ankur Srivastava 
ankur.srivast...@gmail.com wrote:

 Hi All,

 I am running a spark cluster on EC2 instances of type: m3.2xlarge. I have
 given 26gb of memory with all 8 cores to my executors. I can see that in
 the logs too:

  15/03/27 21:31:06 INFO AppClient$ClientActor: Executor added:
  app-20150327213106-/0 on worker-20150327212934-10.x.y.z-40128
  (10.x.y.z:40128) with 8 cores

 I am not caching any RDD so I have set spark.storage.memoryFraction to
 0.2. I can see on SparkUI under executors tab Memory used is 0.0/4.5 GB.

 I am now confused with these logs?

  15/03/27 21:31:08 INFO BlockManagerMasterActor: Registering block manager
  10.77.100.196:58407 with 4.5 GB RAM, BlockManagerId(4, 10.x.y.z, 58407)

 I am broadcasting a large object of 3 gb and after that when I am creating
 an RDD, I see logs which show this 4.5 GB memory getting full and then I
 get OOM.

 How can I make block manager use more memory?

 Is there any other fine tuning I need to do for broadcasting large objects?

 And does broadcast variable use cache memory or rest of the heap?


 Thanks

 Ankur



Re: Serialization Problem in Spark Program

2015-03-25 Thread Imran Rashid
you also need to register *array*s of MyObject.  so change:

conf.registerKryoClasses(Array(classOf[MyObject]))

to

conf.registerKryoClasses(Array(classOf[MyObject], classOf[Array[MyObject]]))


On Wed, Mar 25, 2015 at 2:44 AM, donhoff_h 165612...@qq.com wrote:

 Hi, experts

 I wrote a very simple spark program to test the KryoSerialization
 function. The codes are as following:

  object TestKryoSerialization {
    def main(args: Array[String]) {
      val conf = new SparkConf()
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      conf.set("spark.kryo.registrationRequired", "true")  // I use this statement to force checking registration.
      conf.registerKryoClasses(Array(classOf[MyObject]))

      val sc = new SparkContext(conf)
      val rdd = sc.textFile("hdfs://dhao.hrb:8020/user/spark/tstfiles/charset/A_utf8.txt")
      val objs = rdd.map(new MyObject(_, 1)).collect()
      for (x <- objs) {
        x.printMyObject
      }
    }
  }

  The class MyObject is also a very simple Class, which is only used to test
  the serialization function:

  class MyObject {
    var myStr : String = ""
    var myInt : Int = 0
    def this(inStr : String, inInt : Int) {
      this()
      this.myStr = inStr
      this.myInt = inInt
    }
    def printMyObject {
      println("MyString is : " + myStr + "\tMyInt is : " + myInt)
    }
  }

 But when I ran the application, it reported the following error:
 java.lang.IllegalArgumentException: Class is not registered:
 dhao.test.Serialization.MyObject[]
 Note: To register this class use:
 kryo.register(dhao.test.Serialization.MyObject[].class);
 at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
 at
 com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
 at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
 at
 org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:161)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 I don't understand what cause this problem. I have used the
 conf.registerKryoClasses to register my class. Could anyone help me ?
 Thanks

 By the way, the spark version is 1.3.0.




Re: spark disk-to-disk

2015-03-24 Thread Imran Rashid
I think writing to hdfs and reading it back again is totally reasonable.
In fact, in my experience, writing to hdfs and reading back in actually
gives you a good opportunity to handle some other issues as well:

a) instead of just writing as an object file, I've found it's helpful to
write in a format that is a little more readable.  JSON if efficiency
doesn't matter :) or you could use something like avro, which at least has
a good set of command line tools.

b) when developing, I hate it when I introduce a bug in step 12 of a long
pipeline, and need to re-run the whole thing.  If you save to disk, you can
write a little application logic that realizes step 11 is already sitting
on disk, and just restart from there (a sketch of this is below, after (c)).

c) writing to disk is also a good opportunity to do a little crude
auto-tuning of the number of partitions.  You can look at the size of
each partition on hdfs, and then adjust the number of partitions.
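
For (b), here is a minimal sketch (helper and path names are hypothetical) of
the "skip the step if its output is already on disk" logic:

  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  def getOrCompute(sc: SparkContext, path: String)(compute: => RDD[String]): RDD[String] = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    if (fs.exists(new Path(path))) {
      sc.textFile(path)           // this step already ran: just reload its output
    } else {
      val rdd = compute
      rdd.saveAsTextFile(path)    // materialize so later runs can restart from here
      sc.textFile(path)
    }
  }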

And I completely agree that losing the partitioning info is a major
limitation -- I submitted a PR to help deal w/ it:

https://github.com/apache/spark/pull/4449

getting narrow dependencies w/ partitioners can lead to pretty big
performance improvements, so I do think its important to make it easily
accessible to the user.  Though now I'm thinking that maybe this api is a
little clunky, and this should get rolled into the other changes you are
proposing to HadoopRDD & friends -- but I'll go into more discussion on
that thread.



On Mon, Mar 23, 2015 at 12:55 PM, Koert Kuipers ko...@tresata.com wrote:

 there is a way to reinstate the partitioner, but that requires
 sc.objectFile to read exactly what i wrote, which means sc.objectFile
 should never split files on reading (a feature of hadoop file inputformat
 that gets in the way here).

 On Mon, Mar 23, 2015 at 1:39 PM, Koert Kuipers ko...@tresata.com wrote:

 i just realized the major limitation is that i lose partitioning info...

 On Mon, Mar 23, 2015 at 1:34 AM, Reynold Xin r...@databricks.com wrote:


 On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers ko...@tresata.com
 wrote:

 so finally i can resort to:
 rdd.saveAsObjectFile(...)
 sc.objectFile(...)
 but that seems like a rather broken abstraction.


 This seems like a fine solution to me.






Re: ShuffleBlockFetcherIterator: Failed to get block(s)

2015-03-20 Thread Imran Rashid
I think you should see some other errors before that, from
NettyBlockTransferService, with a msg like Exception while beginning
fetchBlocks.  There might be a bit more information there.  there are an
assortment of possible causes, but first lets just make sure you have all
the details from the original cause.

On Fri, Mar 20, 2015 at 8:49 AM, Eric Friedman eric.d.fried...@gmail.com
wrote:

 My job crashes with a bunch of these messages in the YARN logs.

 What are the appropriate steps in troubleshooting?

 15/03/19 23:29:45 ERROR shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 10 outstanding blocks (after 3 retries)

 15/03/19 23:29:45 ERROR storage.ShuffleBlockFetcherIterator: Failed to get
 block(s) from host:port



Re: Error communicating with MapOutputTracker

2015-03-20 Thread Imran Rashid
Hi Thomas,

sorry for such a late reply.  I don't have any super-useful advice, but
this seems like something that is important to follow up on.  to answer
your immediate question, No, there should not be any hard limit to the
number of tasks that MapOutputTracker can handle.  Though of course as
things get bigger, the overheads increase which is why you might hit
timeouts.

Two other minor suggestions:
(1) increase spark.akka.askTimeout -- thats the timeout you are running
into, it defaults to 30 seconds
(2) as you've noted, you've needed to play w/ other timeouts b/c of long GC
pauses -- its possible some GC tuning might help, though its a bit of a
black art so its hard to say what you can try.  You cold always try
Concurrent Mark Swee to avoid the long pauses, but of course that will
probably hurt overall performance.
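
Concretely, a minimal sketch (illustrative values only) of those two
suggestions, e.g. in spark-defaults.conf or via --conf on spark-submit:

  spark.akka.askTimeout            120
  spark.executor.extraJavaOptions  -XX:+UseConcMarkSweepGC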

can you share any more details of what you are trying to do?

Since you're fetching shuffle blocks in a shuffle map task, I guess you've
got two shuffles back-to-back, eg.
someRDD.reduceByKey{...}.map{...}.filter{...}.combineByKey{...}.  Do you
expect to be doing a lot of GC in between the two shuffles?? -eg., in the
little example I have, if there were lots of objects being created in the
map & filter steps that will make it out of the eden space.  One possible
solution to this would be to force the first shuffle to complete, before
running any of the subsequent transformations, eg. by forcing
materialization to the cache first

val intermediateRDD = someRDD.reduceByKey{...}.persist(StorageLevel.DISK_ONLY)
intermediateRDD.count() // force the shuffle to complete, without trying to
do our complicated downstream logic at the same time

val finalResult = intermediateRDD.map{...}.filter{...}.combineByKey{...}

Also, can you share your data size?  Do you expect the shuffle to be
skewed, or do you think it will be well-balanced?  Not that I'll have any
suggestions for you based on the answer, but it may help us reproduce it
and try to fix whatever the root cause is.

thanks,
Imran



On Wed, Mar 4, 2015 at 12:30 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 I meant spark.default.parallelism of course.

 On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Follow up:
 We re-retried, this time after *decreasing* spark.parallelism. It was set
 to 16000 before, (5 times the number of cores in our cluster). It is now
 down to 6400 (2 times the number of cores).

 And it got past the point where it failed before.

 Does the MapOutputTracker have a limit on the number of tasks it can
 track?


 On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Hello,

 We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge
 workers). We use spark-submit to start an application.

 We got the following error which leads to a failed stage:

 Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
 most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
 ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
 communicating with MapOutputTracker


 We tried the whole application again, and it failed on the same stage
 (but it got more tasks completed on that stage) with the same error.

 We then looked at executors stderr, and all show similar logs, on both
 runs (see below). As far as we can tell, executors and master have disk
 space left.

 *Any suggestion on where to look to understand why the communication
 with the MapOutputTracker fails?*

 Thanks
 Thomas
 
 In case it matters, our akka settings:
 spark.akka.frameSize 50
 spark.akka.threads 8
 // those below are 10* the default, to cope with large GCs
 spark.akka.timeout 1000
 spark.akka.heartbeat.pauses 6
 spark.akka.failure-detector.threshold 3000.0
 spark.akka.heartbeat.interval 1

 Appendix: executor logs, where it starts going awry

 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 
 298525
 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 
 298525)
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with 
 curMem=5543008799, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as 
 bytes in memory (estimated size 1473.0 B, free 11.7 GB)
 15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block 
 broadcast_339_piece0
 15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 
 took 224 ms
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with 
 curMem=5543010272, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in 
 memory (estimated size 2.5 KB, free 11.7 GB)
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
 actor = 
 Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
 15/03/04 11:45:00 INFO 

Re: FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded

2015-03-20 Thread Imran Rashid
I think you are running into a combo of

https://issues.apache.org/jira/browse/SPARK-5928
and
https://issues.apache.org/jira/browse/SPARK-5945

The standard solution is to just increase the number of partitions you are
creating. textFile(), reduceByKey(), and sortByKey() all take an optional
second argument, where you can specify the number of partitions you use.
It looks like it's using spark.default.parallelism right now, which will be the
number of cores in your cluster usually (not sure what that is in your
case).  The exception you gave shows you're about 6x over the limit in at
least this one case, so I'd start with at least 10x the number of
partitions you have now, and increase until it works (or you run into some
other problem from too many partitions ...)

I'd also strongly suggest doing the filter before you do the sortByKey --
no reason to force all that data through if you're going to throw a lot of it
away.  It's not completely clear where you are hitting the error now -- that
alone might even solve your problem.
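
Concretely, a minimal sketch of both suggestions applied to your code below
(the partition count is a placeholder to tune upward, and I'm assuming you
want to keep kmers with count > 5):

  val numPartitions = 2000
  val hgfasta = sc.textFile(args(0), numPartitions)
  val kCount = hgfasta.flatMap(r => r.sliding(args(2).toInt))
  val kmerCount = kCount.map(x => (x, 1)).reduceByKey(_ + _, numPartitions)
  val filtered = kmerCount.filter(kv => kv._2 > 5)          // filter *before* sorting
  val sorted = filtered.map { case (k, c) => (c, k) }
    .sortByKey(ascending = false, numPartitions = numPartitions)
    .map { case (c, k) => (k, c) }
  sorted.map(kv => kv._1 + ", " + kv._2.toLong).saveAsTextFile(args(1))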

hope this helps,
Imran


On Thu, Mar 19, 2015 at 5:28 PM, roni roni.epi...@gmail.com wrote:

 I get 2 types of error -
 -org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0 and
 FetchFailedException: Adjusted frame length exceeds 2147483647:
 12716268407 - discarded

 Spar keeps re-trying to submit the code and keeps getting this error.

 My file on which I am finding  the sliding window strings is 500 MB  and I
 am doing it with length = 150.
 It woks fine till length is 100.

 This is my code -
   val hgfasta = sc.textFile(args(0)) // read the fasta file
   val kCount = hgfasta.flatMap(r => r.sliding(args(2).toInt))
   val kmerCount = kCount.map(x => (x, 1)).reduceByKey(_ + _)
     .map { case (x, y) => (y, x) }.sortByKey(false).map { case (i, j) => (j, i) }

   val filtered = kmerCount.filter(kv => kv._2 > 5)
   filtered.map(kv => kv._1 + ", " + kv._2.toLong).saveAsTextFile(args(1))
 }
 It gets stuck and flat map and save as Text file  Throws
 -org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0 and

 org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 
 2147483647: 12716268407 - discarded
   at 
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
   at 
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
   at 
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)





Re: Why I didn't see the benefits of using KryoSerializer

2015-03-20 Thread Imran Rashid
Hi Yong,

yes I think your analysis is correct.  I'd imagine almost all serializers
out there will just convert a string to its utf-8 representation.  You
might be interested in adding compression on top of a serializer, which
would probably bring the string size down in almost all cases, but then you
also need to take the time for compression.  Kryo is generally more
efficient than the java serializer on complicated object types.

I guess I'm still a little surprised that kryo is slower than java
serialization for you.  You might try setting
spark.kryo.referenceTracking to false if you are just serializing objects
with no circular references.  I think that will improve the performance a
little, though I dunno how much.
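
A minimal sketch of that setting (only safe when the objects you serialize
contain no circular references):

  val conf = new org.apache.spark.SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.referenceTracking", "false")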

It might be worth running your experiments again with slightly more
complicated objects and see what you observe.

Imran


On Thu, Mar 19, 2015 at 12:57 PM, java8964 java8...@hotmail.com wrote:

 I read the Spark code a little bit, trying to understand my own question.

 It looks like the different is really between
 org.apache.spark.serializer.JavaSerializer and
 org.apache.spark.serializer.KryoSerializer, both having the method named
 writeObject.

 In my test case, for each line of my text file, it is about 140 bytes of
 String. When either JavaSerializer.writeObject(140 bytes of String) or
  KryoSerializer.writeObject(140 bytes of String), I didn't see a difference in
  the underlying OutputStream space usage.

 Does this mean that KryoSerializer really doesn't give us any benefit for
 String type? I understand that for primitives types, it shouldn't have any
 benefits, but how about String type?

 When we talk about lower the memory using KryoSerializer in spark, under
 what case it can bring significant benefits? It is my first experience with
  the KryoSerializer, so maybe I am totally wrong about its usage.

 Thanks

 Yong

 --
 From: java8...@hotmail.com
 To: user@spark.apache.org
 Subject: Why I didn't see the benefits of using KryoSerializer
 Date: Tue, 17 Mar 2015 12:01:35 -0400


 Hi, I am new to Spark. I tried to understand the memory benefits of using
 KryoSerializer.

 I have this one box standalone test environment, which is 24 cores with
 24G memory. I installed Hadoop 2.2 plus Spark 1.2.0.

 I put one text file in the hdfs about 1.2G.  Here is the settings in the
 spark-env.sh

 export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=4
 export SPARK_WORKER_MEMORY=32g
 export SPARK_DRIVER_MEMORY=2g
 export SPARK_EXECUTOR_MEMORY=4g

 First test case:
 val log=sc.textFile(hdfs://namenode:9000/test_1g/)
 log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
 log.count()
 log.count()

 The data is about 3M rows. For the first test case, from the storage in
 the web UI, I can see Size in Memory is 1787M, and Fraction Cached is
 70% with 7 cached partitions.
 This matched with what I thought, and first count finished about 17s, and
 2nd count finished about 6s.

 2nd test case after restart the spark-shell:
 val log=sc.textFile(hdfs://namenode:9000/test_1g/)
 log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
 log.count()
 log.count()

 Now from the web UI, I can see Size in Memory is 1231M, and Fraction
 Cached is 100% with 10 cached partitions. It looks like caching the
  default java serialized format reduces the memory usage, but comes with a
  cost: the first count finished around 39s and the 2nd count finished around 9s.
 So the job runs slower, with less memory usage.

 So far I can understand all what happened and the tradeoff.

 Now the problem comes with when I tried to test with KryoSerializer

 SPARK_JAVA_OPTS=-Dspark.serializer=org.apache.spark.serializer.KryoSerializer
 /opt/spark/bin/spark-shell
 val log=sc.textFile(hdfs://namenode:9000/test_1g/)
 log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
 log.count()
 log.count()

 First, I saw that the new serializer setting passed in, as proven in the
 Spark Properties of Environment shows 

  spark.driver.extraJavaOptions:
    -Dspark.serializer=org.apache.spark.serializer.KryoSerializer
  This is not there for the first 2 test cases.
 But in the web UI of Storage, the Size in Memory is 1234M, with 100%
 Fraction Cached and 10 cached partitions. The first count took 46s and
 2nd count took 23s.

  I don't get a much smaller memory size as I expected, just a longer run time
  for both counts. Did I do anything wrong? Why does the memory footprint of
  MEMORY_ONLY_SER with KryoSerializer still use the same size as the default
  Java serializer, with worse duration?

 Thanks

 Yong



Re: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-17 Thread Imran Rashid
I'm not super familiar w/ S3, but I think the issue is that you want to use
a different output committer with object stores, which don't have a
simple "move" operation.  There have been a few other threads on S3 &
output committers.  I think the most relevant for you is most probably this
open JIRA:

https://issues.apache.org/jira/browse/SPARK-6352

On Fri, Mar 13, 2015 at 5:51 PM, Shuai Zheng szheng.c...@gmail.com wrote:

 Hi All,



 I try to run a sorting on a r3.2xlarge instance on AWS. I just try to run
 it as a single node cluster for test. The data I use to sort is around 4GB
 and sit on S3, output will also on S3.



 I just connect spark-shell to the local cluster and run the code in the
 script (because I just want a benchmark now).



 My job is as simple as:

  val parquetFile =
  sqlContext.parquetFile("s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...")

  parquetFile.registerTempTable("Test")

  val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map
  { row => { row.mkString("\t") } }

  sortedResult.saveAsTextFile("s3n://myplace,");



 The job takes around 6 mins to finish the sort when I am monitoring the
 process. After I notice the process stop at:



 15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at
 console:31, took 581.304992 s



  At that time, Spark actually just writes all the data to the _temporary
  folder first; after all sub-tasks finish, it will try to move all the
  ready results from the _temporary folder to the final location. This process
  might be quick locally (because it will just be a cut/paste), but it looks
  very slow on my S3: it takes a few seconds to move one file (usually
  there will be 200 partitions). And then it raises exceptions after it has
  moved maybe 40-50 files.



 org.apache.http.NoHttpResponseException: The target server failed to
 respond

 at
 org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)

 at
 org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)

 at
 org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)

 at
 org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)

 at
 org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)





  I tried several times, but never got the full job to finish. I am not sure
  whether anything is wrong here; I use something very basic, and I can see the
  job has finished and all results are on S3 under the temporary folder, but
  then it raises the exception and fails.



 Any special setting I should do here when deal with S3?



  I don’t know what the issue is here; I have never seen MapReduce have a
  similar issue, so it could not be S3’s problem.



 Regards,



 Shuai



Re: Need Advice about reading lots of text files

2015-03-17 Thread Imran Rashid
Interesting, on another thread, I was just arguing that the user should
*not* open the files themselves and read them, b/c then they lose all the
other goodies we have in HadoopRDD, eg. the metric tracking.

I think this encourages Pat's argument that we might actually need better
support for this in spark context itself?

On Sat, Mar 14, 2015 at 1:11 PM, Michael Armbrust mich...@databricks.com
wrote:


 Here is how I have dealt with many small text files (on s3 though this
 should generalize) in the past:

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201411.mbox/%3ccaaswr-58p66-es2haxh4i+bu__0rvxd2okewkly0mee8rue...@mail.gmail.com%3E




 FromMichael Armbrust mich...@databricks.comSubjectRe:
 S3NativeFileSystem inefficient implementation when calling sc.textFile
 DateThu, 27 Nov 2014 03:20:14 GMT

 In the past I have worked around this problem by avoiding sc.textFile().
 Instead I read the data directly inside of a Spark job.  Basically, you
 start with an RDD where each entry is a file in S3 and then flatMap that
 with something that reads the files and returns the lines.

 Here's an example: https://gist.github.com/marmbrus/fff0b058f134fa7752fe

 Using this class you can do something like:

  sc.parallelize("s3n://mybucket/file1" :: "s3n://mybucket/file1" ... ::
  Nil).flatMap(new ReadLinesSafe(_))

 You can also build up the list of files by running a Spark 
 job:https://gist.github.com/marmbrus/15e72f7bc22337cf6653

 Michael


 On Sat, Mar 14, 2015 at 10:38 AM, Pat Ferrel p...@occamsmachete.com
 wrote:

 It’s a long story but there are many dirs with smallish part- files
 in them so we create a list of the individual files as input
 to sparkContext.textFile(fileList). I suppose we could move them and rename
 them to be contiguous part- files in one dir. Would that be better than
 passing in a long list of individual filenames? We could also make the part
 files much larger by collecting the smaller ones. But would any of this
 make a difference in IO speed?

 I ask because using the long file list seems to read, what amounts to a
 not very large data set rather slowly. If it were all in large part files
 in one dir I’d expect it to go much faster but this is just intuition.


 On Mar 14, 2015, at 9:58 AM, Koert Kuipers ko...@tresata.com wrote:

 why can you not put them in a directory and read them as one input? you
 will get a task per file, but spark is very fast at executing many tasks
 (its not a jvm per task).

 On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel p...@occamsmachete.com
 wrote:

 Any advice on dealing with a large number of separate input files?


 On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote:

 We have many text files that we need to read in parallel. We can create
 a comma delimited list of files to pass in to
 sparkContext.textFile(fileList). The list can get very large (maybe 1)
 and is all on hdfs.

 The question is: what is the most performant way to read them? Should
 they be broken up and read in groups appending the resulting RDDs or should
 we just pass in the entire list at once? In effect I’m asking if Spark does
 some optimization of whether we should do it explicitly. If the later, what
 rule might we use depending on our cluster setup?
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: Process time series RDD after sortByKey

2015-03-16 Thread Imran Rashid
Hi Shuai,

yup, that is exactly what I meant -- implement your own class
MyGroupingRDD.  This is definitely more detail than a lot of users will
need to go into, but it's also not all that scary either.  In this case, you want
something that is *extremely* close to the existing CoalescedRDD, so start
by looking at that code.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala

The only thing which is complicated in CoalescedRDD is the
PartitionCoalescer, but that is completely irrelevant for you, so you can
ignore it.  I started writing up a description of what to do but then I
realized just writing the code would be easier :)  Totally untested, but
here you go:

https://gist.github.com/squito/c2d1dd5413a60830d6f3

The only really interesting part here is getPartitions:

https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L31

That's where you create partitions in your new RDD, which depend on
multiple RDDs from the parent.  Also note that compute() is very simple:
you just concatenate together the iterators from each of the parent RDDs:

https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L37
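
If it helps to see the shape of it outside the gist, here is an equally
untested, simplified sketch of the same idea (it assumes a fixed group size,
e.g. 50, rather than arbitrary groupings):

  import scala.reflect.ClassTag
  import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
  import org.apache.spark.rdd.RDD

  case class GroupedPartition(index: Int, parents: Seq[Partition]) extends Partition

  class MyGroupingRDD[T: ClassTag](parent: RDD[T], groupSize: Int)
      extends RDD[T](parent.sparkContext, Nil) {

    // output partition i depends on parent partitions [i*groupSize, (i+1)*groupSize)
    override def getDependencies: Seq[Dependency[_]] = Seq(
      new NarrowDependency(parent) {
        def getParents(id: Int): Seq[Int] =
          (id * groupSize) until math.min((id + 1) * groupSize, parent.partitions.length)
      })

    override def getPartitions: Array[Partition] =
      parent.partitions.grouped(groupSize).zipWithIndex
        .map { case (ps, i) => GroupedPartition(i, ps.toSeq): Partition }
        .toArray

    // compute just concatenates the iterators of the grouped parent partitions, in order
    override def compute(split: Partition, context: TaskContext): Iterator[T] =
      split.asInstanceOf[GroupedPartition].parents.iterator
        .flatMap(p => parent.iterator(p, context))
  }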

let me know how it goes!


On Mon, Mar 16, 2015 at 5:15 PM, Shuai Zheng szheng.c...@gmail.com wrote:

 Hi Imran,



 I am a bit confused here. Assume I have RDD a with 1000 partition and also
 has been sorted. How can I control when creating RDD b (with 20 partitions)
 to make sure 1-50 partition of RDD a map to 1st partition of RDD b? I
 don’t see any control code/logic here?



 You code below:



 val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)





  Does it mean I need to define/develop my own MyGroupingRDD class? I am not
  very clear on how to do that -- is there any place I can find an example? I
  have never created my own RDD class before (not an RDD instance :) ). But
  this is a very valuable approach to me, so I am eager to learn.



 Regards,



 Shuai



 *From:* Imran Rashid [mailto:iras...@cloudera.com]
 *Sent:* Monday, March 16, 2015 11:22 AM
 *To:* Shawn Zheng; user@spark.apache.org
 *Subject:* Re: Process time series RDD after sortByKey



 Hi Shuai,



 On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng szheng.c...@gmail.com
 wrote:

  Sorry for the late response.

  Zhan Zhang's solution is very interesting and I looked into it, but it is
  not what I want. Basically I want to run the job sequentially and also gain
 parallelism. So if possible, if I have 1000 partition, the best case is I
 can run it as 20 subtask, each one take partition: 1-50, 51-100, 101-150,
 etc.

 If we have ability to do this, we will gain huge flexibility when we try
 to process some time series like data and a lot of algo will benefit from
 it.



 yes, this is what I was suggesting you do.  You would first create one RDD
 (a) that has 1000 partitions.  Don't worry about the creation of this RDD
 -- it wont' create any tasks, its just a logical holder of your raw data.
 Then you create another RDD (b) that depends on your RDD (a), but that only
 has 20 partitions.  Each partition in (b) would depend on a number of
 partitions from (a).  As you've suggested, partition 1 in (b) would depend
 on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in
 (a), etc.   Note that RDD (b) still doesn't *do* anything.  Its just
 another logical holder for your data, but this time grouped in the way you
 want.  Then after RDD (b), you would do whatever other transformations you
 wanted, but now you'd be working w/ 20 partitions:



 val rawData1000Partitions = sc.textFile(...) // or whatever

 val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)

 groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.



 note that this is almost exactly the same as what CoalescedRdd does.
 However, it might combine the partitions in whatever ways it feels like --
 you want them combined in a very particular order.  So you'll need to
 create your own subclass.





 Back to Zhan Zhang's

  while (iterPartition < RDD.partitions.length) {

    val res = sc.runJob(this, (it: Iterator[T]) => someFunc,
      iterPartition, allowLocal = true)

    // Some other function after processing one partition.

    iterPartition += 1

  }

  I am curious how spark processes this without parallelism: will the individual
  partition be passed back to the driver to process, or will it just run one
  task on the node where that partition exists, then follow with another
  partition on another node?





 Not exactly.  The partition is not shipped back to the driver.  You create
 a task which will be processed by a worker.  The task scheduling will take
 data locality into account, so ideally the task will get scheduled in the
 same location where the data already resides.  The worker will execute
 someFunc, and after its done it will ship the *result* back to the driver.
 Then the process will get repeated for all the other partitions.



 If you wanted all the data sent back

Re: How to preserve/preset partition information when load time series data?

2015-03-16 Thread Imran Rashid
Hi Shuai,

It should certainly be possible to do it that way, but I would recommend
against it.  If you look at HadoopRDD, its doing all sorts of little
book-keeping that you would most likely want to mimic.  eg., tracking the
number of bytes  records that are read, setting up all the hadoop
configuration, splits, readers, scheduling tasks for locality, etc.  Thats
why I suggested that really you want to just create a small variant of
HadoopRDD.

hope that helps,
Imran


On Sat, Mar 14, 2015 at 11:10 AM, Shawn Zheng szheng.c...@gmail.com wrote:

  Sorry for replying late.

  But I just thought of one solution: if I load only the file names themselves
  (not the contents of the files), so I have an RDD[key, iterable[filename]],
  then I run mapPartitionsToPair on it with preservesPartitioning=true.

  Do you think that is a right solution? I am not sure whether it has
  potential issues if I try to fake/enforce the partitioning in my own way.

 Regards,

 Shuai

 On Wed, Mar 11, 2015 at 8:09 PM, Imran Rashid iras...@cloudera.com
 wrote:

 It should be *possible* to do what you want ... but if I understand you
 right, there isn't really any very easy way to do it.  I think you would
 need to write your own subclass of RDD, which has its own logic on how the
 input files get put divided among partitions.  You can probably subclass
 HadoopRDD and just modify getPartitions().  your logic could look at the
 day of each filename to decide which partition it goes into.  You'd need to
 make corresponding changes for HadoopPartition  the compute() method.

 (or if you can't subclass HadoopRDD directly you can use it for
 inspiration.)

 On Mon, Mar 9, 2015 at 11:18 AM, Shuai Zheng szheng.c...@gmail.com
 wrote:

 Hi All,



  If I have a set of time series data files, they are in parquet format
  and the data for each day are stored using a naming convention, but I will
  not know how many files there are for one day.



 20150101a.parq

 20150101b.parq

 20150102a.parq

 20150102b.parq

 20150102c.parq

 …

 201501010a.parq

 …



  Now I am trying to write a program to process the data, and I want to make
  sure each day’s data goes into one partition. Of course I can load all of it
  into one big RDD and then partition it, but that will be very slow. As I
  already know the time series from the file names, is it possible for me to
  load the data into the RDD and also preserve the partitioning? I know I can
  preserve a partition per file, but is there any way for me to load the RDD
  and preserve partitioning based on a set of files: one partition, multiple
  files?



 I think it is possible because when I load a RDD from 100 files (assume
 cross 100 days), I will have 100 partitions (if I disable file split when
 load file). Then I can use a special coalesce to repartition the RDD? But I
 don’t know is it possible to do that in current Spark now?



 Regards,



 Shuai






Re: Process time series RDD after sortByKey

2015-03-16 Thread Imran Rashid
Hi Shuai,

On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng szheng.c...@gmail.com wrote:

  Sorry for the late response.

  Zhan Zhang's solution is very interesting and I looked into it, but it is
  not what I want. Basically I want to run the job sequentially and also gain
 parallelism. So if possible, if I have 1000 partition, the best case is I
 can run it as 20 subtask, each one take partition: 1-50, 51-100, 101-150,
 etc.
 If we have ability to do this, we will gain huge flexibility when we try
 to process some time series like data and a lot of algo will benefit from
 it.


yes, this is what I was suggesting you do.  You would first create one RDD
(a) that has 1000 partitions.  Don't worry about the creation of this RDD
-- it wont' create any tasks, its just a logical holder of your raw data.
Then you create another RDD (b) that depends on your RDD (a), but that only
has 20 partitions.  Each partition in (b) would depend on a number of
partitions from (a).  As you've suggested, partition 1 in (b) would depend
on partitions 1-50 in (a), partition 2 in (b) would depend on 51-100 in
(a), etc.   Note that RDD (b) still doesn't *do* anything.  Its just
another logical holder for your data, but this time grouped in the way you
want.  Then after RDD (b), you would do whatever other transformations you
wanted, but now you'd be working w/ 20 partitions:

val rawData1000Partitions = sc.textFile(...) // or whatever
val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} //etc.

note that this is almost exactly the same as what CoalescedRdd does.
However, it might combine the partitions in whatever ways it feels like --
you want them combined in a very particular order.  So you'll need to
create your own subclass.



 Back to Zhan Zhang's

  while (iterPartition < RDD.partitions.length) {
    val res = sc.runJob(this, (it: Iterator[T]) => someFunc,
      iterPartition, allowLocal = true)
    // Some other function after processing one partition.
    iterPartition += 1
  }
  I am curious how spark processes this without parallelism: will the individual
  partition be passed back to the driver to process, or will it just run one
  task on the node where that partition exists, then follow with another
  partition on another node?



Not exactly.  The partition is not shipped back to the driver.  You create
a task which will be processed by a worker.  The task scheduling will take
data locality into account, so ideally the task will get scheduled in the
same location where the data already resides.  The worker will execute
someFunc, and after its done it will ship the *result* back to the driver.
Then the process will get repeated for all the other partitions.

If you wanted all the data sent back to the driver, you could use
RDD.toLocalIterator.  That will send one partition back to the driver, let
you process it on the driver, then fetch the next partition, etc.


Imran


Re: Workaround for spark 1.2.X roaringbitmap kryo problem?

2015-03-12 Thread Imran Rashid
Giving a bit more detail on the error would make it a lot easier for others
to help you out. Eg., in this case, it would have helped if you'd included your
actual compile error.

In any case, I'm assuming your issue is b/c that class is private to
spark.  You can sneak around that by using
Class.forName(stringOfClassName) instead:

 scala> classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]
 <console>:8: error: class HighlyCompressedMapStatus in package scheduler
 cannot be accessed in package org.apache.spark.scheduler
               classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]
                                                  ^
 scala> Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus")
 res1: Class[_] = class org.apache.spark.scheduler.HighlyCompressedMapStatus



hope this helps,
Imran


On Thu, Mar 12, 2015 at 12:47 PM, Arun Luthra arun.lut...@gmail.com wrote:

 I'm using a pre-built Spark; I'm not trying to compile Spark.

 The compile error appears when I try to register HighlyCompressedMapStatus
 in my program:

 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])

 If I don't register it, I get a runtime error saying that it needs to be
 registered (the error is only when I turn on kryo).

 However the code is running smoothly with kryo turned off.

 On Wed, Mar 11, 2015 at 5:38 PM, Imran Rashid iras...@cloudera.com
 wrote:

 I'm not sure what you mean.   Are you asking how you can recompile all of
 spark and deploy it, instead of using one of the pre-built versions?

 https://spark.apache.org/docs/latest/building-spark.html

 Or are you seeing compile problems specifically w/
 HighlyCompressedMapStatus?   The code compiles fine, so I'm not sure what
 problem you are running into -- we'd need a lot more info to help

 On Tue, Mar 10, 2015 at 6:54 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 Does anyone know how to get the HighlyCompressedMapStatus to compile?

 I will try turning off kryo in 1.2.0 and hope things don't break.  I
 want to benefit from the MapOutputTracker fix in 1.2.0.

 On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com
 wrote:

 the scala syntax for arrays is Array[T], not T[], so you want to use
 something:

 kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
 kryo.register(classOf[Array[Short]])

 nonetheless, the spark should take care of this itself.  I'll look into
 it later today.


 On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 I think this is a Java vs scala syntax issue. Will check.

 On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 Problem is noted here:
 https://issues.apache.org/jira/browse/SPARK-5949

 I tried this as a workaround:

 import org.apache.spark.scheduler._
 import org.roaringbitmap._

 ...


 kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
 kryo.register(classOf[org.roaringbitmap.RoaringArray])
 kryo.register(classOf[org.roaringbitmap.ArrayContainer])

 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])
 kryo.register(classOf[short[]])


 in build file:

 libraryDependencies += org.roaringbitmap % RoaringBitmap %
 0.4.8


 This fails to compile:

 ...:53: identifier expected but ']' found.

 [error]
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])

 also:

 :54: identifier expected but ']' found.

 [error] kryo.register(classOf[short[]])
 also:

 :51: class HighlyCompressedMapStatus in package scheduler cannot be
 accessed in package org.apache.spark.scheduler
 [error]
 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])


 Suggestions?

 Arun









Re: saveAsTextFile extremely slow near finish

2015-03-11 Thread Imran Rashid
is your data skewed?  Could it be that there are a few keys with a huge
number of records?  You might consider outputting
(recordA, count)
(recordB, count)

instead of

recordA
recordA
recordA
...


you could do this with:

input = sc.textFile
pairCounts = input.map{ x => (x, 1) }.reduceByKey(_ + _)
sorted = pairCounts.sortByKey
sorted.saveAsTextFile


On Mon, Mar 9, 2015 at 12:31 PM, mingweili0x m...@spokeo.com wrote:

 I'm basically running a sorting using spark. The spark program will read
 from
 HDFS, sort on composite keys, and then save the partitioned result back to
 HDFS.
 pseudo code is like this:

 input = sc.textFile
 pairs = input.mapToPair
 sorted = pairs.sortByKey
 values = sorted.values
 values.saveAsTextFile

  Input size is ~ 160G, and I made 1000 partitions specified in
 JavaSparkContext.textFile and JavaPairRDD.sortByKey. From the WebUI, the job is
 split into two stages: saveAsTextFile and mapToPair. MapToPair finished
 in 8 mins, while saveAsTextFile took ~15 mins to reach (2366/2373) progress
 and the last few tasks just took forever and never finished.

 Cluster setup:
 8 nodes
 on each node: 15gb memory, 8 cores

 running parameters:
 --executor-memory 12G
 --conf spark.cores.max=60

 Thank you for any help.







Re: Top, takeOrdered, sortByKey

2015-03-11 Thread Imran Rashid
I am not entirely sure I understand your question -- are you saying:

* scoring a sample of 50k events is fast
* taking the top N scores of 77M events is slow, no matter what N is

?

if so, this shouldn't come as a huge surprise.  You can't find the top
scoring elements (no matter how small N is) unless you score all 77M of
them.  Very naively, you would expect scoring 77M events to take ~1000
times as long as scoring 50k events, right?  The fact that it doesn't take
that much longer is probably b/c of the overhead of just launching the jobs.



On Mon, Mar 9, 2015 at 4:21 PM, Saba Sehrish ssehr...@fnal.gov wrote:



  *From:* Saba Sehrish ssehr...@fnal.gov
 *Date:* March 9, 2015 at 4:11:07 PM CDT
 *To:* user-...@spark.apache.org
 *Subject:* *Using top, takeOrdered, sortByKey*

   I am using spark for a template matching problem. We have 77 million
 events in the template library, and we compare the energy of each input
 event with each of the template events and return a score. In the end we
 return the best 1 matches with the lowest score. A score of 0 is a perfect
 match.

  I down sampled the problem to use only 50k events. For a single event
 matching across all the events in the template (50k) I see 150-200ms for
 score calculation on 25 cores (using YARN cluster), but after that when I
 perform either a top or takeOrdered or even sortByKey, the time reaches
 25-50s.
 So far I am not able to figure out why there is such a huge gap going from a list
 of scores to a list of top 1 scores, and why sorting or getting the best X
 matches dominates by such a large factor. One thing I have noticed is
 that it doesn't matter how many elements I return: the time range is the
 same 25-50s for 10 - 1 elements.

  Any suggestions? if I am not using API properly?

   scores is JavaPairRDD<Integer, Double>, and I do something like
 numbestmatches is 10, 100, 1 or any number.

   List<Tuple2<Integer, Double>> bestscores_list =
 scores.takeOrdered(numbestmatches, new TupleComparator());
  Or
  List<Tuple2<Integer, Double>> bestscores_list =
 scores.top(numbestmatches, new TupleComparator());
  Or
  List<Tuple2<Integer, Double>> bestscores_list = scores.sortByKey();




Re: Workaround for spark 1.2.X roaringbitmap kryo problem?

2015-03-11 Thread Imran Rashid
I'm not sure what you mean.   Are you asking how you can recompile all of
spark and deploy it, instead of using one of the pre-built versions?

https://spark.apache.org/docs/latest/building-spark.html

Or are you seeing compile problems specifically w/
HighlyCompressedMapStatus?   The code compiles fine, so I'm not sure what
problem you are running into -- we'd need a lot more info to help

On Tue, Mar 10, 2015 at 6:54 PM, Arun Luthra arun.lut...@gmail.com wrote:

 Does anyone know how to get the HighlyCompressedMapStatus to compile?

 I will try turning off kryo in 1.2.0 and hope things don't break.  I want
 to benefit from the MapOutputTracker fix in 1.2.0.

 On Tue, Mar 3, 2015 at 5:41 AM, Imran Rashid iras...@cloudera.com wrote:

 the Scala syntax for arrays is Array[T], not T[], so you want to use
 something like:

 kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
 kryo.register(classOf[Array[Short]])

 nonetheless, Spark should take care of this itself.  I'll look into
 it later today.


 On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 I think this is a Java vs scala syntax issue. Will check.

 On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949

 I tried this as a workaround:

 import org.apache.spark.scheduler._
 import org.roaringbitmap._

 ...


 kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
 kryo.register(classOf[org.roaringbitmap.RoaringArray])
 kryo.register(classOf[org.roaringbitmap.ArrayContainer])

 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])
 kryo.register(classOf[short[]])


 in build file:

 libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.4.8"


 This fails to compile:

 ...:53: identifier expected but ']' found.

 [error]
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])

 also:

 :54: identifier expected but ']' found.

 [error] kryo.register(classOf[short[]])
 also:

 :51: class HighlyCompressedMapStatus in package scheduler cannot be
 accessed in package org.apache.spark.scheduler
 [error]
 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])


 Suggestions?

 Arun







Re: Process time series RDD after sortByKey

2015-03-11 Thread Imran Rashid
this is a very interesting use case.  First of all, it's worth pointing out
that if you really need to process the data sequentially, fundamentally you
are limiting the parallelism you can get.  Eg., if you need to process the
entire data set sequentially, then you can't get any parallelism.  If you
can process each hour separately, but need to process data within an hour
sequentially, then the max parallelism you can get for one days is 24.

But let's say you're OK with that.  Zhan Zhang's solution is good if you just
want to process the entire dataset sequentially.  But what if you wanted to
process each hour separately, so you at least can create 24 tasks that can
be run in parallel for one day?  I think you would need to create your own
subclass of RDD that is similar in spirit to what CoalescedRDD does.  Your
RDD would have 24 partitions, and each partition would depend on some set
of partitions in its parent (your sorted RDD with 1000 partitions).  I
don't think you could use CoalescedRDD directly b/c you want more control
over the way the partitions get grouped together.

this answer is very similar to my answer to your other question about
controlling partitions; hope it helps! :)
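
If it helps, here is a very rough, untested sketch of that kind of RDD
subclass (all the names here are made up, and you'd still want to think about
locality and error handling before using anything like this for real):

import scala.reflect.ClassTag
import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// one output partition per group of parent partition indices
class GroupedPartition(val index: Int, val parentIndices: Seq[Int]) extends Partition

class GroupedRDD[T: ClassTag](parent: RDD[T], groups: Seq[Seq[Int]])
  extends RDD[T](parent.context, Nil) {

  override def getPartitions: Array[Partition] =
    groups.zipWithIndex.map { case (g, i) => new GroupedPartition(i, g) }.toArray

  // each output partition narrowly depends on its group of parent partitions
  override def getDependencies: Seq[Dependency[_]] = Seq(
    new NarrowDependency(parent) {
      def getParents(partitionId: Int): Seq[Int] = groups(partitionId)
    })

  // read the grouped parent partitions one after another, in order
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    split.asInstanceOf[GroupedPartition].parentIndices.iterator.flatMap { i =>
      parent.iterator(parent.partitions(i), context)
    }
}

You'd construct it with something like new GroupedRDD(sortedRdd, hourGroups),
where hourGroups lists, for each of the 24 output partitions, the indices of
the sorted RDD's partitions that fall into that hour.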


On Mon, Mar 9, 2015 at 5:41 PM, Shuai Zheng szheng.c...@gmail.com wrote:

 Hi All,



 I am processing some time series data. For one day, it might have 500GB,
 and for each hour, it is around 20GB of data.



 I need to sort the data before I start processing. Assume I can sort them
 successfully



 *dayRDD.sortByKey*



 but after that, I might have thousands of partitions (to make the sort
 succeed), maybe 1000 partitions. And then I try to process the data
 by hour (it does not need to be exactly one hour, just some similar time frame).
 And I can't just repartition to 24 partitions because then one partition might
 be too big to fit into memory (if it is 20GB). So is there any way for me
 to process the underlying partitions in a certain order? Basically I
 want to call mapPartitionsWithIndex with a range of indexes?



 Any way to do it? Hope I described my issue clearly. :)



 Regards,



 Shuai







Re: can spark take advantage of ordered data?

2015-03-11 Thread Imran Rashid
Hi Jonathan,

you might be interested in https://issues.apache.org/jira/browse/SPARK-3655
(not yet available) and https://github.com/tresata/spark-sorted (not part
of Spark, but it is available right now).  Hopefully that's what you are
looking for.  To the best of my knowledge that covers what is available now
/ what is being worked on.

Imran

On Wed, Mar 11, 2015 at 4:38 PM, Jonathan Coveney jcove...@gmail.com
wrote:

 Hello all,

 I am wondering if spark already has support for optimizations on sorted
 data and/or if such support could be added (I am comfortable dropping to a
 lower level if necessary to implement this, but I'm not sure if it is
 possible at all).

 Context: we have a number of data sets which are essentially already
 sorted on a key. With our current systems, we can take advantage of this to
 do a lot of analysis in a very efficient fashion...merges and joins, for
 example, can be done very efficiently, as can folds on a secondary key and
 so on.

 I was wondering if spark would be a fit for implementing these sorts of
 optimizations? Obviously it is sort of a niche case, but would this be
 achievable? Any pointers on where I should look?



Re: Running Spark from Scala source files other than main file

2015-03-11 Thread Imran Rashid
did you forget to specify the main class w/ --class Main?  though if that
was it, you should at least see *some* error message, so I'm confused
myself ...
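
e.g., something like this (with a placeholder master URL, and using the fully
qualified class name if Main lives in a package):

spark-submit --class Main --master spark://<master-host>:7077 Main.jar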

On Wed, Mar 11, 2015 at 6:53 AM, Aung Kyaw Htet akh...@gmail.com wrote:

 Hi Everyone,

 I am developing a scala app, in which the main object does not call the
 SparkContext, but another object defined in the same package creates it,
 runs spark operations, and closes it. The jar file is built successfully in
 maven, but when I call spark-submit with this jar, Spark does not
 seem to execute any code.

 So my code looks like

 [Main.scala]

 object Main {
   def main(args: Array[String]) {
     /* check parameters */
     Component1.start(parameters)
   }
 }

 [Component1.scala]

 object Component1 {
   def start {
     val sc = new SparkContext(conf)
     /* do spark operations */
     sc.stop()
   }
 }

 The above code compiles into Main.jar but spark-submit does not execute
 anything and does not show me any error (not in the logs either.)

 spark-submit --master spark://... Main.jar

 I had all of this code working before, when I wrote a single scala
 file, but now that I am separating it into multiple scala source files,
 something isn't running right.

 Any advice on this would be greatly appreciated!

 Regards,
 Aung



Re: How to preserve/preset partition information when load time series data?

2015-03-11 Thread Imran Rashid
It should be *possible* to do what you want ... but if I understand you
right, there isn't really any very easy way to do it.  I think you would
need to write your own subclass of RDD, which has its own logic on how the
input files get divided among partitions.  You can probably subclass
HadoopRDD and just modify getPartitions().  your logic could look at the
day of each filename to decide which partition it goes into.  You'd need to
make corresponding changes for HadoopPartition & the compute() method.

(or if you can't subclass HadoopRDD directly you can use it for
inspiration.)

On Mon, Mar 9, 2015 at 11:18 AM, Shuai Zheng szheng.c...@gmail.com wrote:

 Hi All,



 If I have a set of time series data files, they are in parquet format and
 the data for each day are stored following a naming convention, but I will not know
 how many files there are for one day.



 20150101a.parq

 20150101b.parq

 20150102a.parq

 20150102b.parq

 20150102c.parq

 …

 201501010a.parq

 …



 Now I am trying to write a program to process the data, and I want to make sure
 each day's data goes into one partition. Of course I can load it all into one big
 RDD and then partition it, but that will be very slow. As I already know the time
 series from the file names, is it possible for me to load the data into the
 RDD and also preserve the partitioning? I know I can preserve one partition per
 file, but is there any way for me to load the RDD and preserve partitioning
 based on a set of files: one partition, multiple files?



 I think it is possible because when I load an RDD from 100 files (assume they
 cross 100 days), I will have 100 partitions (if I disable file splitting when
 loading). Then I could use a special coalesce to repartition the RDD? But I
 don't know whether it is possible to do that in Spark now.



 Regards,



 Shuai



Re: scala.Double vs java.lang.Double in RDD

2015-03-04 Thread Imran Rashid
This doesn't involve spark at all, I think this is entirely an issue with
how scala deals w/ primitives and boxing.  Often it can hide the details
for you, but IMO it just leads to far more confusing errors when things
don't work out.  The issue here is that your map has value type Any, which
leads scala to leave it as a boxed java.lang.Double.

scala> val x = 1.5
x: Double = 1.5
scala> x.getClass()
res0: Class[Double] = double
scala> x.getClass() == classOf[java.lang.Double]
res1: Boolean = false
scala> x.getClass() == classOf[Double]
res2: Boolean = true
scala> val arr = Array(1.5,2.5)
arr: Array[Double] = Array(1.5, 2.5)
scala> arr.getClass().getComponentType() == x.getClass()
res5: Boolean = true
scala> arr.getClass().getComponentType() == classOf[java.lang.Double]
res6: Boolean = false

// this map has java.lang.Double values
scala> val map: Map[String, Any] = arr.map{x => x.toString -> x}.toMap
map: Map[String,Any] = Map(1.5 -> 1.5, 2.5 -> 2.5)
scala> map("1.5").getClass()
res15: Class[_] = class java.lang.Double
scala> map("1.5").getClass() == x.getClass()
res10: Boolean = false
scala> map("1.5").getClass() == classOf[java.lang.Double]
res11: Boolean = true
// this one has scala Double values
scala> val map2: Map[String, Double] = arr.map{x => x.toString -> x}.toMap
map2: Map[String,Double] = Map(1.5 -> 1.5, 2.5 -> 2.5)
scala> map2("1.5").getClass()
res12: Class[Double] = double
scala> map2("1.5").getClass() == x.getClass()
res13: Boolean = true
scala> map2("1.5").getClass() == classOf[java.lang.Double]
res14: Boolean = false


On Wed, Mar 4, 2015 at 3:17 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 I have a function with signature

   def aggFun1(rdd: RDD[(Long, (Long, Double))]):
 RDD[(Long, Any)]

 and one with

   def aggFun2[_Key: ClassTag, _Index](rdd: RDD[(_Key, (_Index, Double))]):
 RDD[(_Key, Double)]

 where all Double classes involved are scala.Double classes (according
 to IDEA) and my implementation of aggFun1 is just calling aggFun2 (type
 parameters _Key and _Index are inferred by the Scala compiler).

 Now I am writing a test as follows:

   val result: Map[Long, Any] = aggFun1(input).collect().toMap
   result.values.foreach(v => println(v.getClass))
   result.values.foreach(_ shouldBe a[Double])

 and I get the following output:

   class java.lang.Double
   class java.lang.Double
   [info] avg
   [info] - should compute the average *** FAILED ***
   [info]   1.75 was not an instance of double, but an instance of
 java.lang.Double

 So I am wondering about what magic is going on here. Are scala.Double
 values in RDDs automatically converted to java.lang.Doubles or am I just
 missing the implicit back-conversion etc.?

 Any help appreciated,
 Tobias




Re: Is the RDD's Partitions determined before hand ?

2015-03-04 Thread Imran Rashid
You can set the number of partitions dynamically -- its just a parameter to
a method, so you can compute it however you want, it doesn't need to be
some static constant:

val dataSizeEstimate = yourMagicFunctionToEstimateDataSize()
val numberOfPartitions =
yourConversionFromDataSizeToNumPartitions(dataSizeEstimate)


val reducedRDD = someInputRDD.reduceByKey(f, numberOfPartitions) //or
whatever else that needs to know number of partitions

of course this means you need to do the work of figuring out those magic
functions, but its certainly possible.

I agree with all of Sean's recommendations, but I guess I might put a bit
more emphasis on "The one exception are operations that tend to pull data
into memory".  For me, I've found that to be a very important exception,
one that can come up a lot.  And though in general a lot of partitions makes
sense, there have been recent questions on the user list about folks going
too far, using eg. 100K partitions and then having the bookkeeping overhead
dominate.  But that's a pretty big number -- you should still be able to
err on the side of too many partitions w/out going that far, I'd imagine.



On Wed, Mar 4, 2015 at 4:17 AM, Jeff Zhang zjf...@gmail.com wrote:

 Hi Sean,

   "If you know a stage needs unusually high parallelism for example you
 can repartition further for that stage."

 The problem is we may not know whether high parallelism is needed, e.g.
 for the join operator, high parallelism may only be necessary for some
 datasets where lots of data can join together, while for other datasets high
 parallelism may not be necessary if only a little data joins together.

 So my question is that being unable to change parallelism dynamically at runtime
 may not be flexible.



 On Wed, Mar 4, 2015 at 5:36 PM, Sean Owen so...@cloudera.com wrote:

 Hm, what do you mean? You can control, to some extent, the number of
 partitions when you read the data, and can repartition if needed.

 You can set the default parallelism too so that it takes effect for most
 ops that create an RDD. One # of partitions is usually about right for all
 work (2x or so the number of execution slots).

 If you know a stage needs unusually high parallelism for example you can
 repartition further for that stage.
  On Mar 4, 2015 1:50 AM, Jeff Zhang zjf...@gmail.com wrote:

 Thanks Sean.

 But if the partitions of an RDD are determined beforehand, it would not be
 flexible to run the same program on different datasets. Although for the
 first stage the partitions can be determined by the input data set, for the
 intermediate stages it is not possible. Users have to create a policy to
 repartition or coalesce based on the data set size.


 On Tue, Mar 3, 2015 at 6:29 PM, Sean Owen so...@cloudera.com wrote:

 An RDD has a certain fixed number of partitions, yes. You can't change
 an RDD. You can repartition() or coalesce() an RDD to make a new one
 with a different number of partitions, possibly requiring a shuffle.

 On Tue, Mar 3, 2015 at 10:21 AM, Jeff Zhang zjf...@gmail.com wrote:
  I mean is it possible to change the partition number at runtime.
 Thanks
 
 
  --
  Best Regards
 
  Jeff Zhang




 --
 Best Regards

 Jeff Zhang




 --
 Best Regards

 Jeff Zhang



Re: Workaround for spark 1.2.X roaringbitmap kryo problem?

2015-03-03 Thread Imran Rashid
the Scala syntax for arrays is Array[T], not T[], so you want to use
something like:

kryo.register(classOf[Array[org.roaringbitmap.RoaringArray$Element]])
kryo.register(classOf[Array[Short]])

nonetheless, Spark should take care of this itself.  I'll look into it
later today.


On Mon, Mar 2, 2015 at 2:55 PM, Arun Luthra arun.lut...@gmail.com wrote:

 I think this is a Java vs scala syntax issue. Will check.

 On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949

 I tried this as a workaround:

 import org.apache.spark.scheduler._
 import org.roaringbitmap._

 ...


 kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
 kryo.register(classOf[org.roaringbitmap.RoaringArray])
 kryo.register(classOf[org.roaringbitmap.ArrayContainer])

 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element])
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])
 kryo.register(classOf[short[]])


 in build file:

 libraryDependencies += "org.roaringbitmap" % "RoaringBitmap" % "0.4.8"


 This fails to compile:

 ...:53: identifier expected but ']' found.

 [error]
 kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]])

 also:

 :54: identifier expected but ']' found.

 [error] kryo.register(classOf[short[]])
 also:

 :51: class HighlyCompressedMapStatus in package scheduler cannot be
 accessed in package org.apache.spark.scheduler
 [error]
 kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus])


 Suggestions?

 Arun





Re: Global sequential access of elements in RDD

2015-02-27 Thread Imran Rashid
Why would you want to use spark to sequentially process your entire data
set?  The entire purpose is to let you do distributed processing -- which
means letting partitions get processed simultaneously by different cores /
nodes.

that being said, occasionally in a bigger pipeline with a lot of
distributed operations, you might need to do one segment in a completely
sequential manner.  You have a few options -- just be aware that with all
of them, you are working *around* the idea of an RDD, so make sure you have
a really good reason.

1) rdd.toLocalIterator.  Still pulls all of the data to the driver, just
like rdd.collect(), but it's slightly more scalable since it won't store
*all* of the data in memory on the driver (it does still store all of the
data in one partition in memory, though.)
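
For example, if you just want to print everything in order on the driver:

rdd.toLocalIterator.foreach(println)  // pulls one partition's data at a time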

2) write the rdd to some external data storage (eg. hdfs), and then read
the data sequentially off of hdfs on your driver.  Still needs to pull all
of the data to the driver, but you can get it to avoid pulling an entire
partition into memory and make it streaming.

3) create a number of rdds that consist of just one partition of your
original rdd, and then execute actions on them sequentially:

val originalRDD = ... //this should be cached to make sure you don't
recompute it
(0 until originalRDD.partitions.size).foreach{ partitionIdx =>
  val prunedRDD = new PartitionPruningRDD(originalRDD, {x => x ==
partitionIdx})
  prunedRDD.runSomeActionHere()
}

note that PartitionPruningRDD is a developer api, however.  This will run
your action on one partition at a time, and ideally the tasks will be
scheduled on the same node where the partitions have been cached, so you
don't need to move the data around.  But again, b/c you're turning it into
a sequential program, most of your cluster is sitting idle, and you're not
really leveraging spark ...


imran

On Fri, Feb 27, 2015 at 1:38 AM, Wush Wu w...@bridgewell.com wrote:

 Dear all,

 I want to implement some sequential algorithm on RDD.

 For example:

 val conf = new SparkConf()
   conf.setMaster("local[2]").
   setAppName("SequentialSuite")
 val sc = new SparkContext(conf)
 val rdd = sc.
parallelize(Array(1, 3, 2, 7, 1, 4, 2, 5, 1, 8, 9), 2).
sortBy(x => x, true)
 rdd.foreach(println)

 I want to see the ordered numbers on my screen, but it shows unordered
 integers. The two partitions execute the println simultaneously.

 How do I make the RDD execute a function globally sequentially?

 Best,
 Wush



Re: NegativeArraySizeException when doing joins on skewed data

2015-02-26 Thread Imran Rashid
Hi Tristan,

at first I thought you were just hitting another instance of
https://issues.apache.org/jira/browse/SPARK-1391, but I actually think it's
entirely related to kryo.  Would it be possible for you to try serializing
your object using kryo, without involving spark at all?  If you are
unfamiliar w/ kryo, you could just try something like this, it would also
be OK to try out the utils in spark to do it, something like:

val outputStream = new java.io.FileOutputStream(
  "/some/local/path/doesn't/really/matter/just/delete/me/afterwards")

val kryoSer = new org.apache.spark.serializer.KryoSerializer(sparkConf)
val kryoStreamSer = kryoSer.newInstance().serializeStream(outputStream)

kryoStreamSer.writeObject(yourBigObject).close()

My guess is that this will fail.  There is a little of spark's wrapping
code involved here too, but I suspect the error is out of our control.
From the error, it seems like whatever object you are trying to serialize
has more than 2B references:
Caused by: java.lang.NegativeArraySizeException
at
com.esotericsoftware.kryo.util.IdentityObjectIntMap.
resize(IdentityObjectIntMap.java:409)

Though that is rather surprising -- it doesn't even seem possible to me
with an object that is only 6 GB.

There are a handful of other size restrictions and tuning parameters that
come with kryo as well.  It would be good for us to write up some docs on
those limitations, as well as work with the kryo devs to see which ones can
be removed.  (Eg., another one that I just noticed from browsing the code
is that even when writing to a stream, kryo has an internal buffer of
limited size, which it periodically flushes.  Perhaps we can get kryo to
turn off that buffer, or we can at least get it to flush more often.)

thanks,
Imran


On Thu, Feb 26, 2015 at 1:06 AM, Tristan Blakers tris...@blackfrog.org
wrote:

 I get the same exception simply by doing a large broadcast of about 6GB.
 Note that I’m broadcasting a small number (~3m) of fat objects. There’s
 plenty of free RAM. This and related kryo exceptions seem to crop-up
 whenever an object graph of more than a couple of GB gets passed around.

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)

 at
 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)

 at
 com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

 at
 com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)

 at
 com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)

 at
 com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

 at
 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)

 at
 org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)

 at
 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)

 at
 org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)

 at
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)

 at
 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)

 at
 org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)

 at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)

 at
 org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)


 Caused by: java.lang.NegativeArraySizeException

 at
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)

 at
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)

 at
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)

 at
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)

 at
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)

 at
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)

 at
 com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)

 at
 

Re: GroupByKey causing problem

2015-02-26 Thread Imran Rashid
Hi Tushar,

The most scalable option is probably for you to consider doing some
approximation.  Eg., sample the data first to come up with the bucket
boundaries.  Then you can assign data points to buckets without needing to
do a full groupByKey.  You could even have more passes which correct any
errors in your approximation (eg., see how sortByKey() works, and how it
samples the underlying RDD when constructing the RangePartitioner).  Though
it's more passes through the data, it will probably be much faster since you
avoid the expensive groupByKey().
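
To make that concrete, here is a rough, untested sketch along those lines,
reusing your t3 RDD of (colIndex, (rowIndex, value)) pairs.  It computes
approximate 5% quantile boundaries from a small sample and then bins with a
plain map.  It does not reproduce your equal-values-stay-in-the-same-bin rule
exactly (you'd need a correction pass for that), but it avoids grouping the
full data set:

val fraction = 0.01  // sample rate -- tune so the per-column sample is small but representative
val boundaries = sc.broadcast(
  t3.sample(withReplacement = false, fraction)
    .map { case (col, (_, v)) => (col, v) }
    .groupByKey()                              // only the small sample gets grouped
    .mapValues { vs =>
      val sorted = vs.toArray.sorted
      if (sorted.isEmpty) Array.empty[Double]
      else (1 until 20).map(i => sorted(i * sorted.length / 20)).toArray  // ~5% quantiles
    }.collectAsMap())

// assign each value to a bin with a plain map -- no groupByKey over the raw data
val binned = t3.map { case (col, (row, v)) =>
  val bin = boundaries.value.getOrElse(col, Array.empty[Double]).count(_ <= v)
  (col, (row, v, bin))
}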

Imran

On Thu, Feb 26, 2015 at 3:38 AM, Tushar Sharma tushars...@gmail.com wrote:

 Hi,

 I am trying to apply binning to a large CSV dataset. Here are the steps I
 am taking:

 1. Emit each value of CSV as (ColIndex,(RowIndex,value))
 2. Then I groupByKey (here ColumnIndex) and get all values of a particular
 index to one node, as I have to work on the collection of all values
 3. I apply my binning algorithm which is as follows:
 a. Sort the values
 b. Iterate through values and see if it is different than the previous
 one
 if no then add it to the same bin
 if yes then check the size of that bin, if it is greater than a
 particular size (say 5% of the whole dataset) then change the bin
 number, else keep the same bin
 c. repeat for each column

 Due to this algorithm I can't calculate it partition-wise and merge for the
 final result. But even with groupByKey I expect it should work, maybe
 slowly, but it should finish. I increased the number of partitions to reduce the
 output of each groupByKey so that it helps in successful completion of the
 process. But even with that it is stuck at the same stage. The log for the
 executor says:

 ExternalMapAppendOnly (spilling to disk) (Trying ...)

 The code works for small CSV files but can't complete for big files.

 val inputfile = "hdfs://hm41:9000/user/file1"
 val table = sc.textFile(inputfile, 1000)

 val withoutHeader: RDD[String] = dropHeader(table)

 val kvPairs = withoutHeader.flatMap(retAtrTuple)

 //val filter_na = kvPairs.map{case (x,y) => (x, if(y == "NA") "" else y)}

 val isNum = kvPairs.map{case (x,y) => (x, isNumeric(y))}.reduceByKey(_ && _)

 val numeric_indexes = isNum.filter{case (x,y) => y}.sortByKey().map{case
 (x,y) => x}.collect()
 //val isNum_Arr = isNum.sortByKey().collect()

 val kvidx = withoutHeader.zipWithIndex
 //val t = kvidx.map{case (a,b) => retAtrTuple(a).map(x => (x,b)) }


 val t = kvidx.flatMap{case (a,b) => retAtrTuple(a).map(x => (x,b)) }
 val t2 = t.filter{case (a,b) => numeric_indexes contains a._1 }

 //val t2 = t.filter{case (a,b) => a._1 == 0 }
 val t3 = t2.map{case ((a,b),c) => (a,(c,b.toDouble))}
 //val t4 = t3.sortBy(_._2._1)
 val t4 = t3.groupByKey.map{case (a,b) =>
 (a,classing_summary(b.toArray.sortBy(_._2)))}

 def dropHeader(data: RDD[String]): RDD[String] = {
   data.mapPartitionsWithIndex((idx, lines) => {
     if (idx == 0) {
       lines.drop(1)
     } else {
       lines
     }
   })
 }


 def retAtrTuple(x: String) = {
   val newX = x.split(',')
   for (h <- 0 until newX.length)
     yield (h, newX(h))
 }

 def isNumeric(s: String): Boolean = {
   (allCatch opt s.toDouble).isDefined
 }

 def classing_summary(arr: Array[(Long, Double)]) = {
   var idx = 0L
   var value = 0.0
   var prevValue = Double.MinValue
   var counter = 1
   var classSize = 0.0
   var size = arr.length

   val output = for(i <- 0 until arr.length) yield {
     idx = arr(i)._1;
     value = arr(i)._2;
     if(value == prevValue){
       classSize += 1.0/size;
       //println("both values same")
       //println(idx,value,classSize,counter,classSize);
       prevValue = value;
       (idx,value,counter,classSize);
     }
     else if(classSize < 0.05){
       classSize += 1.0/size;
       //println("both values not same, adding to present bucket")
       //println(idx,value,classSize,counter,classSize);
       prevValue = value;
       (idx,value,counter,classSize);
     }
     else {
       classSize = 1.0/size;
       counter += 1;
       //println("both values not same, adding to different bucket")
       //println(idx,value,classSize,counter,classSize);
       prevValue = value;
       (idx,value,counter,classSize);
     }
   }
   output.toArray
 }

 Thanks in advance,

 Tushar Sharma



Re: Cartesian issue with user defined objects

2015-02-26 Thread Imran Rashid
any chance your input RDD is being read from hdfs, and you are running into
this issue (in the docs on SparkContext#hadoopFile):

* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
object for each
* record, directly caching the returned RDD or directly passing it to an
aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable
objects, you should first
* copy them using a `map` function.



On Thu, Feb 26, 2015 at 10:38 AM, mrk91 marcogaid...@gmail.com wrote:

 Hello,

 I have an issue with the cartesian method. When I use it with the Java
 types everything is ok, but when I use it with RDD made of objects defined
 by me it has very strange behaviors which depend on whether the RDD is
 cached or not (you can see here
 http://stackoverflow.com/questions/28727823/creating-a-matrix-of-neighbors-with-spark-cartesian-issue
 what happens).

 Is this due to a bug in its implementation or are there any requirements
 for the objects to be passed to it?
 Thanks.
 Best regards.
 Marco



Re: Iterating on RDDs

2015-02-26 Thread Imran Rashid
val grouped = R.groupBy[VertexId](G).persist(StorageLevel.MEMORY_ONLY_SER)
// or whatever persistence makes more sense for you ...
while(true) {
  val res = grouped.flatMap(F)
  res.collect.foreach(func)
  if(criteria)
 break
}

On Thu, Feb 26, 2015 at 10:56 AM, Vijayasarathy Kannan kvi...@vt.edu
wrote:

 Hi,

 I have the following use case.

 (1) I have an RDD of edges of a graph (say R).
 (2) do a groupBy on R (by say source vertex) and call a function F on each
 group.
 (3) collect the results from Fs and do some computation
 (4) repeat the above steps until some criteria is met

 In (2), the groups are always going to be the same (since R is grouped by
 source vertex).

 Question:
 Is R distributed every iteration (when in (2)) or is it distributed only
 once when it is created?

 A sample code snippet is below.

 while(true) {
   val res = R.groupBy[VertexId](G).flatMap(F)
   res.collect.foreach(func)
   if(criteria)
  break
 }

 Since the groups remain the same, what is the best way to go about
 implementing the above logic?



Re: Help me understand the partition, parallelism in Spark

2015-02-26 Thread Imran Rashid
Hi Yong,

mostly correct except for:


- Since we are doing reduceByKey, shuffling will happen. Data will be
shuffled into 1000 partitions, as we have 1000 unique keys.

 no, you will not get 1000 partitions.  Spark has to decide how many
partitions to use before it even knows how many unique keys there are.  If
you have 200 as the default parallelism (or you just explicitly make it the
second parameter to reduceByKey()), then you will get 200 partitions.  The
1000 unique keys will be distributed across the 200 partitions.  ideally
they will be distributed pretty equally, but how they get distributed
depends on the partitioner (by default you will have a HashPartitioner, so
it depends on the hash of your keys).
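
e.g. (with a made-up pairs RDD):

val counts = pairs.reduceByKey(_ + _, 200)  // 200 shuffle partitions, however many unique keys there are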

Note that this is more or less the same as in Hadoop MapReduce.

the amount of parallelism matters b/c there are various places in spark
where there is some overhead proportional to the size of a partition.  So
in your example, if you have 1000 unique keys in 200 partitions, you expect
about 5 unique keys per partition -- if instead you had 10 partitions,
you'd expect 100 unique keys per partition, and thus more data and you'd
be more likely to hit an OOM.  But there are many other possible sources of
OOM, so this is definitely not the *only* solution.

Sorry I can't comment in particular about Spark SQL -- hopefully somebody
more knowledgeable can comment on that.



On Wed, Feb 25, 2015 at 8:58 PM, java8964 java8...@hotmail.com wrote:

 Hi, Sparkers:

 I come from the Hadoop MapReduce world, and am trying to understand some
 internal details of Spark. From the web and this list, I keep seeing
 people talking about increasing the parallelism if you get an OOM error. I
 tried to read the documentation as much as possible to understand RDD partitioning
 and parallelism usage in Spark.

 I understand that for RDD from HDFS, by default, one partition will be one
 HDFS block, pretty straightforward. I saw that lots of RDD operations
 support a 2nd parameter for parallelism. This is the part that confuses me. From my
 understanding, the parallelism is totally controlled by how many cores you
 give to your job. Adjusting that parameter, or spark.default.parallelism,
 shouldn't have any impact.

 For example, if I have 10G of data in HDFS, and assume the block size is
 128M, so we get 100 blocks/partitions in RDD. Now if I transfer that RDD to
 a Pair RDD, with 1000 unique keys in the pair RDD, and doing reduceByKey
 action, using 200 as the default parallelism. Here is what I assume:


- We have 100 partitions, as the data comes from 100 blocks. Most
likely the spark will generate 100 tasks to read and shuffle them?
- The 1000 unique keys mean the 1000 reducer group, like in MR
- If I set the max cores to be 50, there will be up to 50 tasks that can
run concurrently. The remaining tasks just have to wait for a core, if
50 tasks are already running.
- Since we are doing reduceByKey, shuffling will happen. Data will be
shuffled into 1000 partitions, as we have 1000 unique keys.
- I don't know how many tasks will process these 1000 partitions;
maybe this is where the parallelism parameter comes in?
- No matter what the parallelism is, ONLY 50 tasks can
run concurrently. So if we set more cores, more partitions' data will be
processed in the executor (which runs more threads in this case), so more
memory is needed. I don't see how increasing parallelism could help the OOM in
this case.
- In my test case of Spark SQL, I gave 24G as the executor heap, and my
join between 2 big datasets kept getting OOM. I kept increasing
spark.default.parallelism, from 200 to 400, to 2000, even to 4000, with no
help. What really made the query finally finish without OOM was changing
--total-executor-cores from 10 to 4.


 So my questions are:
 1) What does the parallelism really mean in Spark? In the simple example
 above, for reduceByKey, what difference does it make when the parallelism changes
 from 10 to 20?
 2) When we talk about partitions in Spark, for the data coming from
 HDFS, I can understand the partitions clearly. For the intermediate data,
 will the partitioning be based on the key? For group, reduce, and join actions,
 will the unique keys determine the partitions? Is that correct?
 3) Why could increasing parallelism help with OOM? I don't get this part. From
 my limited experience, adjusting the core count really matters for memory.

 Thanks

 Yong



Re: How to tell if one RDD depends on another

2015-02-26 Thread Imran Rashid
no, it does not give you transitive dependencies.  You'd have to walk the
tree of dependencies yourself, but that should just be a few lines.
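
e.g., something along these lines (untested):

import org.apache.spark.rdd.RDD

def dependsOn(rdd: RDD[_], target: RDD[_]): Boolean =
  (rdd == target) || rdd.dependencies.exists(dep => dependsOn(dep.rdd, target))

then rdd2 depends on rdd1 iff dependsOn(rdd2, rdd1) (note this also returns
true if you pass the same RDD twice).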

On Thu, Feb 26, 2015 at 3:32 PM, Corey Nolet cjno...@gmail.com wrote:

 I see the rdd.dependencies() function, does that include ALL the
 dependencies of an RDD? Is it safe to assume I can say
 rdd2.dependencies.contains(rdd1)?

 On Thu, Feb 26, 2015 at 4:28 PM, Corey Nolet cjno...@gmail.com wrote:

 Let's say I'm given 2 RDDs and told to store them in a sequence file and
 they have the following dependency:

 val rdd1 = sparkContext.sequenceFile().cache()
 val rdd2 = rdd1.map()


 How would I tell programmatically without being the one who built rdd1
 and rdd2 whether or not rdd2 depends on rdd1?

 I'm working on a concurrency model for my application and I won't
 necessarily know how the two rdds are constructed. What I will know is
 whether or not rdd1 is cached, but I want to maximize concurrency and run
 rdd1 and rdd2 together if rdd2 does not depend on rdd1.





Re: Brodcast Variable updated from one transformation and used from another

2015-02-25 Thread Imran Rashid
Hi Yiannis,

Broadcast variables are meant for *immutable* data.  They are not meant for
data structures that you intend to update.  (It might *happen* to work when
running local mode, though I doubt it, and it would probably be a bug if it
did.  It will certainly not work when running on a cluster.)

This probably seems like a huge restriction, but its really fundamental to
spark's execution model.  B/c they are immutable, spark can make
optimizations around when & how the broadcast variable is shared.
Furthermore, its very important for having clearly defined semantics.  Eg.,
imagine that your broadcast variable was a hashmap.  What would the
eventual result be if task 1 updated key X to have value A, but task 2
updated key X to have value B?  How should the updates from each task be
combined together?

You have a few alternatives.  It really depends a lot on your use case
which one is right; there are a lot of factors to consider.

1) put your updates in another RDD, collect() it, update your variable on
the driver, rebroadcast it.  (least scalable)

2) use an accumulator to get the updates from each stage.  (maybe a bit
more efficient.)

3) use some completely different mechanism for storing the data in your
broadcast var.  Eg., use a distributed key-value store.  Or put the data in
another RDD, which you join against your data.  (most scalable, but may not
be applicable at all.)
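
For example, option 3 could look roughly like this (with a hypothetical key()
function standing in for however you extract the lookup key from each line):

val inserted = sc.textFile("/file1").map(line => (key(line), 1))   // what you used to insert
val lookups  = sc.textFile("/file2").map(line => (key(line), line))
val joined   = lookups.leftOuterJoin(inserted)                     // (key, (line, Option[Int]))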

which one is right depends a lot on what you are trying to do.

Imran



On Wed, Feb 25, 2015 at 8:02 AM, Yiannis Gkoufas johngou...@gmail.com
wrote:

 What I think is happening is that the map operations are executed
 concurrently and the map operation in rdd2 has the initial copy of
 myObjectBroadcasted.
 Is there a way to apply the transformations sequentially? First
 materialize rdd1 and then rdd2.

 Thanks a lot!

 On 24 February 2015 at 18:49, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Sorry for the mistake, I actually have it this way:

 val myObject = new MyObject();
 val myObjectBroadcasted = sc.broadcast(myObject);

 val rdd1 = sc.textFile("/file1").map(e =>
 {
  myObjectBroadcasted.value.insert(e._1);
  (e._1,1)
 });
 rdd1.cache.count(); //to make sure it is transformed.

 val rdd2 = sc.textFile("/file2").map(e =>
 {
  val lookedUp = myObjectBroadcasted.value.lookup(e._1);
  (e._1, lookedUp)
 });

 On 24 February 2015 at 17:36, Ganelin, Ilya ilya.gane...@capitalone.com
 wrote:

  You're not using the broadcasted variable within your map operations.
 You're attempting to modify myObject directly, which won't work because you
 are modifying the serialized copy on the executor. You want to do
 myObjectBroadcasted.value.insert and myObjectBroadcasted.value.lookup.



 Sent with Good (www.good.com)



 -Original Message-
 *From: *Yiannis Gkoufas [johngou...@gmail.com]
 *Sent: *Tuesday, February 24, 2015 12:12 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Brodcast Variable updated from one transformation and used
 from another

 Hi all,

 I am trying to do the following.

 val myObject = new MyObject();
 val myObjectBroadcasted = sc.broadcast(myObject);

 val rdd1 = sc.textFile("/file1").map(e =>
 {
  myObject.insert(e._1);
  (e._1,1)
 });
 rdd1.cache.count(); //to make sure it is transformed.

 val rdd2 = sc.textFile("/file2").map(e =>
 {
  val lookedUp = myObject.lookup(e._1);
  (e._1, lookedUp)
 });

 When I check the contents of myObject within the map of rdd1 everything
 seems ok.
 On the other hand when I check the contents of myObject within the map
 of rdd2 it seems to be empty.
 Am I doing something wrong?

 Thanks a lot!







Re: How to get yarn logs to display in the spark or yarn history-server?

2015-02-24 Thread Imran Rashid
the spark history server and the yarn history server are totally
independent.  Spark knows nothing about yarn logs, and vice versa, so
unfortunately there isn't any way to get all the info in one place.

On Tue, Feb 24, 2015 at 12:36 PM, Colin Kincaid Williams disc...@uw.edu
wrote:

 Looks like in my tired state, I didn't mention spark the whole time.
 However, it might be implied by the application log above. Spark log
 aggregation appears to be working, since I can run the yarn command above.
 I do have yarn logging setup for the yarn history server. I was trying to
 use the spark history-server, but maybe I should try setting

 spark.yarn.historyServer.address

 to the yarn history-server, instead of the spark history-server? I tried
 this configuration when I started, but didn't have much luck.

 Are you getting your spark apps run in yarn client or cluster mode in your
 yarn history server? If so can you share any spark settings?

 On Tue, Feb 24, 2015 at 8:48 AM, Christophe Préaud 
 christophe.pre...@kelkoo.com wrote:

 Hi Colin,

 Here is how I have configured my hadoop cluster to have yarn logs
 available through both the yarn CLI and the _yarn_ history server (with
 gzip compression and 10 days retention):

 1. Add the following properties in the yarn-site.xml on each node
 manager and on the resource manager:
   <property>
     <name>yarn.log-aggregation-enable</name>
     <value>true</value>
   </property>
   <property>
     <name>yarn.log-aggregation.retain-seconds</name>
     <value>864000</value>
   </property>
   <property>
     <name>yarn.log.server.url</name>
     <value>
       http://dc1-kdp-dev-hadoop-03.dev.dc1.kelkoo.net:19888/jobhistory/logs
     </value>
   </property>
   <property>
     <name>yarn.nodemanager.log-aggregation.compression-type</name>
     <value>gz</value>
   </property>

 2. Restart yarn and then start the yarn history server on the server
 defined in the yarn.log.server.url property above:

 /opt/hadoop/sbin/mr-jobhistory-daemon.sh stop historyserver # should fail
 if historyserver is not yet started
 /opt/hadoop/sbin/stop-yarn.sh
 /opt/hadoop/sbin/start-yarn.sh
 /opt/hadoop/sbin/mr-jobhistory-daemon.sh start historyserver


 It may be slightly different for you if the resource manager and the
 history server are not on the same machine.

 Hope it will work for you as well!
 Christophe.

 On 24/02/2015 06:31, Colin Kincaid Williams wrote:
  Hi,
 
  I have been trying to get my yarn logs to display in the spark
 history-server or yarn history-server. I can see the log information
 
 
  yarn logs -applicationId application_1424740955620_0009
  15/02/23 22:15:14 INFO client.ConfiguredRMFailoverProxyProvider:
 Failing over to us3sm2hbqa04r07-comp-prod-local
 
 
  Container: container_1424740955620_0009_01_02 on
 us3sm2hbqa07r07.comp.prod.local_8041
 
 ===
  LogType: stderr
  LogLength: 0
  Log Contents:
 
  LogType: stdout
  LogLength: 897
  Log Contents:
  [GC [PSYoungGen: 262656K-23808K(306176K)] 262656K-23880K(1005568K),
 0.0283450 secs] [Times: user=0.14 sys=0.03, real=0.03 secs]
  Heap
   PSYoungGen  total 306176K, used 111279K [0xeaa8,
 0x0001, 0x0001)
eden space 262656K, 33% used
 [0xeaa8,0xeffebbe0,0xfab0)
from space 43520K, 54% used
 [0xfab0,0xfc240320,0xfd58)
to   space 43520K, 0% used
 [0xfd58,0xfd58,0x0001)
   ParOldGen   total 699392K, used 72K [0xbff8,
 0xeaa8, 0xeaa8)
object space 699392K, 0% used
 [0xbff8,0xbff92010,0xeaa8)
   PSPermGen   total 35328K, used 34892K [0xbad8,
 0xbd00, 0xbff8)
object space 35328K, 98% used
 [0xbad8,0xbcf93088,0xbd00)
 
 
 
  Container: container_1424740955620_0009_01_03 on
 us3sm2hbqa09r09.comp.prod.local_8041
 
 ===
  LogType: stderr
  LogLength: 0
  Log Contents:
 
  LogType: stdout
  LogLength: 896
  Log Contents:
  [GC [PSYoungGen: 262656K-23725K(306176K)] 262656K-23797K(1005568K),
 0.0358650 secs] [Times: user=0.28 sys=0.04, real=0.04 secs]
  Heap
   PSYoungGen  total 306176K, used 65712K [0xeaa8,
 0x0001, 0x0001)
eden space 262656K, 15% used
 [0xeaa8,0xed380bf8,0xfab0)
from space 43520K, 54% used
 [0xfab0,0xfc22b4f8,0xfd58)
to   space 43520K, 0% used
 [0xfd58,0xfd58,0x0001)
   ParOldGen   total 699392K, used 72K [0xbff8,
 0xeaa8, 0xeaa8)
object space 699392K, 0% used
 [0xbff8,0xbff92010,0xeaa8)
   PSPermGen   total 29696K, used 29486K [0xbad8,
 

Re: sorting output of join operation

2015-02-23 Thread Imran Rashid
sortByKey() is the probably the easiest way:

import org.apache.spark.SparkContext._
joinedRdd.map{ case (word, (file1Counts, file2Counts)) => (file1Counts,
  (word, file1Counts, file2Counts)) }.sortByKey()
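
or, if you'd rather not change the key, sortBy does the same thing (Spark 1.0+):

joinedRdd.sortBy { case (word, (file1Counts, file2Counts)) => file1Counts }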

On Mon, Feb 23, 2015 at 10:41 AM, Anupama Joshi anupama.jo...@gmail.com
wrote:

 Hi ,
  To simplify my problem -
 I have 2 files from which I am reading words.
 the o/p is like
 file 1
 aaa 4
 bbb 6
 ddd 3

 file 2
 ddd 2
 bbb 6
 ttt 5

 if I do file1.join(file2)
 I get (ddd(3,2)
 bbb(6,6)

 If I want to sort the output by the number of occurances of the word i
 file1
 . How do I achive that.
 Any help would be appreciated.
 Thanks
 AJ



Re: Union and reduceByKey will trigger shuffle even same partition?

2015-02-23 Thread Imran Rashid
I think you're getting tripped up by lazy evaluation and the way stage
boundaries work (admittedly it's pretty confusing in this case).

It is true that up until recently, if you unioned two RDDs with the same
partitioner, the result did not have the same partitioner.  But that was
just fixed here:
https://github.com/apache/spark/pull/4629

That does mean that after you update ranks, it will no longer have a
partitioner, which will affect the join on your second iteration here:
 val contributions = links.join(ranks).flatMap

But, I think most of the shuffles you are pointing to are a different
issue.  I may be belaboring something you already know, but I think this is
easily confusing.  I think
 the first thing is understanding where you get stage boundaries, and how
they are named.  Each shuffle introduces a stage boundary.  However, the
stages get named by
the last thing in a stage, which is not really what is always causing the
shuffle.  Eg., reduceByKey() causes a shuffle, but we don't see that in a
stage name.  Similarly, map()
does not cause a shuffle, but we see a stage with that name.

So, what do the stage boundaries we see actually correspond to?

1) map -- that is doing the shuffle write for the following groupByKey
2) groupByKey -- in addition to reading the shuffle output from your map,
this is *also* doing the shuffle write for the next shuffle you introduce
w/ partitionBy
3) union -- this is doing the shuffle reading from your partitionBy, and
then all the work from there right up until the shuffle write for what is
immediately after union -- your
 reduceByKey.
4) lookup is an action, which is why that has another stage.

a couple of things to note:
(a) your join does not cause a shuffle, b/c both rdds share a partitioner
(b) you have two shuffles from groupByKey followed by partitionBy -- you
really probably want the 1 arg form of groupByKey(partitioner)
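
For (b), concretely, that would look something like this (based on your snippet below):

  val links = sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#"))
    .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
    .groupByKey(new HashPartitioner(numOfPartition))   // shuffles once, instead of groupByKey + partitionBy
    .persist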


hopefully this is helpful to understand how your stages & shuffles
correspond to your code.

Imran



On Mon, Feb 23, 2015 at 3:35 PM, Shuai Zheng szheng.c...@gmail.com wrote:

 This also triggers an interesting question: how can I do this locally in
 code if I want? For example: I have RDDs A and B, which have some partitioning;
 if I want to join A to B, I might just want to do a map-side join
 (although B itself might be big, B's local partition is known to be small
 enough to put in memory). How can I access another RDD's local partition in the
 mapPartitions
 method? Is there any way to do this in Spark?



 *From:* Shao, Saisai [mailto:saisai.s...@intel.com]
 *Sent:* Monday, February 23, 2015 3:13 PM
 *To:* Shuai Zheng
 *Cc:* user@spark.apache.org
 *Subject:* RE: Union and reduceByKey will trigger shuffle even same
 partition?



 If you call reduceByKey(), internally Spark will introduce a shuffle
 operation, no matter whether the data is already partitioned locally; Spark
 itself does not know the data is already well partitioned.



 So if you want to avoid a shuffle, you have to write the code explicitly to
 avoid this, from my understanding. You can call mapPartitions to get a
 partition of data and reduce by key locally with your own logic.



 Thanks

 Saisai



 *From:* Shuai Zheng [mailto:szheng.c...@gmail.com szheng.c...@gmail.com]

 *Sent:* Monday, February 23, 2015 12:00 PM
 *To:* user@spark.apache.org
 *Subject:* Union and reduceByKey will trigger shuffle even same partition?



 Hi All,



 I am running a simple page rank program, but it is slow. And I dug out that
 part of the reason is there is a shuffle happening when I call a union even though
 both RDDs share the same partitioning:



 Below is my test code in spark shell:



 import org.apache.spark.HashPartitioner



 sc.getConf.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer")

 val beta = 0.8

 val numOfPartition = 6

   val links =
 sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line => {val
 part = line.split("\t");
 (part(0).toInt, part(1).toInt)}).groupByKey.partitionBy(new
 HashPartitioner(numOfPartition)).persist

   var ranks = links.mapValues(_ => 1.0)

   var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist



   for (i <- 1 until 2) {

 val contributions = links.join(ranks).flatMap {

   case (pageId, (links, rank)) =>

 links.map(dest => (dest, rank / links.size * beta))

 }

 ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)

   }

   ranks.lookup(1)



 In the above code, links will join ranks and should preserve the partitioning,
 and leakedMatrix also shares the same partitioning, so I expect no
 shuffle to happen on contributions.union(leakedMatrix), nor on the
 reduceByKey after that. But in the end there is shuffle write for all steps:
 map, groupByKey, union, partitionBy, etc.



 I expect the shuffle to only happen once and then everything should be a
 local operation, but the screen shows otherwise. Do I have any misunderstanding
 here?






Re: what does Submitting ... missing tasks from Stage mean?

2015-02-20 Thread Imran Rashid
yeah, this is just the totally normal message when spark executes
something.  The first time something is run, all of its tasks are
missing.  I would not worry about cases when all tasks aren't missing
if you're new to spark, it's probably an advanced concept that you don't
care about.  (and would take me some time to explain :)

On Fri, Feb 20, 2015 at 8:20 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 Probably this is a silly question, but I couldn't find any clear
 documentation explaining why one should see "submitting... missing tasks from
 Stage ..." in the logs?

 Specially in my case when I do not have any failure in job execution, I
 wonder why this should happen?
 Does it have any relation to lazy evaluation?

 best,
 /Shahab



Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Imran Rashid
Hi Joe,

The issue is not that you have input partitions that are bigger than 2GB --
it's just that they are getting cached.  You can see in the stack trace, the
problem is when you try to read data out of the DiskStore:

org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)

Also, just because you see this:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:
-2)

it doesn't *necessarily* mean that this is coming from your map.  It can be
pretty confusing how your operations on RDDs get turned into stages, it
could be a lot more than just your map.  and actually, it might not even be
your map at all -- some of the other operations you invoke call map
underneath the covers.  So its hard to say what is going on here w/ out
seeing more code.  Anyway, maybe you've already considered all this (you
did mention the lazy execution of the DAG), but I wanted to make sure.  it
might help to use rdd.setName() and also to look at rdd.toDebugString.

As far as what you can do about this -- it could be as simple as moving
your rdd.persist() to after you have compressed and repartitioned your
data.  eg., I'm blindly guessing you have something like this:

val rawData = sc.hadoopFile(...)
rawData.persist(DISK)
rawData.count()
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
...

change it to something like:

val rawData = sc.hadoopFile(...)
val compressedData = rawData.map{...}
val repartitionedData = compressedData.repartition(N)
repartitionedData.persist(DISK)
repartitionedData.count()
...


The point is, you avoid caching any data until you have ensured that the
partitions are small.  You might have big partitions before that in
rawData, but that is OK.

Imran


On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your reply Sean.

 Looks like it's happening in a map:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 That's my initial 'parse' stage, done before repartitioning. It reduces
 the data size significantly so I thought it would be sensible to do before
 repartitioning, which involves moving lots of data around. That might be a
 stupid idea in hindsight!

 So the obvious thing to try would be to try repartitioning before the map
 as the first transformation. I would have done that if I could be sure that
 it would succeed or fail quickly.

 I'm not entirely clear about the lazy execution of transformations in DAG.
 It could be that the error is manifesting during the mapToPair, but caused
 by the earlier read from text file stage.

 Thanks for pointers to those compression formats. I'll give them a go
 (although it's not trivial to re-encode 200 GB of data on S3, so if I can
 get this working reasonably with gzip I'd like to).

 Any advice about whether this error can be worked round with an early
 partition?

 Cheers

 Joe


 On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote:

 gzip and zip are not splittable compression formats; bzip and lzo are.
 Ideally, use a splittable compression format.

 Repartitioning is not a great solution since it means a shuffle,
 typically.

 This is not necessarily related to how big your partitions are. The
 question is, when does this happen? what operation?

 On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote:
  On the advice of some recent discussions on this list, I thought I
 would try
  and consume gz files directly. I'm reading them, doing a preliminary
 map,
  then repartitioning, then doing normal spark things.
 
  As I understand it, zip files aren't readable in partitions because of
 the
  format, so I thought that repartitioning would be the next best thing
 for
  parallelism. I have about 200 files, some about 1GB compressed and some
 over
  2GB uncompressed.
 
  I'm hitting the 2GB maximum partition size. It's been discussed on this
 list
  (topic: 2GB limit for partitions?, tickets SPARK-1476 and SPARK-1391).
  Stack trace at the end. This happened at 10 hours in (probably when it
 saw
  its first file). I can't just re-run it quickly!
 
  Does anyone have any advice? Might I solve this by re-partitioning as
 the
  first step after reading the file(s)? Or is it effectively impossible to
  read a gz file that expands to over 2GB? Does anyone have any experience
  with this?
 
  Thanks in advance
 
  Joe
 
  Stack trace:
 
  Exception in thread main 15/02/18 20:44:25 INFO
 scheduler.TaskSetManager:
  Lost task 5.3 in stage 1.0 (TID 283) on executor:
  java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
  [duplicate 6]
  org.apache.spark.SparkException: Job aborted due to stage failure: Task
 2 in
  stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
 1.0:
  java.lang.IllegalArgumentException: Size exceeds 

Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Imran Rashid
oh, I think you are just choosing a number that is too small for your
number of partitions.  All of the data in /dir/to/gzfiles is going to be
sucked into one RDD, with the data divided into partitions.  So if you're
parsing 200 files, each about 2 GB, and then repartitioning down to 100
partitions, you would expect 4 GB per partition.  Though you're filtering
the data down some, there may also be some bloat from your parsed
objects.  Also if you're not using kryo for serialization, I'd strongly
recommend that over the default serialization, and try to register all your
classes.

I think you can get some information about how much data is in your RDDs
from the UI -- but it might depend on what version you are running of
spark, plus I think the info isn't saved on failed stages, so you might
just need to monitor it in the UI as its happening (I am not 100% sure
about that ...)

So I'd suggest (a) using a lot more partitions (maybe 1k, given your data
size) (b) turn on kryo if you haven't already.
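
For (b), here's a minimal sketch of turning kryo on and registering classes
(ParsedRecord is just a placeholder for whatever your parsed objects are;
registerKryoClasses needs Spark 1.2+, on older versions you'd point
spark.kryo.registrator at your own registrator instead):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[ParsedRecord]))
val sc = new SparkContext(conf)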



On Thu, Feb 19, 2015 at 9:36 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your detailed reply Imran. I'm writing this in Clojure (using
 Flambo which uses the Java API) but I don't think that's relevant. So
 here's the pseudocode (sorry I've not written Scala for a long time):

 val rawData = sc.hadoopFile("/dir/to/gzfiles") // NB multiple files.
 val parsedFiles = rawData.map(parseFunction)   // can return nil on failure
 val filtered = parsedFiles.filter(notNil)
 val partitioned = filtered.repartition(100) // guessed number
 val persisted = partitioned.persist(StorageLevels.DISK_ONLY)

 val resultA = stuffA(persisted)
 val resultB = stuffB(persisted)
 val resultC = stuffC(persisted)

 So, I think I'm already doing what you suggested. I would have assumed
 that partition size would be («size of expanded file» / «number of
 partitions»). In this case, 100 (which I picked out of the air).

 I wonder whether the «size of expanded file» is actually the size of all
 concatenated input files (probably about 800 GB)? In that case should I
 multiply it by the number of files? Or perhaps I'm barking up completely
 the wrong tree.

 Joe




 On 19 February 2015 at 14:44, Imran Rashid iras...@cloudera.com wrote:

 Hi Joe,

 The issue is not that you have input partitions that are bigger than 2GB
 -- its just that they are getting cached.  You can see in the stack trace,
 the problem is when you try to read data out of the DiskStore:

 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)

 Also, just because you see this:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 it doesn't *necessarily* mean that this is coming from your map.  It can
 be pretty confusing how your operations on RDDs get turned into stages, it
 could be a lot more than just your map.  and actually, it might not even be
 your map at all -- some of the other operations you invoke call map
 underneath the covers.  So its hard to say what is going on here w/ out
 seeing more code.  Anyway, maybe you've already considered all this (you
 did mention the lazy execution of the DAG), but I wanted to make sure.  it
 might help to use rdd.setName() and also to look at rdd.toDebugString.

 As far as what you can do about this -- it could be as simple as moving
 your rdd.persist() to after you have compressed and repartitioned your
 data.  eg., I'm blindly guessing you have something like this:

 val rawData = sc.hadoopFile(...)
 rawData.persist(DISK)
 rawData.count()
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 ...

 change it to something like:

 val rawData = sc.hadoopFile(...)
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 repartitionedData.persist(DISK)
 repartitionedData.count()
 ...


 The point is, you avoid caching any data until you have ensured that the
 partitions are small.  You might have big partitions before that in
 rawData, but that is OK.

 Imran


 On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your reply Sean.

 Looks like it's happening in a map:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 That's my initial 'parse' stage, done before repartitioning. It reduces
 the data size significantly so I thought it would be sensible to do before
 repartitioning, which involves moving lots of data around. That might be a
 stupid idea in hindsight!

 So the obvious thing to try would be to try repartitioning before the
 map as the first transformation. I would have done that if I could be sure
 that it would succeed or fail quickly.

 I'm not entirely clear about the lazy execution of transformations in
 DAG. It could be that the error is manifesting during the mapToPair

Re: Some tasks taking too much time to complete in a stage

2015-02-19 Thread Imran Rashid
almost all your data is going to one task.  You can see that the shuffle
read for task 0 is 153.3 KB, and for most other tasks it's just 26.0 B (which
is probably just some header saying there are no actual records).  You need
to ensure your data is more evenly distributed before this step.
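
If you want to confirm where the skew comes from, a quick sketch that counts
records per key on the RDD feeding this stage ("pairs" is just a placeholder
for that keyed RDD):

val keyCounts = pairs.mapValues(_ => 1L).reduceByKey(_ + _)
// heaviest keys first
keyCounts.sortBy(_._2, ascending = false).take(10).foreach(println)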

On Thu, Feb 19, 2015 at 10:53 AM, jatinpreet jatinpr...@gmail.com wrote:

 Hi,

 I am running Spark 1.2.1 for compute intensive jobs comprising of multiple
 tasks. I have observed that most tasks complete very quickly, but there are
 always one or two tasks that take a lot of time to complete thereby
 increasing the overall stage time. What could be the reason for this?

 Following are the statistics for one such stage. As you can see, the task
 with index 0 takes 1.1 minutes whereas others completed much more quickly.

 Aggregated Metrics by Executor
 Executor ID  Address       Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input  Output  Shuffle Read  Shuffle Write  Shuffle Spill (Memory)  Shuffle Spill (Disk)
 0            slave1:56311  46 s       13           0             13               0.0 B  0.0 B   0.0 B         0.0 B          0.0 B                   0.0 B
 1            slave2:42648  2.1 min    13           0             13               0.0 B  0.0 B   384.3 KB      0.0 B          0.0 B                   0.0 B
 2            slave3:44322  23 s       12           0             12               0.0 B  0.0 B   136.4 KB      0.0 B          0.0 B                   0.0 B
 3            slave4:37987  44 s       12           0             12               0.0 B  0.0 B   213.9 KB      0.0 B          0.0 B                   0.0 B

 Tasks
 Index  ID   Attempt  Status   Locality Level  Executor ID / Host  Launch Time          Duration  GC Time  Shuffle Read  Errors
 0      213  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  1.1 min   1 s      153.3 KB
 5      218  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
 1      214  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  2 s       0.9 s    13.8 KB
 4      217  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  26 ms              26.0 B
 3      216  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  11 ms              0.0 B
 2      215  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  27 ms              26.0 B
 7      220  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  11 ms              0.0 B
 10     223  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  23 ms              26.0 B
 6      219  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  23 ms              26.0 B
 9      222  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
 8      221  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  23 ms              26.0 B
 11     224  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 14     227  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  24 ms              26.0 B
 13     226  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  23 ms              26.0 B
 16     229  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  22 ms              26.0 B
 12     225  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  22 ms              26.0 B
 15     228  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 17     230  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  22 ms              26.0 B
 23     236  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 22     235  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  21 ms              26.0 B
 19     232  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B
 21     234  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  25 ms              26.0 B
 18     231  0        SUCCESS  PROCESS_LOCAL   2 / slave3          2015/02/19 11:40:05  24 ms              26.0 B
 20     233  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  28 ms              26.0 B
 25     238  0        SUCCESS  PROCESS_LOCAL   3 / slave4          2015/02/19 11:40:05  20 ms              26.0 B
 28     241  0        SUCCESS  PROCESS_LOCAL   1 / slave2          2015/02/19 11:40:05  27 ms              26.0 B
 27     240  0        SUCCESS  PROCESS_LOCAL   0 / slave1          2015/02/19 11:40:05  10 ms              0.0 B


 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Some-tasks-taking-too-much-time-to-complete-in-a-stage-tp21724.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Incorrect number of records after left outer join (I think)

2015-02-19 Thread Imran Rashid
if you have duplicate values for a key, join creates all pairs.  Eg. if you
2 values for key X in rdd A  2 values for key X in rdd B, then a.join(B)
will have 4 records for key X
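
A tiny example of that blow-up (values made up):

val a = sc.parallelize(Seq(("X", 1), ("X", 2)))
val b = sc.parallelize(Seq(("X", 10), ("X", 20)))
a.join(b).collect()
// -> Array((X,(1,10)), (X,(1,20)), (X,(2,10)), (X,(2,20))), i.e. 4 records for key X

leftOuterJoin multiplies duplicate keys in exactly the same way, so if
baselinePairRDD has duplicate keys, the joined result can end up with more
records than reducedDailyPairRDD.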

On Thu, Feb 19, 2015 at 3:39 PM, Darin McBeath ddmcbe...@yahoo.com.invalid
wrote:

 Consider the following left outer join

 potentialDailyModificationsRDD =
 reducedDailyPairRDD.leftOuterJoin(baselinePairRDD).partitionBy(new
 HashPartitioner(1024)).persist(StorageLevel.MEMORY_AND_DISK_SER());


 Below are the record counts for the RDDs involved
 Number of records for reducedDailyPairRDD: 2565206
 Number of records for baselinePairRDD: 56102812
 Number of records for potentialDailyModificationsRDD: 2570115

 Below are the partitioners for the RDDs involved.
 Partitioner for reducedDailyPairRDD:
 Some(org.apache.spark.HashPartitioner@400)
 Partitioner for baselinePairRDD: Some(org.apache.spark.HashPartitioner@400
 )
 Partitioner for potentialDailyModificationsRDD:
 Some(org.apache.spark.HashPartitioner@400)


 I realize in the above statement that the .partitionBy is probably not
 needed as the underlying RDDs used in the left outer join are already hash
 partitioned.

 My question is how the resulting RDD (potentialDailyModificationsRDD) can
 end up with more records than
 reducedDailyPairRDD.  I would think the number of records in
 potentialDailyModificationsRDD should be 2565206 instead of 2570115.  Am I
 missing something or is this possibly a bug?

 I'm using Apache Spark 1.2 on a stand-alone cluster on ec2.  To get the
 counts for the records, I'm using the .count() for the RDD.

 Thanks.

 Darin.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Filter data from one RDD based on data from another RDD

2015-02-19 Thread Imran Rashid
the more scalable alternative is to do a join (or a variant like cogroup,
leftOuterJoin, subtractByKey etc. found in PairRDDFunctions)

the downside is this requires a shuffle of both your RDDs
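
A rough sketch of the join-based version for your example, keying both RDDs
by the first column (untested):

val keyedRdd1 = rdd1.map(line => (line.split(",")(0), ())).distinct()
val keyedRdd2 = rdd2.map(line => (line.split(",")(0), line))
// the inner join keeps only the rdd2 lines whose first column also appears in rdd1
val rdd2filtered = keyedRdd2.join(keyedRdd1).values.map(_._1)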

On Thu, Feb 19, 2015 at 3:36 PM, Himanish Kushary himan...@gmail.com
wrote:

 Hi,

 I have two RDD's with csv data as below :

 RDD-1

 101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb43,19229261643
 101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9229261645
 101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,9229261647
 101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d8,791191603
 101970_5854301840,fbcf5485-e696-4100-9468-a17ec7c5bb42,19229261643
 101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,290542385
 101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,922926164

 RDD-2

 101970_17038953,546853f9-cf07-4700-b202-00f21e7c56d9,7911160
 101970_5851048323,218f5485-e58c-4200-a473-348ddb858578,2954238
 101970_5854301839,fbaf5485-e696-4100-9468-a17ec7c5bb39,9226164
 101970_5854301839,fbbf5485-e696-4100-9468-a17ec7c5bb39,92292164
 101970_5854301839,fbcf5485-e696-4100-9468-a17ec7c5bb41,9226164

 101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb40,929164
 101970_5854301838,fbcf5485-e696-4100-9468-a17ec7c5bb39,26164

 I need to filter RDD-2 to include only those records where the first
 column value in RDD-2 matches any of the first column values in RDD-1

 Currently , I am broadcasting the first column values from RDD-1 as a list
 and then filtering RDD-2 based on that list.

 val rdd1broadcast = sc.broadcast(rdd1.map { uu => uu.split(",")(0)
 }.collect().toSet)

 val rdd2filtered = rdd2.filter{ h =>
 rdd1broadcast.value.contains(h.split(",")(0)) }

 This will result in data with first column 101970_5854301838 (last two 
 records) to be filtered out from RDD-2.

 Is this is the best way to accomplish this ? I am worried that for large data 
 volume , the broadcast step may become an issue. Appreciate any other 
 suggestion.

 ---
 Thanks
 Himanish



Re: Failure on a Pipe operation

2015-02-19 Thread Imran Rashid
The error msg is telling you the exact problem, it can't find
ProgramSIM, the thing you are trying to run

Lost task 3520.3 in stage 0.0 (TID 11, compute3.research.dev):
java.io.IOException: Cannot run program ProgramSIM: error=2, No s\
uch file or directory
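
rdd.pipe() just forks that command on each executor, so the binary has to be
resolvable on every worker node.  The simplest fix is to install it on the
workers and pipe to its full path, e.g. (path made up, someRdd is whatever
RDD you are piping):

// ProgramSIM must exist at this path (or be on the PATH) on every worker
val piped = someRdd.pipe("/opt/tools/ProgramSIM")
piped.count()  // force it to actually run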


On Thu, Feb 19, 2015 at 5:52 PM, athing goingon athinggoin...@gmail.com
wrote:

 Hi, I'm trying to figure out why the following job is failing on a pipe
 http://pastebin.com/raw.php?i=U5E8YiNN

 With this exception:
 http://pastebin.com/raw.php?i=07NTGyPP

 Any help is welcome. Thank you.



Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-18 Thread Imran Rashid
so if you only change this line:

https://gist.github.com/emres/0fb6de128baea099e741#file-mymoduledriver-java-L137

to

json.print()

it processes 16 files instead?  I am totally perplexed.  My only
suggestions to help debug are
(1) see what happens when you get rid of MyModuleWorker completely --
change MyModuleDriver#process to just
inStream.print()
and see what happens

(2) stick a bunch of printlns into MyModuleWorker#call

(3) turn on DEBUG logging
for org.apache.spark.streaming.dstream.FileInputDStream
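
For (3), you can flip that logger programmatically at the top of your driver
(scala shown, the java call is identical), or do the same in
log4j.properties:

import org.apache.log4j.{Level, Logger}

// surfaces the file-selection decisions FileInputDStream makes on each batch
Logger.getLogger("org.apache.spark.streaming.dstream.FileInputDStream")
  .setLevel(Level.DEBUG)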

my gut instinct is that something else is flaky about the file input stream
(eg., it makes some assumptions about the file system which maybe aren't
valid in your case; it has a bunch of caveats), and that it has just
happened to work sometimes with your foreachRDD and failed sometimes with
print.

Sorry I am not a lot of help in this case, hope this leads you down the
right track or somebody else can help out.

Imran


On Wed, Feb 18, 2015 at 2:28 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Imran,

 (a) I know that all 20 files are processed when I use foreachRDD, because
 I can see the processed files in the output directory. (My application
 logic writes them to an output directory after they are processed, *but*
 that writing operation does not happen in foreachRDD, below you can see the
 URL that includes my code and clarifies this).

 (b) I know only 16 files are processed because in the output directory I
 see only 16 files processed. I wait for minutes and minutes and no more
 files appear in the output directory. When I see only 16 files are
 processed and Spark Streaming went to the mode of idly watching the input
 directory, and then if I copy a few more files, they are also processed.

 (c) Sure, you can see part of my code in the following gist:
 https://gist.github.com/emres/0fb6de128baea099e741
  It might seem a little convoluted at first, because my application is
 divided into two classes, a Driver class (setting up things and
 initializing them), and a Worker class (that implements the core
 functionality). I've also put the relevant methods from the my utility
 classes for completeness.

 I am as perplexed as you are as to why forcing the output via foreachRDD
 ended up in different behaviour compared to simply using print() method.

 Kind regards,
 Emre



 On Tue, Feb 17, 2015 at 4:23 PM, Imran Rashid iras...@cloudera.com
 wrote:

 Hi Emre,

 there shouldn't be any difference in which files get processed w/ print()
 vs. foreachRDD().  In fact, if you look at the definition of print(), it is
 just calling foreachRDD() underneath.  So there is something else going on
 here.

 We need a little more information to figure out exactly what is going on.
  (I think Sean was getting at the same thing ...)

 (a) how do you know that when you use foreachRDD, all 20 files get
 processed?

 (b) How do you know that only 16 files get processed when you print()? Do
 you know the other files are being skipped, or maybe they are just stuck
 somewhere?  eg., suppose you start w/ 20 files, and you see 16 get
 processed ... what happens after you add a few more files to the
 directory?  Are they processed immediately, or are they never processed
 either?

 (c) Can you share any more code of what you are doing to the dstreams
 *before* the print() / foreachRDD()?  That might give us more details about
 what the difference is.

 I can't see how .count.println() would be different than just println(),
 but maybe I am missing something also.

 Imran

 On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Sean,

 In this case, I've been testing the code on my local machine and using
 Spark locally, so I all the log output was available on my terminal. And
 I've used the .print() method to have an output operation, just to force
 Spark execute.

 And I was not using foreachRDD, I was only using print() method on a
 JavaDStream object, and it was working fine for a few files, up to 16 (and
 without print() it did not do anything because there were no output
 operations).

 To sum it up, in my case:

  - Initially, use .print() and no foreachRDD: processes up to 16 files
 and does not do anything for the remaining 4.
  - Remove .print() and use foreachRDD: processes all of the 20 files.

 Maybe, as in Akhil Das's suggestion, using .count.print() might also
 have fixed my problem, but I'm satisfied with foreachRDD approach for now.
 (Though it is still a mystery to me why using .print() had a difference,
 maybe my mental model of Spark is wrong, I thought no matter what output
 operation I used, the number of files processed by Spark would be
 independent of that because the processing is done in a different method,
 .print() is only used to force Spark execute that processing, am I wrong?).

 --
 Emre


 On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote:

 Materialization shouldn't be relevant. The collect by itself doesn't
 let you detect whether

Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-18 Thread Imran Rashid
Hi Tom,

there are a couple of things you can do here to make this more efficient.
 first, I think you can replace your self-join with a groupByKey. on your
example data set, this would give you

(1, Iterable(2,3))
(4, Iterable(3))

this reduces the amount of data that needs to be shuffled, and that way you
can produce all of your pairs just from the Iterable(2,3).

second, if you expect the same pairs to appear many times in your dataset,
you might first want to replace them with a count.  eg., if you start with

(1,2)
(1,2)
(1,2)
...
(1,2)
(1,3)
(1,3)
(4,3)
...

you might want to first convert that to get a count of each pair

val pairCounts = rdd.map{x => (x,1)}.reduceByKey{_ + _}

to give you something like:

((1,2), 145)
((1,3), 2)
((4,3), 982)
...

and then with a little more massaging you can group by key and also keep
the counts of each item:

val groupedCounts: RDD[(Int, Iterable[(Int,Int)])] =
pairCounts.map{ case ((key, value), counts) =>
  key -> (value, counts)
}.groupByKey

which would give you something like

(1, Iterable((2,145), (3, 2))
(4, Iterable((3, 982))


hope this helps
Imran

On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote:

 Thanks for the reply, I'll try your suggestions.

 Apologies, in my previous post I was mistaken. rdd is actually an PairRDD
 of (Int, Int). I'm doing the self-join so I can count two things. First, I
 can count the number of times a value appears in the data set. Second I can
 count number of times values occur with the same key. For example, if I
 have the following:

 (1,2)
 (1,3)
 (4,3)

 Then joining with itself I get:

 (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
 (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
 (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
 (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
 (4,(3,3)) -> map -> ((3,3),1) _|

 Note that I want to keep the duplicates (2,2) and reflections.

 Rgds

 On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Why are you joining the rdd with itself?

 You can try these things:

 - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
 MEMORY_AND_DISK_SER, so that it doesnt need to keep everything up in memory.

 - Set your default Serializer to Kryo (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))

 - Enable rdd compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are feasible for my use-case. The following is a
 snippet of code that is causing a OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions = 1000)
  .map(fp => ((fp._2._1, fp._2._2), 1))
  .reduceByKey((x, y) => x + y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occuring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd is about 3.6 million records, which has a
 size in memory of about 120MB, and results in a 'joined' RDD (before the
 reduceByKey stage) of around 460 million records, with a size in memory of
 about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
 the issue. I've also tried increasing the number of partitions after the
 join dramatically (up to 15000). Nothing has been effective. Thus, I'm
 beginning to suspect I don't have enough resources for the job.

 Does anyone have a feeling about what the resource requirements would be
 for a use-case like this? I could scale the cluster up if necessary, but
 would like to avoid it. I'm willing to accept longer computation times if
 that is an option.

 Warm Regards,
 Thomas






Re: MapValues and Shuffle Reads

2015-02-17 Thread Imran Rashid
Hi Darrin,

You are asking for something near & dear to me:
https://issues.apache.org/jira/browse/SPARK-1061

There is a PR attached there as well.  Note that you could do everything in
that PR in your own user code, you don't need to wait for it to get merged,
*except* for the change to HadoopRDD so that it sorts the input partitions.
 (Though of course, you could always just have your implementation of
HadoopRDD as well ...)

you could also vote for the issue & watch it as well to encourage some
progress on it :)

On Tue, Feb 17, 2015 at 2:56 PM, Darin McBeath ddmcbe...@yahoo.com wrote:

 Thanks Imran.

 I think you are probably correct.  I was a bit surprised that there was no
 shuffle read in the initial hash partition step.  I will adjust the code as
 you suggest to prove that is the case.

 I have a slightly different question.  If I save an RDD to S3 (or some 
 equivalent)
 and this RDD was hash partitioned at the time, do I still need to hash
 partition the RDD again when I read it in?  Is there a way that I could
 prevent all of the shuffling (such as providing a hint)?  (My parts for the
 RDD will be gzipped so they would not be splittable.)  In reality, that's
 what I would really want to do in the first place.

 Thanks again for your insights.

 Darin.

   --
  *From:* Imran Rashid iras...@cloudera.com
 *To:* Darin McBeath ddmcbe...@yahoo.com
 *Cc:* User user@spark.apache.org
 *Sent:* Tuesday, February 17, 2015 3:29 PM
 *Subject:* Re: MapValues and Shuffle Reads

 Hi Darin,

 When you say you see 400GB of shuffle writes from the first code
 snippet, what do you mean?  There is no action in that first set, so it
 won't do anything.  By itself, it won't do any shuffle writing, or anything
 else for that matter.

 Most likely, the .count() on your second code snippet is actually causing
 the execution of some of the first snippet as well.  The .partitionBy will
 result in both shuffle writes and shuffle reads, but they aren't set in
 motion until the .count further down the line.  Its confusing b/c the stage
 boundaries don't line up exactly with your RDD variables here.  
 hsfBaselinePairRDD
 spans 2 stages, and baselinePairRDD actually gets merged into the stage
 above it.

 If you do a hsfBaselinePairRDD.count after your first code snippet, and
 then run the second code snippet afterwards, is it more like what you
 expect?

 Imran



 On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath 
 ddmcbe...@yahoo.com.invalid wrote:

 In the following code, I read in a large sequence file from S3 (1TB)
 spread across 1024 partitions.  When I look at the job/stage summary, I see
 about 400GB of shuffle writes which seems to make sense as I'm doing a hash
 partition on this file.

 // Get the baseline input file
 JavaPairRDD<Text,Text> hsfBaselinePairRDDReadable =
 sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class,
 Text.class, Text.class);

 JavaPairRDD<String, String> hsfBaselinePairRDD =
 hsfBaselinePairRDDReadable.mapToPair(new
 ConvertFromWritableTypes()).partitionBy(new
 HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER());

 I then execute the following code (with a count to force execution) and
 what I find very strange is that when I look at the job/stage summary, I
 see more than 340GB of shuffle read.  Why would there be any shuffle read
 in this step?  I would expect there to be little (if any) shuffle reads in
 this step.

 // Use 'substring' to extract the epoch value from each record.
 JavaPairRDD<String, Long> baselinePairRDD =
 hsfBaselinePairRDD.mapValues(new
 ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER());

 log.info("Number of baseline records: " + baselinePairRDD.count());

 Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions.

 Any insights would be appreciated.

 I'm using Spark 1.2.0 in a stand-alone cluster.


 Darin.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: How do you get the partitioner for an RDD in Java?

2015-02-17 Thread Imran Rashid
a JavaRDD is just a wrapper around a normal RDD defined in scala, which is
stored in the rdd field.  You can access everything that way.  The
JavaRDD wrappers just provide some interfaces that are a bit easier to work
with in Java.

If this is at all convincing, here's me demonstrating it inside the
spark-shell (yes its scala, but I'm using the java api)

scala> val jsc = new JavaSparkContext(sc)
 jsc: org.apache.spark.api.java.JavaSparkContext =
 org.apache.spark.api.java.JavaSparkContext@7d365529



scala> val data = jsc.parallelize(java.util.Arrays.asList(Array("a", "b",
 "c")))
 data: org.apache.spark.api.java.JavaRDD[Array[String]] =
 ParallelCollectionRDD[0] at parallelize at <console>:15



scala> data.rdd.partitioner
 res0: Option[org.apache.spark.Partitioner] = None


On Tue, Feb 17, 2015 at 3:44 PM, Darin McBeath ddmcbe...@yahoo.com.invalid
wrote:

 In an 'early release' of the Learning Spark book, there is the following
 reference:

 In Scala and Java, you can determine how an RDD is partitioned using its
 partitioner property (or partitioner() method in Java)

 However, I don't see the mentioned 'partitioner()' method in Spark 1.2 or
 a way of getting this information.

 I'm curious if anyone has any suggestions for how I might go about finding
 how an RDD is partitioned in a Java program.

 Thanks.

 Darin.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Percentile example

2015-02-17 Thread Imran Rashid
(trying to repost to the list w/out URLs -- rejected as spam earlier)

Hi,

Using take() is not a good idea, as you have noted it will pull a lot of
data down to the driver so its not scalable.  Here are some more scalable
alternatives:

1. Approximate solutions

1a. Sample the data.  Just sample some of the data to the driver, sort that
data in memory, and take the 66th percentile of that sample.

1b.  Make a histogram with pre-determined buckets.  Eg., if you know your
data ranges from 0 to 1 and is uniform-ish, you could make buckets every
0.01.  Then count how many data points go into each bucket.  Or if you only
care about relative error and you have integers (often the case if your
data is counts), then you can span the full range of integers with a
relatively small number of buckets.  Eg., you only need 200 buckets for 5%
error.  See the Histogram class in twitter's Ostrich library

The problem is, if you have no idea what the distribution of your data is,
its very hard to come up with good buckets; you could have an arbitrary
amount of data going to one bucket, and thus tons of error.

1c.  Use a TDigest, a compact & scalable data structure for approximating
distributions, and performs reasonably across a wide range of
distributions.  You would make one TDigest for each partition (with
mapPartitions), and then merge all of the TDigests together.  I wrote up a
little more detail on this earlier, you can search the spark-user on nabble
for tdigest

2. Exact solutions.  There are also a few options here, but I'll give one
that is a variant of what you suggested.  Start out by doing a sortByKey.
Then figure out how many records you have in each partition (with
mapPartitions).  Figure out which partition the 66th percentile would be
in.  Then just read the one partition you want, and go down to the Nth
record in that partition.

To read the one partition you want, you can either (a) use
mapPartitionsWithIndex, and just ignore every partition that isn't the one
you want or (b) use PartitionPruningRDD.  PartitionPruningRDD will avoid
launching empty tasks on the other partitions, so it will be slightly more
efficient, but its also a developer api, so perhaps not worth going to that
level of detail.

Note that internally, sortByKey will sample your data to get an approximate
distribution, to figure out what data to put in each partition.  However,
your still getting an exact answer this way -- the approximation is only
important for distributing work among all executors.  Even if the
approximation is inaccurate, you'll still correct for it, you will just
have unequal partitions.
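
To make option 2 a bit more concrete, here is a rough sketch (it assumes an
RDD[Double] called data and the simple "index = 0.66 * count" definition of
the percentile; treat it as untested):

val sorted = data.sortBy(x => x).cache()   // used twice below

// how many records live in each partition
val counts = sorted.mapPartitions(it => Iterator(it.size)).collect().map(_.toLong)
val targetIdx = (0.66 * counts.sum).toLong

// which partition holds the target record, and how far into that partition it is
val cumulative = counts.scanLeft(0L)(_ + _)
val partIdx = cumulative.indexWhere(_ > targetIdx) - 1
val offset = (targetIdx - cumulative(partIdx)).toInt

val p66 = sorted.mapPartitionsWithIndex { (i, it) =>
  if (i == partIdx) it.drop(offset).take(1) else Iterator.empty
}.collect().head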

Imran


 On Sun, Feb 15, 2015 at 9:37 AM, SiMaYunRui myl...@hotmail.com wrote:

 hello,

 I am a newbie to spark and trying to figure out how to get percentile
 against a big data set. Actually, I googled this topic but not find any
 very useful code example and explanation. Seems that I can use transformer
 SortBykey to get my data set in order, but not pretty sure how can I get
 value of , for example, percentile 66.

 Should I use take() to pick up the value of percentile 66? I don't
 believe any machine can load my data set in memory. I believe there must be
 more efficient approaches.

 Can anyone shed some light on this problem?

 *Regards*





Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-17 Thread Imran Rashid
Hi Emre,

there shouldn't be any difference in which files get processed w/ print()
vs. foreachRDD().  In fact, if you look at the definition of print(), it is
just calling foreachRDD() underneath.  So there is something else going on
here.

We need a little more information to figure out exactly what is going on.
 (I think Sean was getting at the same thing ...)

(a) how do you know that when you use foreachRDD, all 20 files get
processed?

(b) How do you know that only 16 files get processed when you print()? Do
you know the other files are being skipped, or maybe they are just stuck
somewhere?  eg., suppose you start w/ 20 files, and you see 16 get
processed ... what happens after you add a few more files to the
directory?  Are they processed immediately, or are they never processed
either?

(c) Can you share any more code of what you are doing to the dstreams
*before* the print() / foreachRDD()?  That might give us more details about
what the difference is.

I can't see how .count.println() would be different than just println(),
but maybe I am missing something also.

Imran

On Mon, Feb 16, 2015 at 7:49 AM, Emre Sevinc emre.sev...@gmail.com wrote:

 Sean,

 In this case, I've been testing the code on my local machine and using
 Spark locally, so I all the log output was available on my terminal. And
 I've used the .print() method to have an output operation, just to force
 Spark execute.

 And I was not using foreachRDD, I was only using print() method on a
 JavaDStream object, and it was working fine for a few files, up to 16 (and
 without print() it did not do anything because there were no output
 operations).

 To sum it up, in my case:

  - Initially, use .print() and no foreachRDD: processes up to 16 files and
 does not do anything for the remaining 4.
  - Remove .print() and use foreachRDD: processes all of the 20 files.

 Maybe, as in Akhil Das's suggestion, using .count.print() might also have
 fixed my problem, but I'm satisfied with foreachRDD approach for now.
 (Though it is still a mystery to me why using .print() had a difference,
 maybe my mental model of Spark is wrong, I thought no matter what output
 operation I used, the number of files processed by Spark would be
 independent of that because the processing is done in a different method,
 .print() is only used to force Spark execute that processing, am I wrong?).

 --
 Emre


 On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote:

 Materialization shouldn't be relevant. The collect by itself doesn't let
 you detect whether it happened. Print should print some results to the
 console but on different machines, so may not be a reliable way to see what
 happened.

 Yes I understand your real process uses foreachRDD and that's what you
 should use. It sounds like that works. But you must always have been using
 that right? What do you mean that you changed to use it?

 Basically I'm not clear on what the real code does and what about the
 output of that code tells you only 16 files were processed.
 On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Sean,

 I did not understand your question very well, but what I do is checking
 the output directory (and I have various logger outputs at various stages
 showing the contents of an input file being processed, the response from
 the web service, etc.).

 By the way, I've already solved my problem by using foreachRDD instead
 of print (see my second message in this thread). Apparently forcing Spark
 to materialize DAG via print() is not the way to go. (My interpretation
 might be wrong, but this is what I've just seen in my case).

 --
 Emre




 On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote:

 How are you deciding whether files are processed or not? It doesn't
 seem possible from this code. Maybe it just seems so.
 On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've managed to solve this, but I still don't know exactly why my
 solution works:

 In my code I was trying to force the Spark to output via:

   jsonIn.print();

 jsonIn being a JavaDStream<String>.

 When removed the code above, and added the code below to force the
 output operation, hence the execution:

 jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> stringJavaRDD) throws Exception
 {
 stringJavaRDD.collect();
 return null;
   }
 });

 It works as I expect, processing all of the 20 files I give to it,
 instead of stopping at 16.

 --
 Emre


 On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web 

Re: MapValues and Shuffle Reads

2015-02-17 Thread Imran Rashid
Hi Darin,

When you say you see 400GB of shuffle writes from the first code snippet,
what do you mean?  There is no action in that first set, so it won't do
anything.  By itself, it won't do any shuffle writing, or anything else for
that matter.

Most likely, the .count() on your second code snippet is actually causing
the execution of some of the first snippet as well.  The .partitionBy will
result in both shuffle writes and shuffle reads, but they aren't set in
motion until the .count further down the line.  Its confusing b/c the stage
boundaries don't line up exactly with your RDD variables here.
hsfBaselinePairRDD
spans 2 stages, and baselinePairRDD actually gets merged into the stage
above it.

If you do a hsfBaselinePairRDD.count after your first code snippet, and
then run the second code snippet afterwards, is it more like what you
expect?

Imran

On Tue, Feb 17, 2015 at 1:52 PM, Darin McBeath ddmcbe...@yahoo.com.invalid
wrote:

 In the following code, I read in a large sequence file from S3 (1TB)
 spread across 1024 partitions.  When I look at the job/stage summary, I see
 about 400GB of shuffle writes which seems to make sense as I'm doing a hash
 partition on this file.

 // Get the baseline input file
 JavaPairRDD<Text,Text> hsfBaselinePairRDDReadable =
 sc.hadoopFile(baselineInputBucketFile, SequenceFileInputFormat.class,
 Text.class, Text.class);

 JavaPairRDD<String, String> hsfBaselinePairRDD =
 hsfBaselinePairRDDReadable.mapToPair(new
 ConvertFromWritableTypes()).partitionBy(new
 HashPartitioner(Variables.NUM_OUTPUT_PARTITIONS)).persist(StorageLevel.MEMORY_AND_DISK_SER());

 I then execute the following code (with a count to force execution) and
 what I find very strange is that when I look at the job/stage summary, I
 see more than 340GB of shuffle read.  Why would there be any shuffle read
 in this step?  I would expect there to be little (if any) shuffle reads in
 this step.

 // Use 'substring' to extract the epoch value from each record.
 JavaPairRDD<String, Long> baselinePairRDD =
 hsfBaselinePairRDD.mapValues(new
 ExtractEpoch(accumBadBaselineRecords)).persist(StorageLevel.MEMORY_AND_DISK_SER());

 log.info("Number of baseline records: " + baselinePairRDD.count());

 Both hsfBaselinePairRDD and baselinePairRDD have 1024 partitions.

 Any insights would be appreciated.

 I'm using Spark 1.2.0 in a stand-alone cluster.


 Darin.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



