Worker Random Port

2014-09-23 Thread Paul Magid
I am trying to get an edge server up and running connecting to our Spark 1.1 
cluster. The edge server is in a different DMZ than the rest of the cluster, 
and we have to specifically open firewall ports between the edge server and 
the rest of the cluster. I can log on to any node in the cluster (other than 
the edge) and submit code through spark-shell just fine. I have port 7077 open 
from the edge to the master (verified), and port 7078 open from the edge to 
all the workers (also verified). I have tried pinning the worker port by 
setting SPARK_WORKER_PORT in spark-env.sh, but it does not seem to stop the 
dynamic port behavior. I have included the startup output when running 
spark-shell from the edge server in the other DMZ and then from a node inside 
the cluster. Any help is greatly appreciated.
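
Looking at the startup output below, the worker port itself does appear pinned 
at 7078; the ports that stay random (32356, 48469, 12470, 22788 in this run) 
are all opened by the driver on the edge server, and the executors connect 
back to them. For reference, here is the kind of port pinning I have been 
experimenting with in spark-defaults.conf on the edge server. This is only a 
sketch with arbitrary port numbers, and it assumes the per-service port 
properties that I understand 1.1 added for exactly this kind of firewall 
scenario:

# spark-defaults.conf on the edge server -- sketch only, port numbers arbitrary
spark.driver.port           40000   # 'sparkDriver' actor system (random 32356 below)
spark.fileserver.port       40001   # 'HTTP file server' (random 12470 below)
spark.replClassServer.port  40002   # 'HTTP class server' (random 22788 below)
spark.broadcast.port        40003   # HTTP broadcast server
spark.blockManager.port     40004   # block manager connections (random 48469 below)
spark.executor.port         40005   # executor-side actor system on the workers

With these pinned, the firewall rules from the cluster back to the edge server 
could be written against fixed ports instead of the ephemeral range.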

Paul Magid
Toyota Motor Sales IS Enterprise Architecture (EA)
Architect I RD
Ph: 310-468-9091 (X69091)
PCN 1C2970, Mail Drop PN12


Running spark-shell from the edge server

14/09/23 14:20:38 INFO SecurityManager: Changing view acls to: root,
14/09/23 14:20:38 INFO SecurityManager: Changing modify acls to: root,
14/09/23 14:20:38 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root, ); users 
with modify permissions: Set(root, )
14/09/23 14:20:38 INFO HttpServer: Starting HTTP Server
14/09/23 14:20:39 INFO Utils: Successfully started service 'HTTP class server' 
on port 22788.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_55)
Type in expressions to have them evaluated.
Type :help for more information.
14/09/23 14:20:42 INFO SecurityManager: Changing view acls to: root,
14/09/23 14:20:42 INFO SecurityManager: Changing modify acls to: root,
14/09/23 14:20:42 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root, ); users 
with modify permissions: Set(root, )
14/09/23 14:20:43 INFO Slf4jLogger: Slf4jLogger started
14/09/23 14:20:43 INFO Remoting: Starting remoting
14/09/23 14:20:43 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriver@votlbdcd09.tms.toyota.com:32356]
14/09/23 14:20:43 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://sparkDriver@votlbdcd09.tms.toyota.com:32356]
14/09/23 14:20:43 INFO Utils: Successfully started service 'sparkDriver' on 
port 32356.
14/09/23 14:20:43 INFO SparkEnv: Registering MapOutputTracker
14/09/23 14:20:43 INFO SparkEnv: Registering BlockManagerMaster
14/09/23 14:20:43 INFO DiskBlockManager: Created local directory at 
/tmp/spark-local-20140923142043-4454
14/09/23 14:20:43 INFO Utils: Successfully started service 'Connection manager 
for block manager' on port 48469.
14/09/23 14:20:43 INFO ConnectionManager: Bound socket to port 48469 with id = 
ConnectionManagerId(votlbdcd09.tms.toyota.com,48469)
14/09/23 14:20:43 INFO MemoryStore: MemoryStore started with capacity 265.9 MB
14/09/23 14:20:43 INFO BlockManagerMaster: Trying to register BlockManager
14/09/23 14:20:43 INFO BlockManagerMasterActor: Registering block manager 
votlbdcd09.tms.toyota.com:48469 with 265.9 MB RAM
14/09/23 14:20:43 INFO BlockManagerMaster: Registered BlockManager
14/09/23 14:20:43 INFO HttpFileServer: HTTP File server directory is 
/tmp/spark-888c359a-5a2a-4aaa-80e3-8009cdfa25c8
14/09/23 14:20:43 INFO HttpServer: Starting HTTP Server
14/09/23 14:20:43 INFO Utils: Successfully started service 'HTTP file server' 
on port 12470.
14/09/23 14:20:43 INFO Utils: Successfully started service 'SparkUI' on port 
4040.
14/09/23 14:20:43 INFO SparkUI: Started SparkUI at 
http://votlbdcd09.tms.toyota.com:4040
14/09/23 14:20:43 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
14/09/23 14:20:44 INFO EventLoggingListener: Logging events to 
file:/user/spark/applicationHistory//spark-shell-1411507243973
14/09/23 14:20:44 INFO AppClient$ClientActor: Connecting to master 
spark://votlbdcd01.tms.toyota.com:7077...
14/09/23 14:20:44 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready 
for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
14/09/23 14:20:44 INFO SparkILoop: Created spark context..
14/09/23 14:20:44 INFO SparkDeploySchedulerBackend: Connected to Spark cluster 
with app ID app-20140923142044-0006
Spark context available as sc.

scala> 14/09/23 14:21:26 INFO AppClient$ClientActor: Executor added: 
app-20140923142044-0006/0 on 
worker-20140923084845-votlbdcd03.tms.toyota.com-7078 
(votlbdcd03.tms.toyota.com:7078) with 8 cores
14/09/23 14:21:26 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20140923142044-0006/0 on hostPort votlbdcd03

Spark SQL Exception

2014-09-18 Thread Paul Magid
All:

I am putting Spark SQL 1.1 through its paces (in a POC) and have been 
pleasantly surprised with what can be done with such a young technology. I 
have run into an exception (listed below) that I suspect relates to the number 
of columns in the table I am querying: there are 336 columns in the table. I 
have included the Scala / Spark SQL I am running, and this code runs just fine 
against narrower tables. Also, we have purpose-built this POC cluster with 
lots of memory, and we have set up Impala and Spark SQL with roughly the same 
amounts of memory: there are 7 worker nodes with 20GB of memory each for 
Impala and for Spark SQL. We are using Impala as a comparative benchmark and 
sanity check, and the equivalent SQL runs just fine in Impala (see below). I 
am a bit of a noob and any help (even with the code below) is greatly 
appreciated. Also, is there a document that lists current Spark SQL 
limitations/issues?

Paul Magid
Toyota Motor Sales IS Enterprise Architecture (EA)
Architect I RD
Ph: 310-468-9091 (X69091)
PCN 1C2970, Mail Drop PN12


Successful Result In Impala
+----------------+
| marital_status |
+----------------+
| M              |
| S              |
| U              |
| null           |
+----------------+
Returned 4 row(s) in 0.91s

Code
+
//Timer code
def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block   // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0).toFloat / 1e9 + "s")   // nanoseconds to seconds
  result
}

//Declare and import SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

//Load Parquet file into a table
val parquetFile_db2 = 
  sqlContext.parquetFile("hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/")
parquetFile_db2.registerAsTable("customer_demographic_pq")

//Run SQL code with timer
val records = time { sql("select marital_status from customer_demographic_pq " +
  "group by marital_status order by marital_status").collect().foreach(println) }


Exception
+
14/09/18 08:50:39 INFO SparkContext: Job finished: RangePartitioner at 
Exchange.scala:79, took 21.885859255 s
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
Sort [marital_status#9 ASC], true
 Exchange (RangePartitioning [marital_status#9 ASC], 200)
  Aggregate false, [marital_status#9], [marital_status#9]
   Exchange (HashPartitioning [marital_status#9], 200)
    Aggregate true, [marital_status#9], [marital_status#9]
     ParquetTableScan [marital_status#9], (ParquetRelation 
hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/, 
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, 
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, 
hdfs-site.xml), org.apache.spark.sql.SQLContext@4d79d3de, []), []

at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcV$sp(<console>:19)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:19)
at $iwC$$iwC$$iwC$$iwC.time(<console>:12)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:19)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
at $iwC$$iwC$$iwC.<init>(<console>:28)
at $iwC$$iwC.<init>(<console>:30)
at $iwC.<init>(<console>:32)
at <init>(<console>:34)
at .<init>(<console>:38)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala

RE: Spark SQL Exception

2014-09-18 Thread Paul Magid
Michael:

Thanks for the quick response. I can confirm that once I removed the “order 
by” clause, the exception I reported went away. So I believe this confirms 
what you were saying, and I will be opening a new feature request in JIRA.
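
Concretely, the rewrite I am testing looks like this. It is only a sketch: it 
drops the ORDER BY and, since the distinct key set here is tiny (four rows), 
sorts the collected result on the driver instead:

//Group on the cluster, sort the small collected result on the driver
val rows = time { sql("select marital_status from customer_demographic_pq " +
  "group by marital_status").collect() }
rows.sortBy(r => Option(r(0)).map(_.toString)).foreach(println)   // None (null) sorts first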

However, that exception was replaced by a java.lang.OutOfMemoryError: Java 
heap space error. I am guessing this relates to one of the following issues:
SPARK-2902 Change default options to be more aggressive (in-memory columnar 
compression)
SPARK-3056 Sort-based Aggregation (Spark SQL only supports hash-based 
aggregation, which may cause OOM if there are too many identical keys in the 
input tuples.)
SPARK-2926 Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle
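
In the meantime, one mitigation I plan to try (again a sketch, not a confirmed 
fix): the Result resolver thread that hit the OOM runs in the driver, so 
raising the driver heap when launching spark-shell may help. Something like:

SPARK_DRIVER_MEMORY=4g ./bin/spark-shell --master spark://votlbdcd01.tms.toyota.com:7077
# or, equivalently, via the spark-submit flag that spark-shell passes through
./bin/spark-shell --driver-memory 4g --master spark://votlbdcd01.tms.toyota.com:7077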

The Exception is included below.

Paul Magid
Toyota Motor Sales IS Enterprise Architecture (EA)
Architect I RD
Ph: 310-468-9091 (X69091)
PCN 1C2970, Mail Drop PN12

Exception
+
14/09/18 10:11:03 INFO TaskSetManager: Finished task 36.0 in stage 0.0 (TID 57) 
in 18681 ms on votlbdcd04.tms.toyota.com (5/200)
14/09/18 10:11:09 ERROR Utils: Uncaught exception in thread Result resolver 
thread-0
java.lang.OutOfMemoryError: Java heap space
Exception in thread "Result resolver thread-0" 14/09/18 10:11:09 INFO 
RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
java.lang.OutOfMemoryError: Java heap space
14/09/18 10:11:09 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon 
shut down; proceeding with flushing remote transports.
14/09/18 10:11:09 INFO Remoting: Remoting shut down
14/09/18 10:11:09 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut 
down.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut 
down
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at 
org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
at 
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
at 
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
at akka.actor.ActorCell.terminate(ActorCell.scala:338)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


scala> 14/09/18 10:11:09 INFO TaskSetManager: Finished task 50.0 in stage 0.0 
(TID 71) in 27100 ms on votlbdcd04.tms.toyota.com (6/200)
14/09/18 10:11:10 INFO TaskSetManager: Finished task 22.0 in stage 0.0 (TID 43) 
in 27520 ms on votlbdcd04.tms.toyota.com (7/200)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to 
ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Key not valid ? 
sun.nio.ch.SelectionKeyImpl@7a542d34
14/09/18 10:11:10 INFO ConnectionManager: key already cancelled ? 
sun.nio.ch.SelectionKeyImpl@7a542d34
java.nio.channels.CancelledKeyException
at 
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
at 
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Handling connection error on 
connection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(votlbdcd06.tms.toyota.com,19998)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to 
ConnectionManagerId