Re: Facing error: java.lang.ArrayIndexOutOfBoundsException while executing SparkSQL join query

2015-02-28 Thread anamika gupta
The issue is now resolved.

One of the csv files had an incorrect record at the end.
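
For reference, a minimal sketch of how such a malformed trailing record can be filtered out before mapping rows into the date_d case class shown below; the file path, the comma delimiter, and the field order are assumptions, and sc is taken to be the SparkContext:

// Hypothetical guard: keep only rows that split into the expected 13 fields,
// so a short or corrupted trailing record cannot cause ArrayIndexOutOfBoundsException.
val expectedFields = 13
val dateD = sc.textFile("hdfs:///data/date_d.csv")          // path is illustrative
  .map(_.split(",", -1))
  .filter(_.length == expectedFields)                       // drops the malformed record
  .map(f => date_d(f(0).toInt, java.sql.Timestamp.valueOf(f(1)), f(2), f(3),
    f(4).toInt, f(5).toInt, f(6), f(7).toInt, f(8), f(9).toInt,
    f(10).toInt, f(11).toInt, f(12).toInt))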

On Fri, Feb 27, 2015 at 4:24 PM, anamika gupta anamika.guo...@gmail.com
wrote:

 I have three tables with the following schema:

 case class date_d(WID: Int, CALENDAR_DATE: java.sql.Timestamp,
 DATE_STRING: String, DAY_OF_WEEK: String, DAY_OF_MONTH: Int, DAY_OF_YEAR:
 Int, END_OF_MONTH_FLAG: String, YEARWEEK: Int, CALENDAR_MONTH: String,
 MONTH_NUM: Int, YEARMONTH: Int, QUARTER: Int, YEAR: Int)



 case class interval_f(ORG_ID: Int, CHANNEL_WID: Int, SDP_WID: Int,
 MEAS_WID: Int, DATE_WID: Int, TIME_WID: Int, VALIDATION_STATUS_CD: Int,
 VAL_FAIL_CD:Int, INTERVAL_FLAG_CD: Int, CHANGE_METHOD_WID:Int,
 SOURCE_LAST_UPD_TIME: java.sql.Timestamp, INTERVAL_END_TIME:
 java.sql.Timestamp, LOCKED: String, EXT_VERSION_TIME: java.sql.Timestamp,
 INTERVAL_VALUE: Double, INSERT_TIME: java.sql.Timestamp, LAST_UPD_TIME:
 java.sql.Timestamp)



 class sdp_d( WID :Option[Int], BATCH_ID :Option[Int], SRC_ID
 :Option[String], ORG_ID :Option[Int], CLASS_WID :Option[Int], DESC_TEXT
 :Option[String], PREMISE_WID :Option[Int], FEED_LOC :Option[String],
 GPS_LAT :Option[Double], GPS_LONG :Option[Double], PULSE_OUTPUT_BLOCK
 :Option[String], UDC_ID :Option[String], UNIVERSAL_ID :Option[String],
 IS_VIRTUAL_FLG :Option[String], SEAL_INFO :Option[String], ACCESS_INFO
 :Option[String], ALT_ACCESS_INFO :Option[String], LOC_INFO :Option[String],
 ALT_LOC_INFO :Option[String], TYPE :Option[String], SUB_TYPE
 :Option[String], TIMEZONE_ID :Option[Int], GIS_ID :Option[String],
 BILLED_UPTO_TIME :Option[java.sql.Timestamp], POWER_STATUS :Option[String],
 LOAD_STATUS :Option[String], BILLING_HOLD_STATUS :Option[String],
 INSERT_TIME :Option[java.sql.Timestamp], LAST_UPD_TIME
 :Option[java.sql.Timestamp]) extends Product{

 @throws(classOf[IndexOutOfBoundsException])
 override def productElement(n: Int) = n match
 {
 case 0 => WID; case 1 => BATCH_ID; case 2 => SRC_ID; case 3 =>
 ORG_ID; case 4 => CLASS_WID; case 5 => DESC_TEXT; case 6 => PREMISE_WID;
 case 7 => FEED_LOC; case 8 => GPS_LAT; case 9 => GPS_LONG; case 10 =>
 PULSE_OUTPUT_BLOCK; case 11 => UDC_ID; case 12 => UNIVERSAL_ID; case 13 =>
 IS_VIRTUAL_FLG; case 14 => SEAL_INFO; case 15 => ACCESS_INFO; case 16 =>
 ALT_ACCESS_INFO; case 17 => LOC_INFO; case 18 => ALT_LOC_INFO; case 19 =>
 TYPE; case 20 => SUB_TYPE; case 21 => TIMEZONE_ID; case 22 => GIS_ID; case
 23 => BILLED_UPTO_TIME; case 24 => POWER_STATUS; case 25 => LOAD_STATUS;
 case 26 => BILLING_HOLD_STATUS; case 27 => INSERT_TIME; case 28 =>
 LAST_UPD_TIME; case _ => throw new IndexOutOfBoundsException(n.toString())
 }

 override def productArity: Int = 29; override def canEqual(that: Any):
 Boolean = that.isInstanceOf[sdp_d]
 }



 Non-join queries work fine:

 val q1 = sqlContext.sql("SELECT YEAR, DAY_OF_YEAR, MAX(WID), MIN(WID),
 COUNT(*) FROM date_d GROUP BY YEAR, DAY_OF_YEAR ORDER BY YEAR,
 DAY_OF_YEAR")

 res4: Array[org.apache.spark.sql.Row] =
 Array([2014,305,20141101,20141101,1], [2014,306,20141102,20141102,1],
 [2014,307,20141103,20141103,1], [2014,308,20141104,20141104,1],
 [2014,309,20141105,20141105,1], [2014,310,20141106,20141106,1],
 [2014,311,20141107,20141107,1], [2014,312,20141108,20141108,1],
 [2014,313,20141109,20141109,1], [2014,314,20141110,20141110,1],
 [2014,315,2014,2014,1], [2014,316,20141112,20141112,1],
 [2014,317,20141113,20141113,1], [2014,318,20141114,20141114,1],
 [2014,319,20141115,20141115,1], [2014,320,20141116,20141116,1],
 [2014,321,20141117,20141117,1], [2014,322,20141118,20141118,1],
 [2014,323,20141119,20141119,1], [2014,324,20141120,20141120,1],
 [2014,325,20141121,20141121,1], [2014,326,20141122,20141122,1],
 [2014,327,20141123,20141123,1], [2014,328,20141...



 But the join queries throw this error:
 java.lang.ArrayIndexOutOfBoundsException

 scala> val q = sqlContext.sql("select * from date_d dd join interval_f
 intf on intf.DATE_WID = dd.WID Where intf.DATE_WID >= 20141101 AND
 intf.DATE_WID <= 20141110")

 q: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[38] at RDD at SchemaRDD.scala:103
 == Query Plan ==
 == Physical Plan ==
 Project
 [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLAG#6,YEARWEEK#7,CALENDAR_MONTH#8,MONTH_NUM#9,YEARMONTH#10,QUARTER#11,YEAR#12,ORG_ID#13,CHANNEL_WID#14,SDP_WID#15,MEAS_WID#16,DATE_WID#17,TIME_WID#18,VALIDATION_STATUS_CD#19,VAL_FAIL_CD#20,INTERVAL_FLAG_CD#21,CHANGE_METHOD_WID#22,SOURCE_LAST_UPD_TIME#23,INTERVAL_END_TIME#24,LOCKED#25,EXT_VERSION_TIME#26,INTERVAL_VALUE#27,INSERT_TIME#28,LAST_UPD_TIME#29]
  ShuffledHashJoin [WID#0], [DATE_WID#17], BuildRight
   Exchange (HashPartitioning [WID#0], 200)
InMemoryColumnarTableScan
 [WID#0,CALENDAR_DATE#1,DATE_STRING#2,DAY_OF_WEEK#3,DAY_OF_MONTH#4,DAY_OF_YEAR#5,END_OF_MONTH_FLA...


 scala> q.take(5).foreach(println)

 15/02/27 15:50:26 INFO SparkContext: Starting job: runJob at
 basicOperators.scala:136
 15/02/27 15:50:26 INFO DAGScheduler: Registering RDD 46 

SparkSQL production readiness

2015-02-28 Thread Ashish Mukherjee
Hi,

I am exploring SparkSQL for my purposes of performing large relational
operations across a cluster. However, it seems to be in alpha right now. Is
there any indication when it would be considered production-level? I don't
see any info on the site.

Regards,
Ashish


RE: SparkSQL production readiness

2015-02-28 Thread Wang, Daoyuan
Hopefully the alpha tag will be removed in 1.4.0, if the community can review 
code a little bit faster :P

Thanks,
Daoyuan

From: Ashish Mukherjee [mailto:ashish.mukher...@gmail.com]
Sent: Saturday, February 28, 2015 4:28 PM
To: user@spark.apache.org
Subject: SparkSQL production readiness

Hi,

I am exploring SparkSQL for my purposes of performing large relational 
operations across a cluster. However, it seems to be in alpha right now. Is 
there any indication when it would be considered production-level? I don't see 
any info on the site.

Regards,
Ashish


Re: Upgrade to Spark 1.2.1 using Guava

2015-02-28 Thread Pat Ferrel
Maybe, but any time the workaround is to use "spark-submit --conf
spark.executor.extraClassPath=/guava.jar blah", that means that standalone apps
must have hard-coded paths that are honored on every worker. And as you know, a
lib is pretty much blocked from using this version of Spark, hence the blocker
severity.

I could easily be wrong but userClassPathFirst doesn’t seem to be the issue. 
There is no class conflict.

On Feb 27, 2015, at 7:13 PM, Sean Owen so...@cloudera.com wrote:

This seems like a job for userClassPathFirst. Or could be. It's
definitely an issue of visibility between where the serializer is and
where the user class is.

At the top you said Pat that you didn't try this, but why not?

On Fri, Feb 27, 2015 at 10:11 PM, Pat Ferrel p...@occamsmachete.com wrote:
 I’ll try to find a Jira for it. I hope a fix is in 1.3
 
 
 On Feb 27, 2015, at 1:59 PM, Pat Ferrel p...@occamsmachete.com wrote:
 
 Thanks! that worked.
 
 On Feb 27, 2015, at 1:50 PM, Pat Ferrel p...@occamsmachete.com wrote:
 
 I don't use spark-submit; I have a standalone app.
 
 So I guess you want me to add that key/value to the conf in my code and make 
 sure it exists on workers.
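
For a standalone app that builds its own SparkContext, a minimal sketch of setting that key/value programmatically; the jar path is illustrative and has to exist at the same location on every worker:

import org.apache.spark.{SparkConf, SparkContext}

// Programmatic equivalent of: spark-submit --conf spark.executor.extraClassPath=...
val conf = new SparkConf()
  .setAppName("StandaloneGuavaApp")                              // app name is illustrative
  .set("spark.executor.extraClassPath", "/opt/deps/guava.jar")   // same path needed on every worker
val sc = new SparkContext(conf)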
 
 
 On Feb 27, 2015, at 1:47 PM, Marcelo Vanzin van...@cloudera.com wrote:
 
 On Fri, Feb 27, 2015 at 1:42 PM, Pat Ferrel p...@occamsmachete.com wrote:
 I changed it in the Spark master conf, which is also the only worker. I added a
 path to the jar that has Guava in it. Still can't find the class.
 
 Sorry, I'm still confused about what config you're changing. I'm
 suggesting using:
 
 spark-submit --conf spark.executor.extraClassPath=/guava.jar blah
 
 
 --
 Marcelo
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Running in-memory SQL on streamed relational data

2015-02-28 Thread Ashish Mukherjee
Hi,

I have been looking at Spark Streaming, which seems to be for the use case
of live streams which are processed one line at a time, generally in
real time.

Since SparkSQL reads data from some filesystem, I was wondering if there is
something which connects SparkSQL with Spark Streaming, so I can send live
relational tuples in a stream (rather than read filesystem data) for SQL
operations.

Also, at present, doing it with Spark Streaming would have complexities of
handling multiple Dstreams etc. since I may want to run multiple adhoc
queries of this kind on adhoc data I stream through.

Has anyone done this kind of thing with Spark before? i.e combination of
SparkSQL with Streaming.

Regards,
Ashish


Re: Running in-memory SQL on streamed relational data

2015-02-28 Thread Akhil Das
I think you can use simple operations like foreachRDD or transform to get
access to the RDDs in the stream, and then you can run SparkSQL over them.
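
A minimal sketch of that idea against the Spark 1.2-era API; the socket source, the Record case class, the port, and the query are all illustrative, and sc is assumed to be an existing SparkContext:

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Record(id: Int, value: String)          // illustrative tuple shape

val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD                  // implicit RDD[Product] -> SchemaRDD

ssc.socketTextStream("localhost", 9999)            // live relational tuples arriving as CSV lines
  .map(_.split(","))
  .filter(_.length == 2)
  .map(f => Record(f(0).toInt, f(1)))
  .foreachRDD { rdd =>
    rdd.registerTempTable("records")               // re-registered for each batch
    sqlContext.sql("SELECT value, COUNT(*) FROM records GROUP BY value")
      .collect()
      .foreach(println)
  }

ssc.start()
ssc.awaitTermination()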

Thanks
Best Regards

On Sat, Feb 28, 2015 at 3:27 PM, Ashish Mukherjee 
ashish.mukher...@gmail.com wrote:

 Hi,

 I have been looking at Spark Streaming , which seems to be for the use
 case of live streams which are processed one line at a time generally in
 real-time.

 Since SparkSQL reads data from some filesystem, I was wondering if there
 is something which connects SparkSQL with Spark Streaming, so I can send
 live relational tuples in a stream (rather than read filesystem data) for
 SQL operations.

 Also, at present, doing it with Spark Streaming would have complexities of
 handling multiple Dstreams etc. since I may want to run multiple adhoc
 queries of this kind on adhoc data I stream through.

 Has anyone done this kind of thing with Spark before? i.e combination of
 SparkSQL with Streaming.

 Regards,
 Ashish



SORT BY and ORDER BY file size v/s RAM size

2015-02-28 Thread DEVAN M.S.
Hi devs,

Is there any connection between the input file size and RAM size for
sorting using SparkSQL?
I tried a 1 GB file with 8 GB RAM and 4 cores and got
java.lang.OutOfMemoryError: GC overhead limit exceeded.
Or could it be for any other reason? It's working for other SparkSQL
operations.


15/02/28 16:33:03 INFO Utils: Successfully started service 'sparkDriver' on
port 41392.
15/02/28 16:33:03 INFO SparkEnv: Registering MapOutputTracker
15/02/28 16:33:03 INFO SparkEnv: Registering BlockManagerMaster
15/02/28 16:33:03 INFO DiskBlockManager: Created local directory at
/tmp/spark-ecf4d6f0-c526-48fa-bd8a-d74a8bf64820/spark-4865c193-05e6-4aa1-999b-ab8c426479ab
15/02/28 16:33:03 INFO MemoryStore: MemoryStore started with capacity 944.7
MB
15/02/28 16:33:03 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/02/28 16:33:03 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-af545c0b-15e6-4efa-a151-2c73faba8948/spark-987f58b4-5735-4965-91d1-38f238f4bb11
15/02/28 16:33:03 INFO HttpServer: Starting HTTP Server
15/02/28 16:33:03 INFO Utils: Successfully started service 'HTTP file
server' on port 44588.
15/02/28 16:33:08 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
15/02/28 16:33:08 INFO SparkUI: Started SparkUI at http://10.30.9.7:4040
15/02/28 16:33:08 INFO Executor: Starting executor ID driver on host
localhost
15/02/28 16:33:08 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@10.30.9.7:41392/user/HeartbeatReceiver
15/02/28 16:33:08 INFO NettyBlockTransferService: Server created on 34475
15/02/28 16:33:08 INFO BlockManagerMaster: Trying to register BlockManager
15/02/28 16:33:08 INFO BlockManagerMasterActor: Registering block manager
localhost:34475 with 944.7 MB RAM, BlockManagerId(driver, localhost,
34475)
15/02/28 16:33:08 INFO BlockManagerMaster: Registered BlockManager
15/02/28 16:33:09 INFO MemoryStore: ensureFreeSpace(193213) called with
curMem=0, maxMem=990550425
15/02/28 16:33:09 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 188.7 KB, free 944.5 MB)
15/02/28 16:33:09 INFO MemoryStore: ensureFreeSpace(25432) called with
curMem=193213, maxMem=990550425
15/02/28 16:33:09 INFO MemoryStore: Block broadcast_0_piece0 stored as
bytes in memory (estimated size 24.8 KB, free 944.5 MB)
15/02/28 16:33:09 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
on localhost:34475 (size: 24.8 KB, free: 944.6 MB)
15/02/28 16:33:09 INFO BlockManagerMaster: Updated info of block
broadcast_0_piece0
15/02/28 16:33:09 INFO SparkContext: Created broadcast 0 from textFile at
SortSQL.scala:20
15/02/28 16:33:10 INFO HiveMetaStore: 0: Opening raw store with
implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/02/28 16:33:10 INFO ObjectStore: ObjectStore, initialize called
15/02/28 16:33:10 INFO Persistence: Property datanucleus.cache.level2
unknown - will be ignored
15/02/28 16:33:10 INFO Persistence: Property
hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/02/28 16:33:12 INFO ObjectStore: Setting MetaStore object pin classes
with
hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order
15/02/28 16:33:12 INFO MetaStoreDirectSql: MySQL check failed, assuming we
are not on mysql: Lexical error at line 1, column 5.  Encountered: @
(64), after : .
15/02/28 16:33:13 INFO Datastore: The class
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
embedded-only so does not have its own datastore table.
15/02/28 16:33:13 INFO Datastore: The class
org.apache.hadoop.hive.metastore.model.MOrder is tagged as
embedded-only so does not have its own datastore table.
15/02/28 16:33:13 INFO Datastore: The class
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
embedded-only so does not have its own datastore table.
15/02/28 16:33:13 INFO Datastore: The class
org.apache.hadoop.hive.metastore.model.MOrder is tagged as
embedded-only so does not have its own datastore table.
15/02/28 16:33:13 INFO Query: Reading in results for query
org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection used is
closing
15/02/28 16:33:13 INFO ObjectStore: Initialized ObjectStore
15/02/28 16:33:14 INFO HiveMetaStore: Added admin role in metastore
15/02/28 16:33:14 INFO HiveMetaStore: Added public role in metastore
15/02/28 16:33:14 INFO HiveMetaStore: No user is added in admin role, since
config is empty
15/02/28 16:33:14 INFO SessionState: No Tez session required at this point.
hive.execution.engine=mr.
15/02/28 16:33:14 INFO ParseDriver: Parsing command: SELECT * FROM people
SORT BY B DESC
15/02/28 16:33:14 INFO ParseDriver: Parse Completed
15/02/28 16:33:14 INFO deprecation: mapred.tip.id is deprecated. Instead,
use mapreduce.task.id
15/02/28 16:33:14 INFO deprecation: mapred.task.id is deprecated. Instead,
use mapreduce.task.attempt.id
15/02/28 16:33:14 INFO deprecation: 

Reg. Difference in Performance

2015-02-28 Thread Deep Pradhan
Hi,
I am running Spark applications in GCE. I set up cluster with different
number of nodes varying from 1 to 7. The machines are single core machines.
I set the spark.default.parallelism to the number of nodes in the cluster
for each cluster. I ran the four applications available in Spark Examples,
SparkTC, SparkALS, SparkLR, SparkPi for each of the configurations.
What I notice is the following:
In case of SparkTC and SparkALS, the time to complete the job increases
with the increase in number of nodes in cluster, where as in SparkLR and
SparkPi, the time to complete the job remains the same across all the
configurations.
Could anyone explain me this?

Thank You
Regards,
Deep


bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Koert Kuipers
hey,
running my first map-reduce-like (meaning disk-to-disk, avoiding in-memory
RDDs) computation in Spark on YARN, I immediately got bitten by a too-low
spark.yarn.executor.memoryOverhead. However, it took me about an hour to
find out this was the cause. At first I observed failing shuffles leading
to restarting of tasks, then I realized this was because executors could
not be reached, then I noticed that containers got shut down and reallocated
in the resourcemanager logs (no mention of errors; it seemed the containers
finished their business and shut down successfully), and finally I found
the reason in the nodemanager logs.

I don't think this is a pleasant first experience. I realize
spark.yarn.executor.memoryOverhead needs to be set differently from
situation to situation. But shouldn't the default be a somewhat higher value
so that these errors are unlikely, and then the experts who are willing to
deal with these errors can tune it lower? So why not make the default 10%
instead of 7%? That gives something that works in most situations out of
the box (at the cost of being a little wasteful). It worked for me.
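
For anyone hitting the same thing, a minimal sketch of setting the overhead explicitly; the 2048 MB value is illustrative (in the 1.2-era code the default was roughly max(384 MB, 7% of executor memory), so a 20 GB executor gets about 1434 MB, versus about 2048 MB at 10%):

import org.apache.spark.SparkConf

// Set the YARN executor overhead (in MB) explicitly instead of relying on the default fraction.
val conf = new SparkConf()
  .setAppName("DiskToDiskJob")                          // name is illustrative
  .set("spark.yarn.executor.memoryOverhead", "2048")

// or equivalently on the command line:
//   spark-submit --conf spark.yarn.executor.memoryOverhead=2048 ...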


Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Ted Yu
I have created SPARK-6085 with pull request:
https://github.com/apache/spark/pull/4836

Cheers

On Sat, Feb 28, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote:

 +1 to a better default as well.

 We were working fine until we ran against a real dataset which was much
 larger than the test dataset we were using locally. It took me a couple
 days and digging through many logs to figure out this value was what was
 causing the problem.

 On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu yuzhih...@gmail.com wrote:

 Having good out-of-box experience is desirable.

 +1 on increasing the default.


 On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote:

 There was a recent discussion about whether to increase or indeed make
 configurable this kind of default fraction. I believe the suggestion
 there too was that 9-10% is a safer default.

 Advanced users can lower the resulting overhead value; it may still
 have to be increased in some cases, but a fatter default may make this
 kind of surprise less frequent.

 I'd support increasing the default; any other thoughts?

 On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com
 wrote:
  hey,
  running my first map-red like (meaning disk-to-disk, avoiding in memory
  RDDs) computation in spark on yarn i immediately got bitten by a too
 low
  spark.yarn.executor.memoryOverhead. however it took me about an hour
 to find
  out this was the cause. at first i observed failing shuffles leading to
  restarting of tasks, then i realized this was because executors could
 not be
  reached, then i noticed in containers got shut down and reallocated in
  resourcemanager logs (no mention of errors, it seemed the containers
  finished their business and shut down successfully), and finally i
 found the
  reason in nodemanager logs.
 
  i dont think this is a pleasent first experience. i realize
  spark.yarn.executor.memoryOverhead needs to be set differently from
  situation to situation. but shouldnt the default be a somewhat higher
 value
  so that these errors are unlikely, and then the experts that are
 willing to
  deal with these errors can tune it lower? so why not make the default
 10%
  instead of 7%? that gives something that works in most situations out
 of the
  box (at the cost of being a little wasteful). it worked for me.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Corey Nolet
Thanks for taking this on Ted!

On Sat, Feb 28, 2015 at 4:17 PM, Ted Yu yuzhih...@gmail.com wrote:

 I have created SPARK-6085 with pull request:
 https://github.com/apache/spark/pull/4836

 Cheers

 On Sat, Feb 28, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote:

 +1 to a better default as well.

 We were working fine until we ran against a real dataset which was much
 larger than the test dataset we were using locally. It took me a couple
 days and digging through many logs to figure out this value was what was
 causing the problem.

 On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu yuzhih...@gmail.com wrote:

 Having good out-of-box experience is desirable.

 +1 on increasing the default.


 On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote:

 There was a recent discussion about whether to increase or indeed make
 configurable this kind of default fraction. I believe the suggestion
 there too was that 9-10% is a safer default.

 Advanced users can lower the resulting overhead value; it may still
 have to be increased in some cases, but a fatter default may make this
 kind of surprise less frequent.

 I'd support increasing the default; any other thoughts?

 On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com
 wrote:
  hey,
  running my first map-red like (meaning disk-to-disk, avoiding in
 memory
  RDDs) computation in spark on yarn i immediately got bitten by a too
 low
  spark.yarn.executor.memoryOverhead. however it took me about an hour
 to find
  out this was the cause. at first i observed failing shuffles leading
 to
  restarting of tasks, then i realized this was because executors could
 not be
  reached, then i noticed in containers got shut down and reallocated in
  resourcemanager logs (no mention of errors, it seemed the containers
  finished their business and shut down successfully), and finally i
 found the
  reason in nodemanager logs.
 
  i dont think this is a pleasent first experience. i realize
  spark.yarn.executor.memoryOverhead needs to be set differently from
  situation to situation. but shouldnt the default be a somewhat higher
 value
  so that these errors are unlikely, and then the experts that are
 willing to
  deal with these errors can tune it lower? so why not make the default
 10%
  instead of 7%? that gives something that works in most situations out
 of the
  box (at the cost of being a little wasteful). it worked for me.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: SparkSQL production readiness

2015-02-28 Thread Michael Armbrust
We are planning to remove the alpha tag in 1.3.0.

On Sat, Feb 28, 2015 at 12:30 AM, Wang, Daoyuan daoyuan.w...@intel.com
wrote:

  Hopefully the alpha tag will be removed in 1.4.0, if the community can
 review code a little bit faster :P



 Thanks,

 Daoyuan



 *From:* Ashish Mukherjee [mailto:ashish.mukher...@gmail.com]
 *Sent:* Saturday, February 28, 2015 4:28 PM
 *To:* user@spark.apache.org
 *Subject:* SparkSQL production readiness



 Hi,



 I am exploring SparkSQL for my purposes of performing large relational
 operations across a cluster. However, it seems to be in alpha right now. Is
 there any indication when it would be considered production-level? I don't
 see any info on the site.



 Regards,

 Ashish



Re: Tools to manage workflows on Spark

2015-02-28 Thread Qiang Cao
Thanks, Ashish! Is Oozie integrated with Spark? I know it can accommodate
some Hadoop jobs.


On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com
wrote:

 Qiang,
 Did you look at Oozie?
 We use oozie to run spark jobs in production.


 On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Hi Everyone,

 We need to deal with workflows on Spark. In our scenario, each workflow
 consists of multiple processing steps. Among different steps, there could
 be dependencies.  I'm wondering if there are tools available that can
 help us schedule and manage workflows on Spark. I'm looking for something
 like pig on Hadoop, but it should fully function on Spark.

 Any suggestion?

 Thanks in advance!

 Qiang





Re: Reg. Difference in Performance

2015-02-28 Thread Joseph Bradley
Hi Deep,

Compute times may not be very meaningful for small examples like those.  If
you increase the sizes of the examples, then you may start to observe more
meaningful trends and speedups.

Joseph

On Sat, Feb 28, 2015 at 7:26 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Hi,
 I am running Spark applications in GCE. I set up cluster with different
 number of nodes varying from 1 to 7. The machines are single core machines.
 I set the spark.default.parallelism to the number of nodes in the cluster
 for each cluster. I ran the four applications available in Spark Examples,
 SparkTC, SparkALS, SparkLR, SparkPi for each of the configurations.
 What I notice is the following:
 In case of SparkTC and SparkALS, the time to complete the job increases
 with the increase in number of nodes in cluster, where as in SparkLR and
 SparkPi, the time to complete the job remains the same across all the
 configurations.
 Could anyone explain me this?

 Thank You
 Regards,
 Deep



Re: Tools to manage workflows on Spark

2015-02-28 Thread Ashish Nigam
You have to call spark-submit from oozie.
I used this link to get the idea for my implementation - 

http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E



 On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote:
 
 Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate 
 some Hadoop jobs.
 
 
 On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com 
 mailto:ashnigamt...@gmail.com wrote:
 Qiang,
 Did you look at Oozie?
 We use oozie to run spark jobs in production.
 
 
 On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com 
 mailto:caoqiang...@gmail.com wrote:
 
 Hi Everyone,
 
 We need to deal with workflows on Spark. In our scenario, each workflow 
 consists of multiple processing steps. Among different steps, there could be 
 dependencies.  I'm wondering if there are tools available that can help us 
 schedule and manage workflows on Spark. I'm looking for something like pig 
 on Hadoop, but it should fully function on Spark.
 
 Any suggestion?
 
 Thanks in advance!
 
 Qiang
 
 



How to debug a hung spark application

2015-02-28 Thread manasdebashiskar
Hi,
 I have a Spark application that hangs on just one task (the rest of the 200-300
tasks complete in reasonable time).
I can see in the thread dump which function gets stuck; however, I don't
have a clue as to what input value is causing that behaviour.
Also, logging the inputs before the function is executed does not help, as
the actual message gets buried in the logs.

How does one go about debugging such a case?
Also, is there a way I can wrap my function inside some sort of timer-based
environment so that if it takes too long it throws a stack trace or
something similar?
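
A minimal sketch of the timer idea described above; withTimeout, mySlowFunction and the 30-second deadline are hypothetical, and the blocking Await is meant only as a debugging aid:

import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Wraps a function so that any single input exceeding the deadline is logged and rethrown,
// which surfaces the suspect value instead of burying it in the logs.
def withTimeout[A, B](timeout: Duration)(f: A => B): A => B = { a =>
  try {
    Await.result(Future(f(a)), timeout)
  } catch {
    case e: TimeoutException =>
      System.err.println(s"Input exceeded $timeout: $a")
      throw e
  }
}

// Hypothetical usage inside the job: rdd.map(withTimeout(30.seconds)(mySlowFunction))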

Thanks




-
Manas Kar
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-debug-a-hung-spark-application-tp21859.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to debug a Hung task

2015-02-28 Thread Michael Albert
For what it's worth, I was seeing mysterious hangs, but they went away when
upgrading from Spark 1.2 to 1.2.1. I don't know if this is your problem. Also, I'm
using AWS EMR images, which were also upgraded.
Anyway, that's my experience.
-Mike

  From: Manas Kar manasdebashis...@gmail.com
 To: user@spark.apache.org user@spark.apache.org 
 Sent: Friday, February 27, 2015 3:50 PM
 Subject: How to debug a Hung task
   
Hi,

I have a Spark application that hangs on just one task (the rest of the 200-300
tasks complete in reasonable time). I can see in the thread dump which
function gets stuck; however, I don't have a clue as to what input value is causing
that behaviour. Also, logging the inputs before the function is executed does
not help, as the actual message gets buried in the logs.

How does one go about debugging such a case? Also, is there a way I can wrap my
function inside some sort of timer-based environment so that if it takes too long it
throws a stack trace or something similar?

Thanks,
Manas

  

Re: Getting to proto buff classes in Spark Context

2015-02-28 Thread John Meehan
Maybe try including the jar with

--driver-class-path <jar>



 On Feb 26, 2015, at 12:16 PM, Akshat Aranya aara...@gmail.com wrote:
 
 My guess would be that you are packaging too many things in your job, which 
 is causing problems with the classpath.  When your jar goes in first, you get 
 the correct version of protobuf, but some other version of something else.  
 When your jar goes in later, other things work, but protobuf breaks.  This is 
 just a guess though; take a look at what you're packaging in your jar and 
 look for things that Spark or Kafka could also be using.
 
 On Thu, Feb 26, 2015 at 10:06 AM, necro351 . necro...@gmail.com wrote:
 Hello everyone,
 
 We are trying to decode a message inside a Spark job that we receive from 
 Kafka. The message is encoded using Proto Buff. The problem is when decoding 
 we get class-not-found exceptions. We have tried remedies we found online in 
 Stack Exchange and mail list archives but nothing seems to work.
 
 (This question is a re-ask, but we really cannot figure this one out.)
 
 We created a standalone repository with a very simple Spark job that 
 exhibits the above issues. The spark job reads the messages from the FS, 
 decodes them, and prints them. Its easy to checkout and try to see the 
 exception yourself: just uncomment the code that prints the messages from 
 within the RDD. The only sources are the generated Proto Buff java sources 
 and a small Spark Job that decodes a message. I'd appreciate if anyone could 
 take a look.
 
 https://github.com/vibhav/spark-protobuf
 
 We tried a couple remedies already.
 
 Setting spark.files.userClassPathFirst didn't fix the problem for us. I am 
 not very familiar with the Spark and Scala environment, so please correct 
 any incorrect assumptions or statements I make.
 
 However, I don't believe this to be a classpath visibility issue. I wrote a 
 small helper method to print out the classpath from both the driver and 
 worker, and the output is identical. (I'm printing out 
 System.getProperty("java.class.path") -- is there a better way to do this or 
 check the class path?). You can print out the class paths the same way we 
 are from the example project above.
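
For what it's worth, a minimal sketch of such a helper; the context-classloader walk is an addition that sometimes shows jars that java.class.path alone does not, and sc is assumed to be the SparkContext:

// Prints the classpath on the driver and on one executor so the two can be compared.
def printClassPath(tag: String): Unit = {
  println(s"[$tag] java.class.path = " + System.getProperty("java.class.path"))
  Thread.currentThread().getContextClassLoader match {
    case u: java.net.URLClassLoader => u.getURLs.foreach(url => println(s"[$tag] $url"))
    case other                      => println(s"[$tag] context loader: $other")
  }
}

printClassPath("driver")
sc.parallelize(1 to 1, 1).foreach(_ => printClassPath("executor"))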
 
 Furthermore, userClassPathFirst seems to have a detrimental effect on 
 otherwise working code, which I cannot explain or do not understand. 
 
 For example, I created a trivial RDD as such:
 
 val l = List(1, 2, 3)
 sc.makeRDD(l).foreach((x: Int) => {
 println(x.toString)
 })
 
 With userClassPathFirst set, I encounter a java.lang.ClassCastException 
 trying to execute that code. Is that to be expected? You can re-create this 
 issue by commenting out the block of code that tries to print the above in 
 the example project we linked to above.
 
 We also tried dynamically adding the jar with .addJar to the Spark Context 
 but this seemed to have no effect.
 
 Thanks in advance for any help y'all can provide.
 


Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
A correction to my first post:

There is also a repartition right before groupByKey to help avoid
too-many-open-files error:

rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...]
 disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
 stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:

 I would first check whether there is any possibility that, after doing
 groupByKey, one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupByKey + map, try reduceByKey +
 mapValues.
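
A minimal sketch of that rewrite on an illustrative (String, Long) pair RDD, using a per-key average as a stand-in aggregation; sc is assumed to be the SparkContext:

val pairs = sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)))   // illustrative input

// groupByKey + map: every value for a key is materialized on one executor before the map runs.
val viaGroup = pairs.groupByKey().map { case (k, vs) => (k, vs.sum.toDouble / vs.size) }

// reduceByKey + mapValues: combines map-side, so no key's full value list is ever held in memory.
val viaReduce = pairs
  .mapValues(v => (v, 1L))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, count) => sum.toDouble / count }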

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N)). Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor losts, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName("Test")
   .set("spark.storage.memoryFraction", "0.2") // default 0.6
   .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
   .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
   .set("spark.shuffle.consolidateFiles", "true") // helpful for too many files open
   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
   .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true
   .set("spark.akka.askTimeout", "30")
   .set("spark.shuffle.compress", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.file.transferTo", "false") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.core.connection.ack.wait.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.speculation", "true")
   .set("spark.worker.timeout", "600") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.akka.timeout", "300") // http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.storage.blockManagerSlaveTimeoutMs", "12")
   .set("spark.driver.maxResultSize", "2048") // in response to error: Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryo.registrator", "com.att.bdcoe.cip.ooh.MyRegistrator")
   .set("spark.kryo.registrationRequired", "true")

 val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER)
   .map(_.split("\\|", -1))...filter(...)

 val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER)
   .map(_.split("\\|", -1))...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything
 before groupByKey(). It fails before getting to groupByKey. I have 

Re: Upgrade to Spark 1.2.1 using Guava

2015-02-28 Thread Erlend Hamnaberg
Yes. I ran into this problem with a Mahout snapshot and Spark 1.2.0. I did not
really try to figure out why it was a problem, since there were
already too many moving parts in my app. Obviously there is a classpath
issue somewhere.

/Erlend
On 27 Feb 2015 22:30, Pat Ferrel p...@occamsmachete.com wrote:

 @Erlend hah, we were trying to merge your PR and ran into this—small
 world. You actually compile the JavaSerializer source in your project?

 @Marcelo do you mean modifying spark.executor.extraClassPath on all
 workers? That didn't seem to work.

 On Feb 27, 2015, at 1:23 PM, Erlend Hamnaberg erl...@hamnaberg.net
 wrote:

 Hi.

 I have had a simliar issue. I had to pull the JavaSerializer source into
 my own project, just so I got the classloading of this class under control.

 This must be a class loader issue with spark.

 -E

 On Fri, Feb 27, 2015 at 8:52 PM, Pat Ferrel p...@occamsmachete.com wrote:

 I understand that I need to supply Guava to Spark. The HashBiMap is
 created in the client and broadcast to the workers. So it is needed in
 both. To achieve this there is a deps.jar with Guava (and Scopt but that is
 only for the client). Scopt is found so I know the jar is fine for the
 client.

 I pass in the deps.jar to the context creation code. I’ve checked the
 content of the jar and have verified that it is used at context creation
 time.

 I register the serializer as follows:

 class MyKryoRegistrator extends KryoRegistrator {

   override def registerClasses(kryo: Kryo) = {

 val h: HashBiMap[String, Int] = HashBiMap.create[String, Int]()
 //kryo.addDefaultSerializer(h.getClass, new JavaSerializer())
 log.info("\n\n\nRegister Serializer for " +
 h.getClass.getCanonicalName + "\n\n\n") // just to be sure this does indeed
 get logged
 kryo.register(classOf[com.google.common.collect.HashBiMap[String,
 Int]], new JavaSerializer())
   }
 }
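
For completeness, a sketch of how a registrator like this is typically wired into the job configuration; the package name is illustrative:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "my.app.MyKryoRegistrator")   // fully qualified name of the class above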

 The job proceeds up until the broadcast value, a HashBiMap, is
 deserialized, which is where I get the following error.

 Have I missed a step for deserialization of broadcast values? Odd that
 serialization happened but deserialization failed. I’m running on a
 standalone localhost-only cluster.


 15/02/27 11:40:34 WARN scheduler.TaskSetManager: Lost task 1.0 in stage
 4.0 (TID 9, 192.168.0.2): java.io.IOException:
 com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093)
 at
 org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
 at
 org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
 at
 org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
 at
 org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
 at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
 at
 my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95)
 at
 my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at
 org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
 at
 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
 at
 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: com.esotericsoftware.kryo.KryoException: Error during Java
 deserialization.
 at
 com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
 at
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
 at
 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
 at
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
 at
 org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1090)
 ... 19 more

  root error ==
 Caused by: java.lang.ClassNotFoundException:
 com.google.common.collect.HashBiMap
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at 

Tools to manage workflows on Spark

2015-02-28 Thread Qiang Cao
Hi Everyone,

We need to deal with workflows on Spark. In our scenario, each workflow
consists of multiple processing steps. Among different steps, there could
be dependencies.  I'm wondering if there are tools available that can help
us schedule and manage workflows on Spark. I'm looking for something like
pig on Hadoop, but it should fully function on Spark.

Any suggestion?

Thanks in advance!

Qiang


Re: Tools to manage workflows on Spark

2015-02-28 Thread Ashish Nigam
Qiang,
Did you look at Oozie?
We use oozie to run spark jobs in production.


 On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote:
 
 Hi Everyone,
 
 We need to deal with workflows on Spark. In our scenario, each workflow 
 consists of multiple processing steps. Among different steps, there could be 
 dependencies.  I'm wondering if there are tools available that can help us 
 schedule and manage workflows on Spark. I'm looking for something like pig on 
 Hadoop, but it should fully function on Spark.
 
 Any suggestion?
 
 Thanks in advance!
 
 Qiang



Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-02-28 Thread Ashish Nigam
Ted,
spark-catalyst_2.11-1.2.1.jar is present in the class path. BTW, I am running 
the code locally in eclipse workspace.

Here’s complete exception stack trace - 

Exception in thread main scala.ScalaReflectionException: class 
org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial 
classloader with boot classpath 
[/Applications/eclipse/plugins/org.scala-lang.scala-library_2.11.5.v20150101-184742-3fafbc204f.jar:/Applications/eclipse/plugins/org.scala-lang.scala-reflect_2.11.5.v20150101-184742-3fafbc204f.jar:/Applications/eclipse/plugins/org.scala-lang.scala-actors_2.11.5.v20150101-184742-3fafbc204f.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_31.jdk/Contents/Home/jre/classes]
 not found.
at 
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
at 
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
at 
org.apache.spark.sql.catalyst.ScalaReflection$$typecreator1$1.apply(ScalaReflection.scala:115)
at 
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
at scala.reflect.api.TypeTags$class.typeOf(TypeTags.scala:341)
at scala.reflect.api.Universe.typeOf(Universe.scala:61)
at 
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:115)
at 
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
at 
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:100)
at 
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
at 
org.apache.spark.sql.catalyst.ScalaReflection$class.attributesFor(ScalaReflection.scala:94)
at 
org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:33)
at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:111)
——





 On Feb 28, 2015, at 9:31 AM, Ted Yu yuzhih...@gmail.com wrote:
 
 Have you verified that spark-catalyst_2.10 jar was in the classpath ?
 
 Cheers
 
 On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com 
 mailto:ashnigamt...@gmail.com wrote:
 Hi,
 I wrote a very simple program in scala to convert an existing RDD to 
 SchemaRDD.
 But createSchemaRDD function is throwing exception 
 
 Exception in thread main scala.ScalaReflectionException: class 
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial 
 classloader with boot classpath [.] not found
 
 
 Here's more info on the versions I am using - 
 
 <scala.binary.version>2.11</scala.binary.version>
 <spark.version>1.2.1</spark.version>
 <scala.version>2.11.5</scala.version>
 
 Please let me know how can I resolve this problem.
 
 Thanks
 Ashish
 



Re: Is there any Sparse Matrix implementation in Spark/MLib?

2015-02-28 Thread Joseph Bradley
Hi Shahab,

There are actually a few distributed Matrix types which support sparse
representations: RowMatrix, IndexedRowMatrix, and CoordinateMatrix.
The documentation has a bit more info about the various uses:
http://spark.apache.org/docs/latest/mllib-data-types.html#distributed-matrix

The Spark 1.3 RC includes a new one: BlockMatrix.

But since these are distributed, they are represented using RDDs, so they
of course will not be as fast as computations on smaller, locally stored
matrices.
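
A minimal sketch of two of those types; the values and sizes are illustrative, and sc is assumed to be a SparkContext:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}

// A sparse distributed matrix built from (row, col, value) entries.
val entries = sc.parallelize(Seq(
  MatrixEntry(0L, 1L, 2.5),
  MatrixEntry(3L, 0L, 1.0)))
val coordMat = new CoordinateMatrix(entries)

// A RowMatrix whose rows are sparse local vectors (size 5, nonzeros at the given indices).
val sparseRows = sc.parallelize(Seq(
  Vectors.sparse(5, Array(0, 3), Array(1.0, 4.0)),
  Vectors.sparse(5, Array(2), Array(7.0))))
val rowMat = new RowMatrix(sparseRows)

println(s"coordMat: ${coordMat.numRows} x ${coordMat.numCols}, rowMat: ${rowMat.numRows} x ${rowMat.numCols}")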

Joseph

On Fri, Feb 27, 2015 at 4:39 AM, Ritesh Kumar Singh 
riteshoneinamill...@gmail.com wrote:

 try using breeze (scala linear algebra library)

 On Fri, Feb 27, 2015 at 5:56 PM, shahab shahab.mok...@gmail.com wrote:

 Thanks a lot Vijay, let me see how it performs.

 Best
 Shahab


 On Friday, February 27, 2015, Vijay Saraswat vi...@saraswat.org wrote:

 Available in GML --

 http://x10-lang.org/x10-community/applications/global-
 matrix-library.html

 We are exploring how to make it available within Spark. Any ideas would
 be much appreciated.

 On 2/27/15 7:01 AM, shahab wrote:

 Hi,

 I just wonder if there is any Sparse Matrix implementation available
 in Spark, so it can be used in spark application?

 best,
 /Shahab



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Some questions after playing a little with the new ml.Pipeline.

2015-02-28 Thread Joseph Bradley
Hi Jao,

You can use external tools and libraries if they can be called from your
Spark program or script (with appropriate conversion of data types, etc.).
The best way to apply a pre-trained model to a dataset would be to call the
model from within a closure, e.g.:

myRDD.map { myDatum => preTrainedModel.predict(myDatum) }

If your data is distributed in an RDD (myRDD), then the above call will
distribute the computation of prediction using the pre-trained model.  It
will require that all of your Spark workers be able to run the
preTrainedModel; that may mean installing Caffe and dependencies on all
nodes in the compute cluster.

For the second question, I would modify the above call as follows:

myRDD.mapPartitions { myDataOnPartition =>
  val myModel = // instantiate neural network on this partition
  myDataOnPartition.map { myDatum => myModel.predict(myDatum) }
}
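
A slightly fleshed-out version of that second pattern; ExternalModel and loadModel are hypothetical stand-ins for a native binding such as Caffe, the batch size of 64 is illustrative, and myRDD is assumed to hold Array[Float] feature vectors:

// Hypothetical interface over a heavyweight external model.
trait ExternalModel extends Serializable {
  def predictBatch(batch: Seq[Array[Float]]): Seq[Int]
}
def loadModel(weightsPath: String): ExternalModel = new ExternalModel {
  // Dummy stand-in; a real implementation would do the expensive native initialization here.
  def predictBatch(batch: Seq[Array[Float]]): Seq[Int] = batch.map(_.length)
}

val predictions = myRDD.mapPartitions { iter =>
  val model = loadModel("/path/to/weights")   // one instantiation per partition, not per record
  iter.grouped(64).flatMap(batch => model.predictBatch(batch).iterator)
}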

I hope this helps!
Joseph

On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa jaon...@gmail.com
wrote:

 Dear all,


 We mainly do large-scale computer vision tasks (image classification,
 retrieval, ...). The pipeline is really great stuff for that. We're trying
 to reproduce the tutorial given on that topic during the latest spark
 summit (
 http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html
  )
 using the master version of spark pipeline and dataframe. The tutorial
 shows different examples of feature extraction stages before running
 machine learning algorithms. Even though the tutorial is straightforward to
 reproduce with this new API, we still have some questions:

    - Can one use external tools (e.g. via pipe) as a pipeline stage? An
    example use case is to extract features learned with a convolutional
    neural network. In our case, this corresponds to a pre-trained neural
    network built with the Caffe library (http://caffe.berkeleyvision.org/).


    - The second question is about the performance of the pipeline. A
    library such as Caffe processes data in batches, and instantiating one Caffe
    network can be time-consuming when the network is very deep. So, we can
    gain performance if we minimize the number of Caffe network creations and
    feed data to the network in batches. In the pipeline, this corresponds to
    running transformers that work on a partition basis and give the whole
    partition to a single Caffe network. How can we create such a transformer?



 Best,

 Jao



Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Sean Owen
There was a recent discussion about whether to increase or indeed make
configurable this kind of default fraction. I believe the suggestion
there too was that 9-10% is a safer default.

Advanced users can lower the resulting overhead value; it may still
have to be increased in some cases, but a fatter default may make this
kind of surprise less frequent.

I'd support increasing the default; any other thoughts?

On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com wrote:
 hey,
 running my first map-red like (meaning disk-to-disk, avoiding in memory
 RDDs) computation in spark on yarn i immediately got bitten by a too low
 spark.yarn.executor.memoryOverhead. however it took me about an hour to find
 out this was the cause. at first i observed failing shuffles leading to
 restarting of tasks, then i realized this was because executors could not be
 reached, then i noticed in containers got shut down and reallocated in
 resourcemanager logs (no mention of errors, it seemed the containers
 finished their business and shut down successfully), and finally i found the
 reason in nodemanager logs.

 i dont think this is a pleasent first experience. i realize
 spark.yarn.executor.memoryOverhead needs to be set differently from
 situation to situation. but shouldnt the default be a somewhat higher value
 so that these errors are unlikely, and then the experts that are willing to
 deal with these errors can tune it lower? so why not make the default 10%
 instead of 7%? that gives something that works in most situations out of the
 box (at the cost of being a little wasteful). it worked for me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Ted Yu
Having good out-of-box experience is desirable.

+1 on increasing the default.


On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote:

 There was a recent discussion about whether to increase or indeed make
 configurable this kind of default fraction. I believe the suggestion
 there too was that 9-10% is a safer default.

 Advanced users can lower the resulting overhead value; it may still
 have to be increased in some cases, but a fatter default may make this
 kind of surprise less frequent.

 I'd support increasing the default; any other thoughts?

 On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com wrote:
  hey,
  running my first map-red like (meaning disk-to-disk, avoiding in memory
  RDDs) computation in spark on yarn i immediately got bitten by a too low
  spark.yarn.executor.memoryOverhead. however it took me about an hour to
 find
  out this was the cause. at first i observed failing shuffles leading to
  restarting of tasks, then i realized this was because executors could
 not be
  reached, then i noticed in containers got shut down and reallocated in
  resourcemanager logs (no mention of errors, it seemed the containers
  finished their business and shut down successfully), and finally i found
 the
  reason in nodemanager logs.
 
  i dont think this is a pleasent first experience. i realize
  spark.yarn.executor.memoryOverhead needs to be set differently from
  situation to situation. but shouldnt the default be a somewhat higher
 value
  so that these errors are unlikely, and then the experts that are willing
 to
  deal with these errors can tune it lower? so why not make the default 10%
  instead of 7%? that gives something that works in most situations out of
 the
  box (at the cost of being a little wasteful). it worked for me.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Problem getting program to run on 15TB input

2015-02-28 Thread Aaron Davidson
All stated symptoms are consistent with GC pressure (other nodes timeout
trying to connect because of a long stop-the-world), quite possibly due to
groupByKey. groupByKey is a very expensive operation as it may bring all
the data for a particular partition into memory (in particular, it cannot
spill values for a single key, so if you have a single very skewed key you
can get behavior like this).

On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:

 But groupByKey will repartition according to the number of keys, as I understand
 how it works. How do you know that you haven't reached the groupByKey
 phase? Are you using a profiler, or do you base that assumption only on logs?

 On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra arun.lut...@gmail.com
 wrote:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...]
 disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
 stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try reducebykey
 + mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 On Sat, Feb 28, 2015 at 6:22 PM, Arun Luthra
 arun.lut...@gmail.com wrote:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor losts, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName("Test")
   .set("spark.storage.memoryFraction", "0.2") // default 0.6
   .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
   .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
   .set("spark.shuffle.consolidateFiles", "true") // helpful for too many files open
   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
   .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true
   .set("spark.akka.askTimeout", "30")
   .set("spark.shuffle.compress", "false") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.file.transferTo", "false") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.core.connection.ack.wait.timeout", "600") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.speculation", "true")
   .set("spark.worker.timeout", "600") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.akka.timeout", "300") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.storage.blockManagerSlaveTimeoutMs", "12")
   .set("spark.driver.maxResultSize", "2048") // in response to error: Total size of
     // serialized results of 39901 tasks (1024.0 MB) is bigger than
     // spark.driver.maxResultSize (1024.0 MB)
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Re: Scheduler hang?

2015-02-28 Thread Victor Tso-Guillen
Moving user to bcc.

What I found was that the TaskSetManager for my task set that had 5 tasks
had preferred locations set for 4 of the 5. Three had localhost/driver
and had completed. The one that had nothing had also completed. The last
one was set by our code to be my IP address. Local mode can hang on this
because of https://issues.apache.org/jira/browse/SPARK-4939 addressed by
https://github.com/apache/spark/pull/4147, which is obviously not an
optimal solution but since it's only local mode, it's good enough. I'm
not going to wait for those seconds to tick by to complete the task, so
I'll fix the IP address reporting side for local mode in my code.

On Thu, Feb 26, 2015 at 8:32 PM, Victor Tso-Guillen v...@paxata.com wrote:

 Of course, breakpointing on every status update and revive offers
 invocation kept the problem from happening. Where could the race be?

 On Thu, Feb 26, 2015 at 7:55 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Love to hear some input on this. I did get a standalone cluster up on my
 local machine and the problem didn't present itself. I'm pretty confident
 that means the problem is in the LocalBackend or something near it.

 On Thu, Feb 26, 2015 at 1:37 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Okay I confirmed my suspicions of a hang. I made a request that stopped
 progressing, though the already-scheduled tasks had finished. I made a
 separate request that was small enough not to hang, and it kicked the hung
 job enough to finish. I think what's happening is that the scheduler or the
 local backend is not kicking the revive offers messaging at the right time,
 but I have to dig into the code some more to nail the culprit. Anyone on
 these list have experience in those code areas that could help?

 On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Thanks for the link. Unfortunately, I turned on rdd compression and
 nothing changed. I tried moving netty - nio and no change :(

 On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Not many that i know of, but i bumped into this one
 https://issues.apache.org/jira/browse/SPARK-4516

 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
 dependencies that produce no data?

 On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 The data is small. The job is composed of many small stages.

 * I found that with fewer than 222 the problem exhibits. What will
 be gained by going higher?
 * Pushing up the parallelism only pushes up the boundary at which
 the system appears to hang. I'm worried about some sort of message loss 
 or
 inconsistency.
 * Yes, we are using Kryo.
 * I'll try that, but I'm again a little confused why you're
 recommending this. I'm stumped so might as well?

 On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 What operation are you trying to do and how big is the data that
 you are operating on?

 Here's a few things which you can try:

 - Repartition the RDD to a higher number than 222
 - Specify the master as local[*] or local[10]
 - Use Kryo Serializer (.set("spark.serializer",
  "org.apache.spark.serializer.KryoSerializer"))
  - Enable RDD Compression (.set("spark.rdd.compress", "true") )
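
 A minimal sketch of those settings together (the app name and the repartition
 count below are placeholders):

   val conf = new org.apache.spark.SparkConf()
     .setMaster("local[*]")                 // or local[10]
     .setAppName("hang-repro")              // placeholder name
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .set("spark.rdd.compress", "true")
   val sc = new org.apache.spark.SparkContext(conf)

   // and repartition the input to something above 222 before the heavy stages, e.g.:
   // val wider = rdd.repartition(500)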


 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen 
 v...@paxata.com wrote:

 I'm getting this really reliably on Spark 1.2.1. Basically I'm in
 local mode with parallelism at 8. I have 222 tasks and I never seem 
 to get
 far past 40. Usually in the 20s to 30s it will just hang. The last 
 logging
 is below, and a screenshot of the UI.

 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
 TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
 localhost (1/5)
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
 worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
 worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
 TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
 localhost (2/5)
 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
 TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
 localhost (3/5)
 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch
 worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
 TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
 localhost (4/5)

 [image: Inline image 1]
 What should I make of this? Where do I start?

 Thanks,
 Victor












Re: Failed to parse Hive query

2015-02-28 Thread Anusha Shamanur
Hi,
I reconfigured everything. Still facing the same issue.
Can someone please help?

On Friday, February 27, 2015, Anusha Shamanur anushas...@gmail.com wrote:

 I do.
 What tags should I change in this?
 I changed the value of hive.exec.scratchdir to /tmp/hive.
 What else?

 On Fri, Feb 27, 2015 at 2:14 PM, Michael Armbrust mich...@databricks.com
 javascript:_e(%7B%7D,'cvml','mich...@databricks.com'); wrote:

 Do you have a hive-site.xml file or a core-site.xml file?  Perhaps
 something is misconfigured there?

 On Fri, Feb 27, 2015 at 7:17 AM, Anusha Shamanur anushas...@gmail.com
 javascript:_e(%7B%7D,'cvml','anushas...@gmail.com'); wrote:

 Hi,

 I am trying to do this in spark-shell:

 val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
 val listTables = hiveCtx.hql("show tables")

 The second line fails to execute with this message:

 warning: there were 1 deprecation warning(s); re-run with -deprecation
 for details org.apache.spark.sql.hive.HiveQl$ParseException: Failed to
 parse: show tables at
 org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:239) at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
 at
 org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at
 scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)

 ... at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.NullPointerException: Conf non-local session path
 expected to be non-null at
 com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
 at
 org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:586)
 at org.apache.hadoop.hive.ql.Context.(Context.java:129) at
 org.apache.hadoop.hive.ql.Context.(Context.java:116) at
 org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:227) at
 org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:240) ... 87 more


 Any help would be appreciated.



 --
 Sent from Gmail mobile





 --
 Regards,
 Anusha



-- 
Sent from Gmail mobile


Scalable JDBCRDD

2015-02-28 Thread Michal Klos
Hi Spark community,

We have a use case where we need to pull huge amounts of data from a SQL
query against a database into Spark. We need to execute the query against
our huge database and not a substitute (SparkSQL, Hive, etc) because of a
couple of factors including custom functions used in the queries that only
our database has.

We started by looking at JDBC RDD, which utilizes a prepared statement with
two parameters that are meant to be used to partition the result set to the
workers... e.g.:

select * from table limit ?,?

turns into

select * from table limit 1,100 on worker 1
select * from table limit 101,200 on worker 2
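
For reference, the stock JdbcRDD is built roughly like the sketch below; the two ?s are
filled with per-partition bounds, so the query runs once per partition. The URL, query,
and row mapper here are made up for illustration:

    import java.sql.{DriverManager, ResultSet}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.JdbcRDD

    def buildRdd(sc: SparkContext) = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:placeholder://host/db"), // one connection per partition
      "SELECT * FROM some_table WHERE id >= ? AND id <= ?",            // ?s become partition bounds
      1L, 1000000L,                 // lowerBound, upperBound over the partition column
      10,                           // numPartitions
      (rs: ResultSet) => rs.getString(1))                              // row mapper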

This will not work for us because our database cannot support multiple
execution of these queries without being crippled. But, additionally, our
database doesn't support the above LIMIT syntax and we don't have a generic
way of partitioning the various queries.

As a result -- we stated by forking JDBCRDD and made a version that
executes the SQL query once in getPartitions into a Vector and then hands
each worker node an index and iterator. Here's a snippet of getPartitions
and compute:

  override def getPartitions: Array[Partition] = {
    // Compute the DB query once here
    val results = computeQuery

    (0 until numPartitions).map(i => {
      // TODO: would be better to do this partitioning when scrolling
      // through the result set, if still loading into memory
      val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
      new DBPartition(i, partitionItems)
    }).toArray
  }

  override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
    val part = thePart.asInstanceOf[DBPartition[T]]

    // Shift the result vector to our index number and then do a
    // sliding iterator over it
    val iterator = part.items.iterator

    override def getNext: T = {
      if (iterator.hasNext) {
        iterator.next()
      } else {
        finished = true
        null.asInstanceOf[T]
      }
    }

    override def close: Unit = ()
  }

This is a little better since we can just execute the query once.
However, the result-set needs to fit in memory.

We've been trying to brainstorm a way to

A) have that result set distribute out to the worker RDD partitions as
it's streaming in from the cursor?
B) have the result set spill to disk if it exceeds memory and do
something clever around the iterators?
C) something else?

We're not familiar enough yet with all of the workings of Spark to
know how to proceed on this.

We also thought of the worker-around of having the DB query dump to
HDFS/S3 and then pick it up for there, but it adds more moving parts
and latency to our processing.

Does anyone have a clever suggestion? Are we missing something?

thanks,
Michal


Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
I would first check whether  there is any possibility that after doing
groupbykey one of the groups does not fit in one of the executors' memory.

To back up my theory, instead of doing groupbykey + map try reducebykey +
mapvalues.

Let me know if that helped.

Pawel Szulc
http://rabbitonweb.com

sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com
napisał:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors, executor
 memory, and more importantly, number of tasks... *all three times it
 failed when approximately 75.1% of the tasks were completed (no matter how
 many tasks resulted from repartitioning the data in textfile(..., N))*.
 Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor losts, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
 try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for too
 many files open
   .set(spark.mesos.coarse, true) // helpful for MapOutputTracker
 errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consildateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to error:
 Total size of serialized results of 39901 tasks (1024.0 MB) is bigger than
 spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)

 .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator)
   .set(spark.kryo.registrationRequired, true)

 val rdd1 = 
 sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything before
 groupByKey(). It fails before getting to groupByKey. I have tried doubling
 and tripling the number of partitions when calling textFile, with no
 success.

 Very similar code (trivial changes, to accommodate different input)
 worked on a smaller input (~8TB)... Not that it was easy to get that
 working.



 Errors vary, here is what I am getting right now:

 ERROR SendingConnection: Exception while reading SendingConnection
 ... java.nio.channels.ClosedChannelException
 (^ guessing that is symptom of something else)

 WARN BlockManagerMasterActor: Removing BlockManager
 BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
 (^ guessing that is symptom of something else)

 ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
 down ActorSystem [sparkDriver]
 *java.lang.OutOfMemoryError: GC overhead limit exceeded*



 Other times I will get messages about executor lost... about 1 message
 per second, after ~~50k tasks complete, until there are almost no executors
 left and progress slows to nothing.

 I ran with verbose GC info; I do see failing yarn containers that have
 multiple (like 30) Full GC messages but I don't know how to interpret if
 that is the problem. Typical Full GC time taken seems ok: [Times:
 user=23.30 sys=0.06, real=1.94 secs]




Re: Problem getting program to run on 15TB input

2015-02-28 Thread Paweł Szulc
But groupbykey will repartition according to number of keys as I understand
how it works. How do you know that you haven't reached the groupbykey
phase? Are you using a profiler or do you base that assumption only on logs?

sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik arun.lut...@gmail.com
napisał:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...]
 disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
 stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try reducebykey
 + mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com
 napisał:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor losts, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for
 too many files open
   .set(spark.mesos.coarse, true) // helpful for
 MapOutputTracker errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consildateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to
 error: Total size of serialized results of 39901 tasks (1024.0 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)

 .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator)
   .set(spark.kryo.registrationRequired, true)

 val rdd1 = sc.textFile(file1).persist(StorageLevel
 .MEMORY_AND_DISK_SER).map(_.split(\\|, -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
So, actually I am removing the persist for now, because there is
significant filtering that happens after calling textFile()... but I will
keep that option in mind.

I just tried a few different combinations of number of executors, executor
memory, and more importantly, number of tasks... *all three times it failed
when approximately 75.1% of the tasks were completed (no matter how many
tasks resulted from repartitioning the data in textfile(..., N))*. Surely
this is a strong clue to something?



On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER` generates
 many small objects that lead to very long GC time, causing the executor
 losts, heartbeat not received, and GC overhead limit exceeded messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can also
 try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName("Test")
   .set("spark.storage.memoryFraction", "0.2") // default 0.6
   .set("spark.shuffle.memoryFraction", "0.12") // default 0.2
   .set("spark.shuffle.manager", "SORT") // preferred setting for optimized joins
   .set("spark.shuffle.consolidateFiles", "true") // helpful for too many files open
   .set("spark.mesos.coarse", "true") // helpful for MapOutputTracker errors?
   .set("spark.akka.frameSize", "500") // helpful when using consolidateFiles=true
   .set("spark.akka.askTimeout", "30")
   .set("spark.shuffle.compress", "false") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.file.transferTo", "false") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.core.connection.ack.wait.timeout", "600") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set("spark.speculation", "true")
   .set("spark.worker.timeout", "600") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.akka.timeout", "300") //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set("spark.storage.blockManagerSlaveTimeoutMs", "12")
   .set("spark.driver.maxResultSize", "2048") // in response to error: Total size of
     // serialized results of 39901 tasks (1024.0 MB) is bigger than
     // spark.driver.maxResultSize (1024.0 MB)
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryo.registrator", "com.att.bdcoe.cip.ooh.MyRegistrator")
   .set("spark.kryo.registrationRequired", "true")

 val rdd1 = sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER)
   .map(_.split("\\|", -1))...filter(...)

 val rdd2 = sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER)
   .map(_.split("\\|", -1))...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything before
 groupByKey(). It fails before getting to groupByKey. I have tried doubling
 and tripling the number of partitions when calling textFile, with no
 success.

 Very similar code (trivial changes, to accommodate different input) worked
 on a smaller input (~8TB)... Not that it was easy to get that working.



 Errors vary, here is what I am getting right now:

 ERROR SendingConnection: Exception while reading SendingConnection
 ... java.nio.channels.ClosedChannelException
 (^ guessing that is symptom of something else)

 WARN BlockManagerMasterActor: Removing BlockManager
 BlockManagerId(...) with no recent heart beats: 120030ms exceeds 12ms
 (^ guessing that is symptom of something else)

 ERROR ActorSystemImpl: Uncaught fatal error from thread (...) shutting
 down ActorSystem [sparkDriver]
 *java.lang.OutOfMemoryError: GC overhead limit exceeded*



 Other times I will get messages about executor lost... about 1 message
 per second, after ~~50k tasks complete, until there are almost no executors
 left and progress slows to nothing.

 I ran with verbose GC info; I do see failing yarn containers that have
 multiple (like 30) Full GC messages but I don't know how to interpret if
 that is the problem. Typical Full GC time taken seems ok: [Times:
 user=23.30 sys=0.06, real=1.94 secs]



 Suggestions, please?

 Huge thanks for useful suggestions,
 Arun





Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The job fails before getting to groupByKey.

I see a lot of timeout errors in the yarn logs, like:

15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts
akka.pattern.AskTimeoutException: Timed out

and

15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]

and some of these are followed by:

15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
Disassociated [akka.tcp://sparkExecutor@...] - [akka.tcp://sparkDriver@...]
disassociated! Shutting down.
15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in
stage 1.0 (TID 336601)
java.io.FileNotFoundException:
/hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
(No such file or directory)




On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try reducebykey +
 mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik arun.lut...@gmail.com
 napisał:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor losts, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for too
 many files open
   .set(spark.mesos.coarse, true) // helpful for
 MapOutputTracker errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consildateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
   .set(spark.worker.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.akka.timeout,300) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Heartbeat-exceeds-td3798.html
   .set(spark.storage.blockManagerSlaveTimeoutMs,12)
   .set(spark.driver.maxResultSize,2048) // in response to
 error: Total size of serialized results of 39901 tasks (1024.0 MB) is
 bigger than spark.driver.maxResultSize (1024.0 MB)
   .set(spark.serializer,
 org.apache.spark.serializer.KryoSerializer)

 .set(spark.kryo.registrator,com.att.bdcoe.cip.ooh.MyRegistrator)
   .set(spark.kryo.registrationRequired, true)

 val rdd1 = 
 sc.textFile(file1).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)

 val rdd2 =
 sc.textFile(file2).persist(StorageLevel.MEMORY_AND_DISK_SER).map(_.split(\\|,
 -1)...filter(...)


 rdd2.union(rdd1).map(...).filter(...).groupByKey().map(...).flatMap(...).saveAsTextFile()


 I run the code with:
   --num-executors 500 \
   --driver-memory 20g \
   --executor-memory 20g \
   --executor-cores 32 \


 I'm using kryo serialization on everything, including broadcast
 variables.

 Spark creates 145k tasks, and the first stage includes everything
 before groupByKey(). It fails before getting to groupByKey. I have tried
 doubling and tripling the number of partitions when calling textFile, with
 no success.

  Very similar code (trivial changes, to accommodate different input)
 worked on a smaller input (~8TB)... Not that it was easy to get that
 working.



 Errors vary, here is what I am getting right now:

 ERROR SendingConnection: 

getting this error while runing

2015-02-28 Thread shahid

conf = SparkConf().setAppName("spark_calc3merged").setMaster("spark://ec2-54-145-68-13.compute-1.amazonaws.com:7077")
sc = SparkContext(conf=conf, pyFiles=["/root/platinum.py", "/root/collections2.py"])
  
15/02/28 19:06:38 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 3.0
(TID 38, ip-10-80-15-145.ec2.internal):
com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 73065
com.esotericsoftware.kryo.io.Output.require(Output.java:138)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:156)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
15/02/28 19:06:38 INFO scheduler.TaskSetManager: Starting task 5.1 in stage
3.0 (TID 44, ip-10-80-15-145.ec2.internal, NODE_LOCAL, 1502 bytes)
15/02/28 19:06:38 INFO scheduler.TaskSetManager: Finished task 8.0 in stage
3.0 (TID 41) in 7040 ms on ip-10-80-98-118.ec2.internal (9/11)
15/02/28 19:06:38 INFO scheduler.TaskSetManager: Finished task 9.0 in stage
3.0 (TID 42) in 7847 ms on ip-10-80-15-145.ec2.internal (10/11)
15/02/28 19:06:50 WARN scheduler.TaskSetManager: Lost task 5.1 in stage 3.0
(TID 44, ip-10-80-15-145.ec2.internal):
com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 73065
com.esotericsoftware.kryo.io.Output.require(Output.java:138)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:156)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
15/02/28 19:06:50 INFO scheduler.TaskSetManager: Starting task 5.2 in stage
3.0 (TID 45, ip-10-80-98-118.ec2.internal, NODE_LOCAL, 1502 bytes)
15/02/28 19:07:01 WARN scheduler.TaskSetManager: Lost task 5.2 in stage 3.0
(TID 45, ip-10-80-98-118.ec2.internal):
com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
required: 73065
com.esotericsoftware.kryo.io.Output.require(Output.java:138)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:206)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:29)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:18)
com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:312)
   
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
   
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:156)
   
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
   

Re: Problem getting program to run on 15TB input

2015-02-28 Thread Arun Luthra
The Spark UI names the line number and name of the operation (repartition
in this case) that it is performing. Only if this information is wrong
(just a possibility), could it have started groupByKey already.

I will try to analyze the amount of skew in the data by using reduceByKey
(or simply countByKey) which is relatively inexpensive. For the purposes of
this algorithm I can simply log and remove keys with huge counts, before
doing groupByKey.
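
A rough sketch of that skew check (the threshold and keyedRdd below are placeholders,
not from the real job):

    import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on Spark 1.2)
    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag

    // Count records per key without shuffling the values, log the hot keys,
    // and drop them before the expensive groupByKey.
    def dropHotKeys[K: ClassTag, V: ClassTag](keyed: RDD[(K, V)], threshold: Long): RDD[(K, V)] = {
      val counts  = keyed.mapValues(_ => 1L).reduceByKey(_ + _)
      val hotKeys = counts.filter { case (_, n) => n > threshold }.keys.collect().toSet
      hotKeys.foreach(k => println(s"dropping skewed key: $k"))
      keyed.filter { case (k, _) => !hotKeys.contains(k) }
    }

    // e.g. val trimmed = dropHotKeys(keyedRdd, 1000000L); trimmed.groupByKey()...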

On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote:

 All stated symptoms are consistent with GC pressure (other nodes timeout
 trying to connect because of a long stop-the-world), quite possibly due to
 groupByKey. groupByKey is a very expensive operation as it may bring all
 the data for a particular partition into memory (in particular, it cannot
 spill values for a single key, so if you have a single very skewed key you
 can get behavior like this).

 On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

  But groupbykey will repartition according to number of keys as I
 understand how it works. How do you know that you haven't reached the
 groupbykey phase? Are you using a profiler or do you base that assumption
 only on logs?

 sob., 28 lut 2015, 8:12 PM Arun Luthra użytkownik arun.lut...@gmail.com
 napisał:

 A correction to my first post:

 There is also a repartition right before groupByKey to help avoid
 too-many-open-files error:


 rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile()

 On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com
 wrote:

 The job fails before getting to groupByKey.

 I see a lot of timeout errors in the yarn logs, like:

 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1
 attempts
 akka.pattern.AskTimeoutException: Timed out

 and

 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2
 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30
 seconds]

 and some of these are followed by:

 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver
 Disassociated [akka.tcp://sparkExecutor@...] -
 [akka.tcp://sparkDriver@...] disassociated! Shutting down.
 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0
 in stage 1.0 (TID 336601)
 java.io.FileNotFoundException:
 /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0
 (No such file or directory)




 On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com
 wrote:

 I would first check whether  there is any possibility that after doing
 groupbykey one of the groups does not fit in one of the executors' memory.

 To back up my theory, instead of doing groupbykey + map try
 reducebykey + mapvalues.

 Let me know if that helped.

 Pawel Szulc
 http://rabbitonweb.com

 sob., 28 lut 2015, 6:22 PM Arun Luthra użytkownik 
 arun.lut...@gmail.com napisał:

 So, actually I am removing the persist for now, because there is
 significant filtering that happens after calling textFile()... but I will
 keep that option in mind.

 I just tried a few different combinations of number of executors,
 executor memory, and more importantly, number of tasks... *all three
 times it failed when approximately 75.1% of the tasks were completed (no
 matter how many tasks resulted from repartitioning the data in
 textfile(..., N))*. Surely this is a strong clue to something?



 On Fri, Feb 27, 2015 at 1:07 PM, Burak Yavuz brk...@gmail.com
 wrote:

 Hi,

 Not sure if it can help, but `StorageLevel.MEMORY_AND_DISK_SER`
 generates many small objects that lead to very long GC time, causing the
 executor losts, heartbeat not received, and GC overhead limit exceeded
 messages.
 Could you try using `StorageLevel.MEMORY_AND_DISK` instead? You can
 also try `OFF_HEAP` (and use Tachyon).

 Burak

 On Fri, Feb 27, 2015 at 11:39 AM, Arun Luthra arun.lut...@gmail.com
  wrote:

 My program in pseudocode looks like this:

 val conf = new SparkConf().setAppName(Test)
   .set(spark.storage.memoryFraction,0.2) // default 0.6
   .set(spark.shuffle.memoryFraction,0.12) // default 0.2
   .set(spark.shuffle.manager,SORT) // preferred setting for
 optimized joins
   .set(spark.shuffle.consolidateFiles,true) // helpful for
 too many files open
   .set(spark.mesos.coarse, true) // helpful for
 MapOutputTracker errors?
   .set(spark.akka.frameSize,500) // helpful when using
 consildateFiles=true
   .set(spark.akka.askTimeout, 30)
   .set(spark.shuffle.compress,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.file.transferTo,false) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.core.connection.ack.wait.timeout,600) //
 http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787p20811.html
   .set(spark.speculation,true)
  

Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Corey Nolet
+1 to a better default as well.

We were working fine until we ran against a real dataset that was much
larger than the test dataset we were using locally. It took me a couple of
days of digging through many logs to figure out that this value was what was
causing the problem.
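
For anyone else who hits this before any default changes, the overhead can be raised
explicitly; a sketch with illustrative values (not a recommendation for any particular job):

    // Extra off-heap headroom per YARN container, in MB.
    val conf = new org.apache.spark.SparkConf()
      .set("spark.yarn.executor.memoryOverhead", "2048")
      .set("spark.yarn.driver.memoryOverhead", "1024")
    // or on the command line: spark-submit --conf spark.yarn.executor.memoryOverhead=2048 ...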

On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu yuzhih...@gmail.com wrote:

 Having good out-of-box experience is desirable.

 +1 on increasing the default.


 On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote:

 There was a recent discussion about whether to increase or indeed make
 configurable this kind of default fraction. I believe the suggestion
 there too was that 9-10% is a safer default.

 Advanced users can lower the resulting overhead value; it may still
 have to be increased in some cases, but a fatter default may make this
 kind of surprise less frequent.

 I'd support increasing the default; any other thoughts?

 On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com wrote:
  hey,
  running my first map-red like (meaning disk-to-disk, avoiding in memory
  RDDs) computation in spark on yarn i immediately got bitten by a too low
  spark.yarn.executor.memoryOverhead. however it took me about an hour to
 find
  out this was the cause. at first i observed failing shuffles leading to
  restarting of tasks, then i realized this was because executors could
 not be
  reached, then i noticed in containers got shut down and reallocated in
  resourcemanager logs (no mention of errors, it seemed the containers
  finished their business and shut down successfully), and finally i
 found the
  reason in nodemanager logs.
 
  i dont think this is a pleasant first experience. i realize
  spark.yarn.executor.memoryOverhead needs to be set differently from
  situation to situation. but shouldnt the default be a somewhat higher
 value
  so that these errors are unlikely, and then the experts that are
 willing to
  deal with these errors can tune it lower? so why not make the default
 10%
  instead of 7%? that gives something that works in most situations out
 of the
  box (at the cost of being a little wasteful). it worked for me.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-02-28 Thread Ted Yu
Have you verified that spark-catalyst_2.10 jar was in the classpath ?

Cheers

On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com
wrote:

 Hi,
 I wrote a very simple program in scala to convert an existing RDD to
 SchemaRDD.
 But createSchemaRDD function is throwing exception

 Exception in thread main scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
 classloader with boot classpath [.] not found


 Here's more info on the versions I am using -

 <scala.binary.version>2.11</scala.binary.version>
 <spark.version>1.2.1</spark.version>
 <scala.version>2.11.5</scala.version>

 Please let me know how can I resolve this problem.

 Thanks
 Ashish



Re: Missing shuffle files

2015-02-28 Thread Corey Nolet
Just wanted to point out: raising the memory overhead (as I saw in the logs)
was the fix for this issue, and I have not seen dying executors since this
value was increased.

On Tue, Feb 24, 2015 at 3:52 AM, Anders Arpteg arp...@spotify.com wrote:

 If you thinking of the yarn memory overhead, then yes, I have increased
 that as well. However, I'm glad to say that my job finished successfully
 finally. Besides the timeout and memory settings, performing repartitioning
 (with shuffling) at the right time seems to be the key to make this large
 job succeed. With all the transformations in the job, the partition
 distribution was becoming increasingly skewed. Not easy to figure out when
 and to what number of partitions to set, and takes forever to tweak these
 settings since it's works perfectly for small datasets and you'll have to
 experiment with large time-consuming jobs. Imagine if there was an
 automatic partition reconfiguration function that automagically did that...


 On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet cjno...@gmail.com wrote:

 I *think* this may have been related to the default memory overhead
 setting being too low. I raised the value to 1G it and tried my job again
 but i had to leave the office before it finished. It did get further but
 I'm not exactly sure if that's just because i raised the memory. I'll see
 tomorrow- but i have a suspicion this may have been the cause of the
 executors being killed by the application master.
 On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote:

 I've got the opposite problem with regards to partitioning. I've got
 over 6000 partitions for some of these RDDs which immediately blows the
 heap somehow- I'm still not exactly sure how. If I coalesce them down to
 about 600-800 partitions, I get the problems where the executors are dying
 without any other error messages (other than telling me the executor was
 lost in the UI). If I don't coalesce, I pretty immediately get Java heap
 space exceptions that kill the job altogether.

 Putting in the timeouts didn't seem to help the case where I am
 coalescing. Also, I don't see any dfferences between 'disk only' and
 'memory and disk' storage levels- both of them are having the same
 problems. I notice large shuffle files (30-40gb) that only seem to spill a
 few hundred mb.

 On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com
 wrote:

 Sounds very similar to what I experienced Corey. Something that seems
 to at least help with my problems is to have more partitions. Am already
 fighting between ending up with too many partitions in the end and having
 too few in the beginning. By coalescing at late as possible and avoiding
 too few in the beginning, the problems seems to decrease. Also, increasing
 spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
 significantly (~700 secs), the problems seems to almost disappear. Don't
 wont to celebrate yet, still long way left before the job complete but it's
 looking better...

 On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ my yarn container logs for some of the executors which
 appear to be failing (with the missing shuffle files). I see exceptions
 that say client.TransportClientFactor: Found inactive connection to
 host/ip:port, closing it.

 Right after that I see shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
 connect to host/ip:port

 Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

 Finally, following the sigterm, I see FileNotFoundExcception:
 /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
 such file for directory)

 I'm looking @ the nodemanager and application master logs and I see no
 indications whatsoever that there were any memory issues during this 
 period
 of time. The Spark UI is telling me none of the executors are really using
 too much memory when this happens. It is a big job that's catching several
 100's of GB but each node manager on the cluster has 64gb of ram just for
 yarn containers (physical nodes have 128gb). On this cluster, we have 128
 nodes. I've also tried using DISK_ONLY storage level but to no avail.

 Any further ideas on how to track this down? Again, we're able to run
 this same job on about 1/5th of the data just fine.The only thing that's
 pointing me towards a memory issue is that it seems to be happening in the
 same stages each time and when I lower the memory that each executor has
 allocated it happens in earlier stages but I can't seem to find anything
 that says an executor (or container for that matter) has run low on 
 memory.



 On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com
 wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the 

Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-02-28 Thread Ashish Nigam
Hi,
I wrote a very simple program in scala to convert an existing RDD to
SchemaRDD.
But the createSchemaRDD function is throwing an exception:

Exception in thread main scala.ScalaReflectionException: class
org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
classloader with boot classpath [.] not found


Here's more info on the versions I am using -

<scala.binary.version>2.11</scala.binary.version>
<spark.version>1.2.1</spark.version>
<scala.version>2.11.5</scala.version>

Please let me know how can I resolve this problem.

Thanks
Ashish


Re: getting this error while runing

2015-02-28 Thread shahid
Also the data file is on hdfs



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/getting-this-error-while-runing-tp21860p21861.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-02-28 Thread Ashish Nigam
Also, can scala version play any role here?
I am using scala 2.11.5 but all spark packages have dependency to scala
2.11.2
Just wanted to make sure that scala version is not an issue here.
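
If the build were sbt, a setup that keeps the Scala binary version consistent across the
application and every Spark artifact would look roughly like the sketch below (%% appends
the _2.10/_2.11 suffix automatically; with Maven the equivalent is choosing the *_2.11
artifact ids):

    scalaVersion := "2.11.5"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.2.1",
      "org.apache.spark" %% "spark-sql"  % "1.2.1"   // pulls in the matching spark-catalyst_2.11
    )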

On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com
wrote:

 Hi,
 I wrote a very simple program in scala to convert an existing RDD to
 SchemaRDD.
 But createSchemaRDD function is throwing exception

 Exception in thread main scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
 classloader with boot classpath [.] not found


 Here's more info on the versions I am using -

 <scala.binary.version>2.11</scala.binary.version>
 <spark.version>1.2.1</spark.version>
 <scala.version>2.11.5</scala.version>

 Please let me know how can I resolve this problem.

 Thanks
 Ashish



Re: Accumulator in SparkUI for streaming

2015-02-28 Thread Tim Smith
So somehow Spark Streaming doesn't support display of named accumulators in
the WebUI?


On Tue, Feb 24, 2015 at 7:58 AM, Petar Zecevic petar.zece...@gmail.com
wrote:


 Interesting. Accumulators are shown on Web UI if you are using the
 ordinary SparkContext (Spark 1.2). It just has to be named (and that's what
 you did).

  scala> val acc = sc.accumulator(0, "test accumulator")
  acc: org.apache.spark.Accumulator[Int] = 0
  scala> val rdd = sc.parallelize(1 to 1000)
  rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
  parallelize at <console>:12
  scala> rdd.foreach(x => acc += 1)
  scala> acc.value
  res1: Int = 1000

 The Stage details page shows:




 On 20.2.2015. 9:25, Tim Smith wrote:

  On Spark 1.2:

  I am trying to capture # records read from a kafka topic:

  val inRecords = ssc.sparkContext.accumulator(0, "InRecords")

  ..

  kInStreams.foreach( k => {

    k.foreachRDD ( rdd => inRecords += rdd.count().toInt )
    inRecords.value


  Question is how do I get the accumulator to show up in the UI? I tried
 inRecords.value but that didn't help. Pretty sure it isn't showing up in
 Stage metrics.

  What's the trick here? collect?

  Thanks,

  Tim





Re: Tools to manage workflows on Spark

2015-02-28 Thread Mayur Rustagi
Sorry not really. Spork is a way to migrate your existing Pig scripts to
Spark or to write new Pig jobs that can then execute on Spark.
For orchestration you are better off using Oozie especially if you are
using other execution engines/systems besides spark.


Regards,
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoid.com http://www.sigmoidanalytics.com/
@mayur_rustagi http://www.twitter.com/mayur_rustagi

On Sat, Feb 28, 2015 at 6:59 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Thanks Mayur! I'm looking for something that would allow me to easily
 describe and manage a workflow on Spark. A workflow in my context is a
 composition of Spark applications that may depend on one another based on
 hdfs inputs/outputs. Is Spork a good fit? The orchestration I want is on
 app level.



 On Sat, Feb 28, 2015 at 9:38 PM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 We do maintain it but in apache repo itself. However Pig cannot do
 orchestration for you. I am not sure what you are looking at from Pig in
 this context.

 Regards,
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoid.com http://www.sigmoidanalytics.com/
 @mayur_rustagi http://www.twitter.com/mayur_rustagi

 On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu yuzhih...@gmail.com wrote:

 Here was latest modification in spork repo:
 Mon Dec 1 10:08:19 2014

 Not sure if it is being actively maintained.

 On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao caoqiang...@gmail.com
 wrote:

 Thanks for the pointer, Ashish! I was also looking at Spork
 https://github.com/sigmoidanalytics/spork Pig-on-Spark), but wasn't
 sure if that's the right direction.

 On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 You have to call spark-submit from oozie.
 I used this link to get the idea for my implementation -


 http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E



 On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Thanks, Ashish! Is Oozie integrated with Spark? I knew it can
 accommodate some Hadoop jobs.


 On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Qiang,
 Did you look at Oozie?
 We use oozie to run spark jobs in production.


 On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Hi Everyone,

 We need to deal with workflows on Spark. In our scenario, each
 workflow consists of multiple processing steps. Among different steps,
 there could be dependencies.  I'm wondering if there are tools
 available that can help us schedule and manage workflows on Spark. I'm
 looking for something like pig on Hadoop, but it should fully function on
 Spark.

 Any suggestion?

 Thanks in advance!

 Qiang











Re: Tools to manage workflows on Spark

2015-02-28 Thread Qiang Cao
Thanks for the pointer, Ashish! I was also looking at Spork
(https://github.com/sigmoidanalytics/spork, Pig-on-Spark), but wasn't sure if
that's the right direction.

On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com
wrote:

 You have to call spark-submit from oozie.
 I used this link to get the idea for my implementation -


 http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E



 On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Thanks, Ashish! Is Oozie integrated with Spark? I knew it can accommodate
 some Hadoop jobs.


 On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Qiang,
 Did you look at Oozie?
 We use oozie to run spark jobs in production.


 On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Hi Everyone,

 We need to deal with workflows on Spark. In our scenario, each workflow
 consists of multiple processing steps. Among different steps, there could
 be dependencies.  I'm wondering if there are tools available that can
 help us schedule and manage workflows on Spark. I'm looking for something
 like pig on Hadoop, but it should fully function on Spark.

 Any suggestion?

 Thanks in advance!

 Qiang







Connection pool in workers

2015-02-28 Thread A . K . M . Ashrafuzzaman
Hi guys,
I am new to Spark, and we are running a small project that collects data from
Kinesis and inserts it into Mongo.
I would like to share a high-level view of how it is done and would love your
input on it.

I am fetching Kinesis data and, for each RDD:
  - Parsing the string data
  - Inserting it into Mongo storage

My understanding is that the per-RDD parsing logic is serialized and sent to
the workers, and when it is time to write to Mongo, each worker creates a new
connection for the write.

Is there any way I can use a connection pool? By the way, I am using Scala and
Spark Streaming.
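
A common pattern here is to create the client lazily inside each executor (for
example in a singleton object) and write with foreachPartition, so a single
client, with its own internal connection pool, is reused per executor JVM
instead of opening a connection per record. A minimal sketch, assuming the
MongoDB Java driver's DB/DBCollection API and a DStream[String] named stream;
the host, database, collection, and field names are placeholders:

import com.mongodb.{BasicDBObject, MongoClient}

// Created lazily on each executor JVM the first time it is used, never
// serialized from the driver. MongoClient maintains its own connection pool.
object MongoWriter {
  lazy val client = new MongoClient("mongo-host", 27017)              // placeholder host
  lazy val collection = client.getDB("mydb").getCollection("events")  // placeholder names
}

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // All records of a partition reuse the per-executor client
    records.foreach { line =>
      MongoWriter.collection.insert(new BasicDBObject("raw", line))
    }
  }
}

With this, connections are opened once per executor rather than once per
record or per batch; a dedicated pooling library could be hidden behind the
same object if finer control is needed.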


A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred

(M) 880-175-5592433
Twitter | Blog | Facebook


Re: Tools to manage workflows on Spark

2015-02-28 Thread Ted Yu
Here was the latest modification in the Spork repo:
Mon Dec 1 10:08:19 2014

Not sure if it is being actively maintained.

On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Thanks for the pointer, Ashish! I was also looking at Spork
 https://github.com/sigmoidanalytics/spork (Pig-on-Spark), but wasn't sure
 if that's the right direction.

 On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 You have to call spark-submit from oozie.
 I used this link to get the idea for my implementation -


 http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E



 On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Thanks, Ashish! Is Oozie integrated with Spark? I know it can accommodate
 some Hadoop jobs.


 On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Qiang,
 Did you look at Oozie?
 We use oozie to run spark jobs in production.


 On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Hi Everyone,

 We need to deal with workflows on Spark. In our scenario, each workflow
 consists of multiple processing steps. Among different steps, there could
 be dependencies.  I'm wondering if there are tools available that can
 help us schedule and manage workflows on Spark. I'm looking for something
 like pig on Hadoop, but it should fully function on Spark.

 Any suggestion?

 Thanks in advance!

 Qiang








Re: Tools to manage workflows on Spark

2015-02-28 Thread Qiang Cao
Thanks Mayur! I'm looking for something that would allow me to easily
describe and manage a workflow on Spark. A workflow in my context is a
composition of Spark applications that may depend on one another based on
HDFS inputs/outputs. Is Spork a good fit? The orchestration I want is at the
app level.
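
To make the requirement concrete, the hand-rolled alternative to a workflow
tool would look roughly like the sketch below: a small driver that launches
one Spark application per step via spark-submit and gates each step on its
HDFS inputs. All class names, jar names, and paths are placeholders; this is
exactly the glue code a proper orchestration tool such as Oozie is meant to
replace.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.sys.process._

// Hand-rolled app-level orchestration: run step B only after step A's HDFS
// output exists. Placeholder paths, classes, and jar name throughout.
object MiniWorkflow {
  private val fs = FileSystem.get(new Configuration())

  private def submit(mainClass: String, args: String*): Int =
    (Seq("spark-submit", "--class", mainClass, "pipeline.jar") ++ args).!

  def main(unused: Array[String]): Unit = {
    val cleansed = new Path("/data/cleansed")

    // Step A: produce /data/cleansed if it is not already there
    if (!fs.exists(cleansed))
      require(submit("com.example.CleanseJob", "/data/raw", cleansed.toString) == 0,
        "cleanse step failed")

    // Step B: depends on step A's HDFS output
    require(submit("com.example.AggregateJob", cleansed.toString, "/data/aggregated") == 0,
      "aggregate step failed")
  }
}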


On Sat, Feb 28, 2015 at 9:38 PM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 We do maintain it, but in the Apache repo itself. However, Pig cannot do
 orchestration for you. I am not sure what you are looking for from Pig in
 this context.

 Regards,
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoid.com http://www.sigmoidanalytics.com/
 @mayur_rustagi http://www.twitter.com/mayur_rustagi

 On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu yuzhih...@gmail.com wrote:

 Here was the latest modification in the Spork repo:
 Mon Dec 1 10:08:19 2014

 Not sure if it is being actively maintained.

 On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Thanks for the pointer, Ashish! I was also looking at Spork
 https://github.com/sigmoidanalytics/spork (Pig-on-Spark), but wasn't
 sure if that's the right direction.

 On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 You have to call spark-submit from oozie.
 I used this link to get the idea for my implementation -


 http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E



 On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Thanks, Ashish! Is Oozie integrated with Spark? I know it can
 accommodate some Hadoop jobs.


 On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Qiang,
 Did you look at Oozie?
 We use oozie to run spark jobs in production.


 On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Hi Everyone,

 We need to deal with workflows on Spark. In our scenario, each
 workflow consists of multiple processing steps. Among different steps,
 there could be dependencies.  I'm wondering if there are tools
 available that can help us schedule and manage workflows on Spark. I'm
 looking for something like pig on Hadoop, but it should fully function on
 Spark.

 Any suggestion?

 Thanks in advance!

 Qiang










Re: Reg. Difference in Performance

2015-02-28 Thread Deep Pradhan
You mean the size of the data that we take?

Thank You
Regards,
Deep

On Sun, Mar 1, 2015 at 6:04 AM, Joseph Bradley jos...@databricks.com
wrote:

 Hi Deep,

 Compute times may not be very meaningful for small examples like those.
 If you increase the sizes of the examples, then you may start to observe
 more meaningful trends and speedups.

 Joseph

 On Sat, Feb 28, 2015 at 7:26 AM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 Hi,
 I am running Spark applications in GCE. I set up clusters with different
 numbers of nodes, varying from 1 to 7; the machines are single-core machines.
 I set spark.default.parallelism to the number of nodes in each cluster, and I
 ran four of the applications from the Spark examples (SparkTC, SparkALS,
 SparkLR, and SparkPi) for each configuration.
 What I notice is the following:
 For SparkTC and SparkALS, the time to complete the job increases as the
 number of nodes in the cluster grows, whereas for SparkLR and SparkPi the
 completion time stays the same across all configurations.
 Could anyone explain this to me?

 Thank You
 Regards,
 Deep
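
Joseph's point can be illustrated with a rough timing harness. This is only a
sketch (the app name, input sizes, and taking the parallelism from a
command-line argument are illustrative assumptions), but it shows the kind of
scaling test where a speedup from extra nodes only becomes visible once the
per-job work dwarfs the scheduling overhead:

import org.apache.spark.{SparkConf, SparkContext}

object ScalingCheck {
  // Crude wall-clock timer around an arbitrary block of work
  def time[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    println(f"$label took ${(System.nanoTime() - start) / 1e9}%.2f s")
    result
  }

  def main(args: Array[String]): Unit = {
    // Match parallelism to the cluster size, as in the experiment above
    val conf = new SparkConf()
      .setAppName("ScalingCheck")
      .set("spark.default.parallelism", args.headOption.getOrElse("1"))
    val sc = new SparkContext(conf)

    // Tiny inputs are dominated by task scheduling; larger inputs expose
    // whether additional nodes actually help.
    for (n <- Seq(100000, 1000000, 10000000)) {
      time(s"sum of squares up to $n") {
        sc.parallelize(1 to n).map(x => x.toLong * x).reduce(_ + _)
      }
    }
    sc.stop()
  }
}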





Re: Tools to manage workflows on Spark

2015-02-28 Thread Mayur Rustagi
We do maintain it, but in the Apache repo itself. However, Pig cannot do
orchestration for you. I am not sure what you are looking for from Pig in
this context.

Regards,
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoid.com http://www.sigmoidanalytics.com/
@mayur_rustagi http://www.twitter.com/mayur_rustagi

On Sat, Feb 28, 2015 at 6:36 PM, Ted Yu yuzhih...@gmail.com wrote:

 Here was the latest modification in the Spork repo:
 Mon Dec 1 10:08:19 2014

 Not sure if it is being actively maintained.

 On Sat, Feb 28, 2015 at 6:26 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Thanks for the pointer, Ashish! I was also looking at Spork
 https://github.com/sigmoidanalytics/spork (Pig-on-Spark), but wasn't sure
 if that's the right direction.

 On Sat, Feb 28, 2015 at 6:36 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 You have to call spark-submit from oozie.
 I used this link to get the idea for my implementation -


 http://mail-archives.apache.org/mod_mbox/oozie-user/201404.mbox/%3CCAHCsPn-0Grq1rSXrAZu35yy_i4T=fvovdox2ugpcuhkwmjp...@mail.gmail.com%3E



 On Feb 28, 2015, at 3:25 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Thanks, Ashish! Is Oozie integrated with Spark? I know it can
 accommodate some Hadoop jobs.


 On Sat, Feb 28, 2015 at 6:07 PM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Qiang,
 Did you look at Oozie?
 We use oozie to run spark jobs in production.


 On Feb 28, 2015, at 2:45 PM, Qiang Cao caoqiang...@gmail.com wrote:

 Hi Everyone,

 We need to deal with workflows on Spark. In our scenario, each workflow
 consists of multiple processing steps. Among different steps, there could
 be dependencies.  I'm wondering if there are tools available that can
 help us schedule and manage workflows on Spark. I'm looking for something
 like pig on Hadoop, but it should fully function on Spark.

 Any suggestion?

 Thanks in advance!

 Qiang









Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class

2015-02-28 Thread Michael Armbrust
I think it's possible that the problem is that the Scala compiler is not
being loaded by the primordial classloader (but instead by some child
classloader), and thus the Scala reflection mirror fails to initialize when
it can't find it. Unfortunately, the only solution that I know of is to load
all required jars when the JVM starts.

On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com
wrote:

 Also, can the Scala version play any role here?
 I am using Scala 2.11.5, but all the Spark packages have a dependency on
 Scala 2.11.2.
 Just wanted to make sure the Scala version is not an issue here.

 On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com
 wrote:

 Hi,
 I wrote a very simple program in Scala to convert an existing RDD to a
 SchemaRDD, but the createSchemaRDD function is throwing an exception:

 Exception in thread main scala.ScalaReflectionException: class
 org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial
 classloader with boot classpath [.] not found


 Here's more info on the versions I am using -

 <scala.binary.version>2.11</scala.binary.version>
 <spark.version>1.2.1</spark.version>
 <scala.version>2.11.5</scala.version>

 Please let me know how I can resolve this problem.

 Thanks
 Ashish
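
For context, the conversion Ashish describes normally follows the standard
Spark 1.2 pattern shown in the minimal sketch below (the case class, field
names, and input data are illustrative). The ScalaReflectionException above
comes from the reflection machinery behind the implicit createSchemaRDD
conversion rather than from anything in code like this, which is why the
classloader/classpath setup Michael mentions matters:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Illustrative record type; any case class (a Product) works the same way
case class Event(id: Int, name: String)

object SchemaRddExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SchemaRddExample"))
    val sqlContext = new SQLContext(sc)

    // Brings the implicit RDD[Product] => SchemaRDD conversion into scope;
    // this is where the Scala reflection mirror gets exercised.
    import sqlContext.createSchemaRDD

    val events = sc.parallelize(Seq(Event(1, "a"), Event(2, "b")))
    events.registerTempTable("events")

    sqlContext.sql("SELECT id, name FROM events").collect().foreach(println)
    sc.stop()
  }
}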