Re: Spark job tracker.

2014-07-21 Thread abhiguruvayya
Hello Marcelo Vanzin,

Can you explain a bit more on this? I tried using client mode, but can you
explain how I can use this port to write the log or output to it?
Thanks in advance!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10287.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$

2014-07-21 Thread Victor Sheng
Hi,Kevin
I tried it on spark1.0.0, it works fine.
It's a bug in spark1.0.1 ...
Thanks,
Victor



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10288.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: TreeNodeException: No function to evaluate expression. type: AttributeReference, tree: id#0 on GROUP BY

2014-07-21 Thread Martin Gammelsæter
Aha, that makes sense. Thanks for the response! I guess this is one of the
areas where Spark could use some love: error messages (:

On Fri, Jul 18, 2014 at 9:41 PM, Michael Armbrust
mich...@databricks.com wrote:
 Sorry for the non-obvious error message.  It is not valid SQL to include
 attributes in the select clause unless they are also in the group by clause
 or are inside of an aggregate function.
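
 (For illustration -- not part of the original reply -- a minimal sketch of the rule
 against a hypothetical table t(id, value) registered with the SQLContext:)

 // fails with an error like the one above: value is neither grouped nor aggregated
 sql("SELECT id, value FROM t GROUP BY id")
 // works: value is wrapped inside an aggregate function
 sql("SELECT id, COUNT(value) FROM t GROUP BY id")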

 On Jul 18, 2014 5:12 AM, Martin Gammelsæter martingammelsae...@gmail.com
 wrote:

 Hi again!

 I am having problems when using GROUP BY on both SQLContext and
 HiveContext (same problem).

 My code (simplified as much as possible) can be seen here:
 http://pastebin.com/33rjW67H

 In short, I'm getting data from a Cassandra store with Datastax' new
 driver (which works great by the way, recommended!), and mapping it to
 a Spark SQL table through a Product class (Dokument in the source).
 Regular SELECTs and stuff works fine, but once I try to do a GROUP BY,
 I get the following error:

 Exception in thread main org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 0.0:25 failed 4 times, most recent
 failure: Exception failure in TID 63 on host 192.168.121.132:
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No
 function to evaluate expression. type: AttributeReference, tree: id#0

 org.apache.spark.sql.catalyst.expressions.AttributeReference.eval(namedExpressions.scala:158)

 org.apache.spark.sql.catalyst.expressions.MutableProjection.apply(Projection.scala:64)

 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:195)

 org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:174)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$class.foreach(Iterator.scala:727)
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

 scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
 scala.collection.AbstractIterator.to(Iterator.scala:1157)

 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)
 org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)

 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
 org.apache.spark.scheduler.Task.run(Task.scala:51)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)

 What am I doing wrong?

 --
 Best regards,
 Martin Gammelsæter



-- 
Mvh.
Martin Gammelsæter
92209139


Re: spark sql left join gives KryoException: Buffer overflow

2014-07-21 Thread Pei-Lun Lee
Hi Michael,

Thanks for the suggestion. In my query, both tables are too large to use a
broadcast join.

When SPARK-2211 is done, will Spark SQL automatically choose the join
algorithm?
Is there some way to manually hint the optimizer?


2014-07-19 5:23 GMT+08:00 Michael Armbrust mich...@databricks.com:

 Unfortunately, this is a query where we just don't have an efficient
 implementation yet.  You might try switching the table order.

 Here is the JIRA for doing something more efficient:
 https://issues.apache.org/jira/browse/SPARK-2212


 On Fri, Jul 18, 2014 at 7:05 AM, Pei-Lun Lee pl...@appier.com wrote:

 Hi,

 We have a query with left joining and got this error:

 Caused by: org.apache.spark.SparkException: Job aborted due to stage
 failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure
 in TID 5 on host ip-10-33-132-101.us-west-2.compute.internal:
 com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0,
 required: 1

 Looks like Spark SQL tried to do a broadcast join, collecting one of
 the tables to the master, but it is too large.

 How do we explicitly control the join behavior like this?

 --
 Pei-Lun Lee





LabeledPoint with weight

2014-07-21 Thread Jiusheng Chen
It seems MLlib right now doesn't support weighted training; all training samples
have equal importance. Weighted training can be very useful to reduce data
size and speed up training.

Do you have plan to support it in future? The data format will be something
like:

label:weight index1:value1 index2:value2 ...
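
(A sketch of how such a record could be parsed -- WeightedLabeledPoint below is not an
MLlib class, it is only a hypothetical illustration of the proposed format, assuming
dense features and ignoring the indices:)

case class WeightedLabeledPoint(label: Double, weight: Double, features: Array[Double])

def parse(line: String): WeightedLabeledPoint = {
  // expected format: "label:weight index1:value1 index2:value2 ..."
  val tokens = line.split(" ")
  val Array(label, weight) = tokens.head.split(":").map(_.toDouble)
  val features = tokens.tail.map(_.split(":")(1).toDouble)
  WeightedLabeledPoint(label, weight, features)
}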



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/LabeledPoint-with-weight-tp10291.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Can't see any thing one the storage panel of application UI

2014-07-21 Thread binbinbin915
Hi,

I'm running MLlib's LogisticRegression, but I can't see the RDD information
in the Storage panel.

http://apache-spark-user-list.1001560.n3.nabble.com/file/n10296/a.png 
http://apache-spark-user-list.1001560.n3.nabble.com/file/n10296/b.png 







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-see-any-thing-one-the-storage-panel-of-application-UI-tp10296.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Can't see any thing one the storage panel of application UI

2014-07-21 Thread Preeti Khurana
Am getting the same issue .
Spark version : 1.0



On 21/07/14 4:16 PM, binbinbin915 binbinbin...@live.cn wrote:

Hi,

I'm running MLlib's LogisticRegression, but I can't see the RDD information
from the Storage panel.

http://apache-spark-user-list.1001560.n3.nabble.com/file/n10296/a.png
http://apache-spark-user-list.1001560.n3.nabble.com/file/n10296/b.png







--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-see-any-thing-one-the-storage-panel-of-application-UI-tp10296.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: the default GraphX graph-partition strategy on multicore machine?

2014-07-21 Thread Yifan LI
Thanks so much, Ankur, :))

Excuse me, but I am wondering:

(for a chosen partition strategy for my application)
1.1) How to check the size of each partition? Is there any API, or log file?
1.2) How to check the processing cost of each partition (time, memory, etc.)?

2.1) And how to check the global communication cost of my application under the chosen
partition strategy?
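
(Not from the original thread, but a rough way to look at 1.1 from the shell -- counting
the edges held by each partition with mapPartitionsWithIndex, assuming a Graph named
graph as produced by GraphLoader:)

graph.edges.mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
  .foreach { case (idx, n) => println("edge partition " + idx + ": " + n + " edges") }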


On Jul 18, 2014, at 9:18 PM, Ankur Dave ankurd...@gmail.com wrote:

 Sorry, I didn't read your vertex replication example carefully, so my 
 previous answer is wrong. Here's the correct one:
 
 On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote:
 I don't understand, for instance, we have 3 edge partition tables (EA: a -> b,
 a -> c; EB: a -> d, a -> e; EC: d -> c), 2 vertex partition tables (VA: a, b,
 c; VB: d, e), the whole vertex table VA will be replicated to all these 3
 edge partitions? since each of them refers to some vertexes in VA.
 
 Vertices can be replicated individually without requiring the entire vertex 
 partition to be replicated. In this case, here's what will get replicated to 
 each partition:
 
 EA: a (from VA), b (from VA), c (from VA)
 EB: a (from VA), d (from VB), e (from VB)
 EC: c (from VA), d (from VB)
 
 Ankur



Re: DynamoDB input source

2014-07-21 Thread Ian Wilkinson
Hi,

I am invoking the spark-shell (Spark 1.0.0) with:

spark-shell --jars \
libs/aws-java-sdk-1.3.26.jar,\
libs/httpclient-4.1.1.jar,\
libs/httpcore-nio-4.1.jar,\
libs/gson-2.1.jar,\
libs/httpclient-cache-4.1.1.jar,\
libs/httpmime-4.1.1.jar,\
libs/hive-dynamodb-handler-0.11.0.jar,\
libs/httpcore-4.1.jar,\
libs/joda-time-2.1.jar

and, entering the following in the shell:


import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.mapred.JobConf

var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "...")
jobConf.set("dynamodb.endpoint", "dynamodb.eu-west-1.amazonaws.com")
jobConf.set("dynamodb.regionid", "eu-west-1")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1")

jobConf.set("dynamodb.awsAccessKeyId", "...")
jobConf.set("dynamodb.awsSecretAccessKey", "...")

jobConf.set("mapred.output.format.class",
"org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

var users = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text],
classOf[DynamoDBItemWritable])
users.count()


This is raising an NPE for FileSplit (as below). Any suggestions on
what I might pursue to correct this would be very welcome.

ian


14/07/20 23:56:03 INFO deprecation: session.id is deprecated. Instead, use 
dfs.metrics.session-id
14/07/20 23:56:03 INFO JvmMetrics: Initializing JVM Metrics with 
processName=JobTracker, sessionId=
14/07/20 23:56:03 INFO AbstractDynamoDBInputFormat: Throughput percent: 1.0
14/07/20 23:56:03 INFO EndpointProvider: Using endpoint for DynamoDB: 
dynamodb.eu-west-1.amazonaws.com
14/07/20 23:56:03 INFO DynamoDBClient: Describe Table Output: {Table: 
{TableName: ..., KeySchema: {HashKeyElement: {AttributeName: id, 
AttributeType: S, }, }, TableStatus: ACTIVE, CreationDateTime: Wed May 07 
14:38:30 BST 2014, ProvisionedThroughput: {ReadCapacityUnits: 4, 
WriteCapacityUnits: 4, }, TableSizeBytes: 2473, ItemCount: 14, }, }
14/07/20 23:56:03 INFO SparkContext: Starting job: count at console:21
14/07/20 23:56:03 INFO DAGScheduler: Got job 0 (count at console:21) with 1 
output partitions (allowLocal=false)
14/07/20 23:56:03 INFO DAGScheduler: Final stage: Stage 0(count at console:21)
14/07/20 23:56:03 INFO DAGScheduler: Parents of final stage: List()
14/07/20 23:56:03 INFO DAGScheduler: Missing parents: List()
14/07/20 23:56:03 INFO DAGScheduler: Submitting Stage 0 (HadoopRDD[0] at 
hadoopRDD at console:18), which has no missing parents
14/07/20 23:56:03 ERROR DAGSchedulerActorSupervisor: eventProcesserActor failed 
due to the error null; shutting down SparkContext
14/07/20 23:56:04 INFO SparkUI: Stopped Spark web UI at http://10.0.1.7:4040
14/07/20 23:56:04 INFO DAGScheduler: Stopping DAGScheduler
14/07/20 23:56:05 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor 
stopped!
14/07/20 23:56:05 INFO ConnectionManager: Selector thread was interrupted!
14/07/20 23:56:05 INFO ConnectionManager: ConnectionManager stopped
14/07/20 23:56:05 INFO MemoryStore: MemoryStore cleared
14/07/20 23:56:05 INFO BlockManager: BlockManager stopped
14/07/20 23:56:05 INFO BlockManagerMasterActor: Stopping BlockManagerMaster
14/07/20 23:56:05 INFO BlockManagerMaster: BlockManagerMaster stopped
14/07/20 23:56:05 INFO SparkContext: Successfully stopped SparkContext
14/07/20 23:56:05 ERROR OneForOneStrategy:
java.lang.NullPointerException
at 
org.apache.hadoop.mapreduce.lib.input.FileSplit.write(FileSplit.java:80)
at org.apache.hadoop.mapred.FileSplit.write(FileSplit.java:85)
at 
org.apache.hadoop.dynamodb.split.DynamoDBSplit.write(DynamoDBSplit.java:63)
at 
org.apache.hadoop.io.ObjectWritable.writeObject(ObjectWritable.java:202)
at 
org.apache.hadoop.io.ObjectWritable.writeObject(ObjectWritable.java:128)
at org.apache.hadoop.io.ObjectWritable.write(ObjectWritable.java:82)
at 
org.apache.spark.SerializableWritable.writeObject(SerializableWritable.scala:35)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 

DROP IF EXISTS still throws exception about table does not exist?

2014-07-21 Thread Nan Zhu
Hi, all  

When I try hiveContext.hql("DROP TABLE IF EXISTS abc"), where abc is a non-existent
table,

I still receive an exception about the non-existent table, even though IF EXISTS is there.

the same statement runs well in hive shell

Some feedback from Hive community is here: 
https://issues.apache.org/jira/browse/HIVE-7458

“You are doing hiveContext.hql("DROP TABLE IF EXISTS hivetesting") in the Scala
shell of the Spark project.

What is this shell doing? It queries the remote metastore on a non-existing table (see
your provided stack).
The remote metastore throws NoSuchObjectException(message:default.hivetesting
table not found) because the Spark code calls
HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:854) on a non-existing
table. It's the right behavior.
You should check in the Spark code why a query is done on a non-existing table.


I think Spark does not handle the IF EXISTS part of this query well. Maybe you
could file a ticket in the Spark JIRA.

BUT, it's not a bug in HIVE IMHO.”

My question is: the DDL is executed by Hive itself, isn't it?

Best,  

--  
Nan Zhu



Re: NullPointerException When Reading Avro Sequence Files

2014-07-21 Thread Sparky
For those curious, I used the JavaSparkContext and got access to an
AvroSequenceFile (a wrapper around SequenceFile) using the following:

file = sc.newAPIHadoopFile("hdfs path to my file",
AvroSequenceFileInputFormat.class, AvroKey.class, AvroValue.class,
new Configuration())



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-reading-Avro-Sequence-files-tp10201p10305.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2014-07-21 Thread Abel Coronado Iruegas
Hi Yifan

This works for me:

export SPARK_JAVA_OPTS="-Xms10g -Xmx40g -XX:MaxPermSize=10g"
export ADD_JARS=/home/abel/spark/MLI/target/MLI-assembly-1.0.jar
export SPARK_MEM=40g
./spark-shell

Regards


On Mon, Jul 21, 2014 at 7:48 AM, Yifan LI iamyifa...@gmail.com wrote:

 Hi,

 I am trying to load the GraphX example dataset (LiveJournal, 1.08GB)
 through *Scala Shell* on my standalone multicore machine (8 CPUs, 16GB
 mem), but an OutOfMemory error was returned when the code below was running,

 val graph = GraphLoader.edgeListFile(sc, path, minEdgePartitions =
 16).partitionBy(PartitionStrategy.RandomVertexCut)

 I guess I should set some parameters for the JVM, like -Xmx5120m.
 But how to do this in the Scala Shell?
 I directly used bin/spark-shell to start Spark, and everything seems
 to work correctly in the WebUI.

 Or should I do the parameter setting somewhere else (spark-1.0.1)?



 Best,
 Yifan LI



Is there anyone who use streaming join to filter spam as guide mentioned?

2014-07-21 Thread hawkwang

Hello guys,

I'm just trying to use Spark Streaming features.
I noticed that there is a join example for filtering spam, so I just wanted
to try it.
But nothing happens after the join; the output JavaPairDStream content is
the same as before.

So, are there any examples that I can refer to?

Thanks for any suggestions.

Regards,
Hawk


Why spark-submit command hangs?

2014-07-21 Thread Sam Liu
Hi Experts,
I set up a Yarn and Spark env: all services run on a single node. And then I
submitted a WordCount job using the spark-submit script with the command:
./bin/spark-submit tests/wordcount-spark-scala.jar --class
scala.spark.WordCount --num-executors 1 --driver-memory 300M --executor-memory
300M --executor-cores 1 yarn-standalone hdfs://hostname/tmp/input
hdfs://hostname/tmp/output
However, the command hangs and no job is submitted to Yarn. Any comments?

output:Spark assembly has been built with Hive, including Datanucleus jars on 
classpath
14/07/21 22:38:42 WARN spark.SparkConf: null jar passed to SparkContext 
constructor
14/07/21 22:38:43 INFO spark.SecurityManager: Changing view acls to: biadmin
14/07/21 22:38:43 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(biadmin)
14/07/21 22:38:43 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/07/21 22:38:43 INFO Remoting: Starting remoting
14/07/21 22:38:43 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://spark@hostname:56903]
14/07/21 22:38:43 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://spark@hostname:56903]
14/07/21 22:38:43 INFO spark.SparkEnv: Registering MapOutputTracker
14/07/21 22:38:43 INFO spark.SparkEnv: Registering BlockManagerMaster
14/07/21 22:38:43 INFO storage.DiskBlockManager: Created local directory at 
/tmp/spark-local-20140721223843-75cd
14/07/21 22:38:43 INFO storage.MemoryStore: MemoryStore started with capacity 
180.0 MB.
14/07/21 22:38:43 INFO network.ConnectionManager: Bound socket to port 57453 
with id = ConnectionManagerId(hostname,57453)
14/07/21 22:38:43 INFO storage.BlockManagerMaster: Trying to register 
BlockManager
14/07/21 22:38:43 INFO storage.BlockManagerInfo: Registering block manager 
hostname:57453 with 180.0 MB RAM
14/07/21 22:38:43 INFO storage.BlockManagerMaster: Registered BlockManager
14/07/21 22:38:43 INFO spark.HttpServer: Starting HTTP Server
14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/21 22:38:43 INFO server.AbstractConnector: Started 
SocketConnector@0.0.0.0:19323
14/07/21 22:38:43 INFO broadcast.HttpBroadcast: Broadcast server started at 
http://9.123.99.10:19323
14/07/21 22:38:43 INFO spark.HttpFileServer: HTTP File server directory is 
/tmp/spark-e224a31b-4517-43d8-9778-4b6af07dcad2
14/07/21 22:38:43 INFO spark.HttpServer: Starting HTTP Server
14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/21 22:38:43 INFO server.AbstractConnector: Started 
SocketConnector@0.0.0.0:35420
14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/21 22:38:43 INFO server.AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:4040
14/07/21 22:38:43 INFO ui.SparkUI: Started SparkUI at http://hostname:4040
14/07/21 22:38:44 WARN util.NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
14/07/21 22:38:44 WARN spark.SparkContext: yarn-standalone is deprecated as 
of Spark 1.0. Use yarn-cluster instead.
14/07/21 22:38:44 INFO cluster.YarnClusterScheduler: Created 
YarnClusterScheduler
14/07/21 22:38:44 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown hook for 
context org.apache.spark.SparkContext@610c610c



Thanks!

Sam Liu



Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2014-07-21 Thread Yifan LI
Thanks, Abel.


Best,
Yifan LI
On Jul 21, 2014, at 4:16 PM, Abel Coronado Iruegas acoronadoirue...@gmail.com 
wrote:

 Hi Yifan
 
 This works for me:
 
 export SPARK_JAVA_OPTS="-Xms10g -Xmx40g -XX:MaxPermSize=10g"
 export ADD_JARS=/home/abel/spark/MLI/target/MLI-assembly-1.0.jar
 export SPARK_MEM=40g
 ./spark-shell 
 
 
 Regards
 
 
 On Mon, Jul 21, 2014 at 7:48 AM, Yifan LI iamyifa...@gmail.com wrote:
 Hi,
 
 I am trying to load the Graphx example dataset(LiveJournal, 1.08GB) through 
 Scala Shell on my standalone multicore machine(8 cpus, 16GB mem), but an 
 OutOfMemory error was returned when below code was running,
 
 val graph = GraphLoader.edgeListFile(sc, path, minEdgePartitions = 
 16).partitionBy(PartitionStrategy.RandomVertexCut)
 
 I guess I should set some parameters to JVM? like -Xmx5120m
 But how to do this in Scala Shell? 
 I directly used the bin/spark-shell to start spark and seems everything 
 works correctly in WebUI.
 
 Or, I should do parameters setting at somewhere(spark-1.0.1)?
 
 
 
 Best,
 Yifan LI
 



Is deferred execution of multiple RDDs ever coming?

2014-07-21 Thread Harry Brundage
Hello fellow Sparkians.

In https://groups.google.com/d/msg/spark-users/eXV7Bp3phsc/gVgm-MdeEkwJ,
Matei suggested that Spark might get deferred grouping and forced execution
of multiple jobs in an efficient way. His code sample:

rdd.reduceLater(reduceFunction1) // returns Future[ResultType1]
rdd.reduceLater(reduceFunction2) // returns Future[ResultType2]
SparkContext.force() // executes all the later operations as part of a
single optimized job

This would be immensely useful. If you ever want to do two passes over the
data and save two different results to disk, you either have to cache the
RDD, which can be slow or deprive the processing code of memory, or recompute
the whole thing twice. If Spark were smart enough to let you group together
these operations and fork an RDD (say an RDD.partition method), you could very
easily implement these n-pass operations across RDDs and have Spark execute
them efficiently.

Our use case for a feature like this is processing many records and
attaching metadata to the records during processing about our confidence in
the data-points, and then writing the data to one spot and the metadata to
another spot.

I've also wanted this for taking a dataset, profiling it for partition size
or anomalously sized partitions, and then using the profiling result to
repartition the data before saving it to disk, which I think is impossible
to do without caching right now. This use case is a bit more interesting
because information from earlier on in the DAG needs to influence later
stages, and so I suspect the answer will be cache the thing. I explicitly
don't want to cache it because I'm not really doing an iterative
algorithm where I'm willing to pay the heap and time penalties, I'm just
doing an operation which needs run-time information without a collect call.
This suggests that something like a repartition with a lazily evaluated
accumulator might work as well, but I haven't been able to figure out a
solution even with this primitive and the current APIs.

So, does anyone know if this feature might land, and if not, where to start
implementing it? What would the Python story for Futures be?


Re: Spark Streaming with long batch / window duration

2014-07-21 Thread aaronjosephs
So I think I may end up using Hourglass
(https://engineering.linkedin.com/datafu/datafus-hourglass-incremental-data-processing-hadoop),
a Hadoop framework for incremental data processing. It would be very cool if
Spark (not Streaming) could support something like this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-long-batch-window-duration-tp10191p10311.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


gain access to persisted rdd

2014-07-21 Thread mrm
Hi,

I am using pyspark and have persisted a list of RDDs within a function, but
I don't have a reference to them anymore. The RDDs are listed in the UI,
under the Storage tab, and they have names associated with them (e.g. 4). Is
it possible to access the RDDs to unpersist them?

Thanks!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/gain-access-to-persisted-rdd-tp10313.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Give more Java Heap Memory on Standalone mode

2014-07-21 Thread Nick R. Katsipoulakis
Hello,

Currently I work on a project in which:

I spawn a standalone Apache Spark MLlib job in Standalone mode, from a
running Java Process.

In the code of the Spark Job I have the following code:

SparkConf sparkConf = new SparkConf().setAppName("SparkParallelLoad");
sparkConf.set("spark.executor.memory", "8g");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

...

Also, in my ~/spark/conf/spark-env.sh I have the following values:

SPARK_WORKER_CORES=1
export SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_MEMORY=2g
SPARK_DAEMON_JAVA_OPTS=-Dspark.deploy.spark.executor.memory=4g
export SPARK_DAEMON_JAVA_OPTS=-Dspark.deploy.spark.executor.memory=4g

During runtime I receive a Java OutOfMemory exception and a Core dump. My
dataset is less than 1 GB and I want to make sure that I cache it all in
memory for my ML task.

Am I increasing the JVM Heap Memory correctly? Am I doing something wrong?

Thank you,

Nick


Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$

2014-07-21 Thread Yin Huai
Hi Victor,

Instead of importing sqlContext.createSchemaRDD, can you explicitly call
sqlContext.createSchemaRDD(rdd) to create a SchemaRDD?

For example,

You have a case class Record.

case class Record(data_date: String, mobile: String, create_time: String)

Then, you create an RDD[Record] and let's call it mobile.

Instead of using mobile.registerAsTable("mobile"), can you try the
following snippet and see if it works?

val mobileSchemaRDD = sqlContext.createSchemaRDD(mobile)
mobileSchemaRDD.registerAsTable("mobile")

Thanks,

Yin


On Sun, Jul 20, 2014 at 11:10 PM, Victor Sheng victorsheng...@gmail.com
wrote:

 Hi,Kevin
 I tried it on spark1.0.0, it works fine.
 It's a bug in spark1.0.1 ...
 Thanks,
 Victor



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark1-0-1-spark-sql-error-java-lang-NoClassDefFoundError-Could-not-initialize-class-line11-read-tp10135p10288.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark Streaming timing considerations

2014-07-21 Thread Laeeq Ahmed


Hi TD,

Thanks for the help.

The only problem left here is that dstreamTime contains some extra
information which seems to be a date, i.e. 1405944367000 ms, whereas my
application timestamps are just in seconds, which I converted to ms, e.g.
2300, 2400, 2500, etc. So the filter doesn't take effect.

I was thinking of adding that extra info to my Time(4000), but I am not really
sure what it is.


val keyAndValues = eegStreams.map(x => {
    val token = x.split(" ")
    ((token(0).toDouble * 1000).toLong, token(1).toDouble)
})


val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform((windowedRDD, dstreamTime) => {
    val currentAppTimeWindowStart = dstreamTime - Time(4000)  // define the window over the timestamp that you want to process
    val currentAppTimeWindowEnd = dstreamTime
    val filteredRDD = windowedRDD.filter(r => Duration(r._1) > currentAppTimeWindowStart && Time(r._1) <= currentAppTimeWindowEnd)
    filteredRDD
})

The sample input is as under

 AppTimestamp Datapoints  

0 -145.934066 
0.003906 0.19536 
0.007812 0.19536 
0.011719 0.19536 
0.015625 0.19536 
0.019531 0.976801 
0.023438 0.586081 
0.027344 -1.758242 
0.03125 -1.367521 
0.035156 2.930403 
0.039062 4.102564 
0.042969 3.711844 
0.046875 2.148962 
0.050781 -4.102564 
0.054688 -1.758242 
0.058594 3.711844 
0.0625 9.181929 
0.066406 11.135531 
0.070312 4.884005 
0.074219 0.976801 
0.078125 4.493284 
0.082031 11.135531 
0.085938 12.698413 
0.089844 15.824176 
0.09375 21.684982 
0.097656 22.466422 
0.101562 18.949939 
0.105469 14.652015 
0.109375 11.135531 
0.113281 1.758242 
0.117188 -6.056166 
0.121094 -0.976801 
0.125 0.19536 
0.128906 -6.837607 
0.132812 -8.400488 
0.136719 -14.261294 
0.140625 -24.810745 
0.144531 -25.592186 
0.148438 -19.73138 
0.152344 -18.559219 
0.15625 -25.201465 

Regards,
Laeeq


On Thursday, July 17, 2014 8:58 PM, Tathagata Das tathagata.das1...@gmail.com 
wrote:
 


You have to define what range of records needs to be filtered out in
every windowed RDD, right? For example, when the DStream.window has data from
times 0 - 8 seconds by DStream time, you only want to retain the data that
falls into, say, 4 - 8 seconds by application time. This latter is the
application-level time window that you need to define in the transform
function. What may help is that there is another version of transform which
allows you to get the current DStream time (that is, it will give the value
"8") from which you can calculate the app-time window 4 - 8.


val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform((windowedRDD: RDD[...], dstreamTime: Time) => {
 val currentAppTimeWindowStart = dstreamTime - appTimeWindowSize   // define the window over the timestamp that you want to process
 val currentAppTimeWindowEnd = dstreamTime
 val filteredRDD = windowedRDD.filter(r => r._1 <= currentAppTimeWindowEnd && r._1 > currentAppTimeWindowStart) // filter and retain only the records that fall in the current app-time window
 filteredRDD
 })


Hope this helps!

TD



On Thu, Jul 17, 2014 at 5:54 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

Hi TD,


I have been able to filter the first windowed RDD, but I am not sure how to
make a generic filter. The larger window is 8 seconds and I want to fetch 4
seconds based on the application timestamp. I have seen an earlier post which
suggests a timestamp-based window, but I am not sure how to build such a
window in the following example.



 val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(windowedRDD => {
 //val timeStampBasedWindow = ???                    // define the window over the timestamp that you want to process
 val filteredRDD = windowedRDD.filter(_._1 <= 4)     // filter and retain only the records that fall in the timestamp-based window
 filteredRDD
 })

Consider the input tuples as (1,23),(1.2,34) . . . . . (3.8,54)(4,413) . . .
where the key is the timestamp.

Regards,
Laeeq
 





On Saturday, July 12, 2014 8:29 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:
 


Hi,
Thanks I will try to implement it.


Regards,
Laeeq





 On Saturday, July 12, 2014 4:37 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:
 


This is not in the current streaming API.


Queue stream is useful for testing with generated RDDs, but not for actual
data. For an actual data stream, the slack time can be implemented by doing
DStream.window on a larger window that takes the slack time into consideration,
and then filtering out the required application-time-based window of data. For
example, if you want a slack time of 1 minute and batches of 10 seconds, then
do a window operation of 70 seconds, then in each RDD filter out the records
with the desired application time and process them.


TD



On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

Hi,


In the 

Re: Spark Streaming timing considerations

2014-07-21 Thread Sean Owen
That is just standard Unix time.

1405944367000 = Sun, 09 Aug 46522 05:56:40 GMT


On Mon, Jul 21, 2014 at 5:43 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:


 Hi TD,

 Thanks for the help.

 The only problem left here is that the dstreamTime contains some extra 
 information which seems date i.e. 1405944367000 ms whereas my application 
 timestamps are just in sec which I converted to ms. e.g. 2300, 2400, 2500 
 etc. So the filter doesn't take effect.

 I was thinking to add that extra info to my Time(4000). But I am not really 
 sure what it is?



Re: Spark Streaming timing considerations

2014-07-21 Thread Sean Owen
Uh, right. I mean:

1405944367 = Mon, 21 Jul 2014 12:06:07 GMT
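
(This can be checked quickly from the shell; a trivial sketch -- the printed timezone
depends on the local machine:)

scala> new java.util.Date(1405944367000L)
res0: java.util.Date = Mon Jul 21 12:06:07 GMT 2014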

On Mon, Jul 21, 2014 at 5:47 PM, Sean Owen so...@cloudera.com wrote:
 That is just standard Unix time.

 1405944367000 = Sun, 09 Aug 46522 05:56:40 GMT


 On Mon, Jul 21, 2014 at 5:43 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:


 Hi TD,

 Thanks for the help.

 The only problem left here is that the dstreamTime contains some extra 
 information which seems date i.e. 1405944367000 ms whereas my application 
 timestamps are just in sec which I converted to ms. e.g. 2300, 2400, 2500 
 etc. So the filter doesn't take effect.

 I was thinking to add that extra info to my Time(4000). But I am not really 
 sure what it is?



Re: Give more Java Heap Memory on Standalone mode

2014-07-21 Thread Nick R. Katsipoulakis
Thank you Abel,

It seems that your advice worked. Even though I receive a message that it
is a deprecated way of defining Spark Memory (the system prompts that I
should set spark.driver.memory), the memory is increased.

Again, thank you,

Nick


On Mon, Jul 21, 2014 at 9:42 AM, Abel Coronado Iruegas 
acoronadoirue...@gmail.com wrote:

 Hi Nick

 Maybe if you use:

  export SPARK_MEM=4g






 On Mon, Jul 21, 2014 at 11:35 AM, Nick R. Katsipoulakis 
 kat...@cs.pitt.edu wrote:

 Hello,

 Currently I work on a project in which:

 I spawn a standalone Apache Spark MLlib job in Standalone mode, from a
 running Java Process.

 In the code of the Spark Job I have the following code:

 SparkConf sparkConf = new SparkConf().setAppName("SparkParallelLoad");
 sparkConf.set("spark.executor.memory", "8g");
 JavaSparkContext sc = new JavaSparkContext(sparkConf);

 ...

 Also, in my ~/spark/conf/spark-env.sh I have the following values:

 SPARK_WORKER_CORES=1
 export SPARK_WORKER_CORES=1
 SPARK_WORKER_MEMORY=2g
 export SPARK_WORKER_MEMORY=2g
 SPARK_DAEMON_JAVA_OPTS=-Dspark.deploy.spark.executor.memory=4g
 export SPARK_DAEMON_JAVA_OPTS=-Dspark.deploy.spark.executor.memory=4g

 During runtime I receive a Java OutOfMemory exception and a Core dump. My
 dataset is less than 1 GB and I want to make sure that I cache it all in
 memory for my ML task.

 Am I increasing the JVM Heap Memory correctly? Am I doing something wrong?

 Thank you,

 Nick





relationship of RDD[Array[String]] to Array[Array[String]]

2014-07-21 Thread Philip Ogren
It is really nice that Spark RDD's provide functions  that are often 
equivalent to functions found in Scala collections.  For example, I can 
call:


myArray.map(myFx)

and equivalently

myRdd.map(myFx)

Awesome!

My question is this.  Is it possible to write code that works on either 
an RDD or a local collection without having to have parallel 
implementations?  I can't tell that RDD or Array share any supertypes or 
traits by looking at the respective scaladocs. Perhaps implicit 
conversions could be used here.  What I would like to do is have a 
single function whose body is like this:


myData.map(myFx)

where myData could be an RDD[Array[String]] (for example) or an 
Array[Array[String]].


Has anyone had success doing this?

Thanks,
Philip




Re: relationship of RDD[Array[String]] to Array[Array[String]]

2014-07-21 Thread Michael Malak
It's really more of a Scala question than a Spark question, but the standard OO 
(not Scala-specific) way is to create your own custom supertype (e.g. 
MyCollectionTrait), inherited/implemented by two concrete classes (e.g. MyRDD 
and MyArray), each of which manually forwards method calls to the corresponding 
pre-existing library implementations. Writing all those forwarding method calls 
is tedious, but Scala provides at least one bit of syntactic sugar, which 
alleviates having to type in twice the parameter lists for each method:
http://stackoverflow.com/questions/8230831/is-method-parameter-forwarding-possible-in-scala

I'm not seeing a way to utilize implicit conversions in this case. Since Scala 
is statically (albeit inferred) typed, I don't see a way around having a common 
supertype.
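
(A rough sketch of that shape -- the names are made up, only map is forwarded, and
details such as laziness and construction-time ClassTags are glossed over:)

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

trait MyCollectionTrait[T] {
  def map[U: ClassTag](f: T => U): MyCollectionTrait[U]
}

class MyRDDColl[T](rdd: RDD[T]) extends MyCollectionTrait[T] {
  // forward to RDD.map (lazy)
  def map[U: ClassTag](f: T => U) = new MyRDDColl(rdd.map(f))
}

class MyArrayColl[T](arr: Array[T]) extends MyCollectionTrait[T] {
  // forward to Array.map (eager)
  def map[U: ClassTag](f: T => U) = new MyArrayColl(arr.map(f))
}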



On Monday, July 21, 2014 11:01 AM, Philip Ogren philip.og...@oracle.com wrote:
It is really nice that Spark RDD's provide functions  that are often 
equivalent to functions found in Scala collections.  For example, I can 
call:

myArray.map(myFx)

and equivalently

myRdd.map(myFx)

Awesome!

My question is this.  Is it possible to write code that works on either 
an RDD or a local collection without having to have parallel 
implementations?  I can't tell that RDD or Array share any supertypes or 
traits by looking at the respective scaladocs. Perhaps implicit 
conversions could be used here.  What I would like to do is have a 
single function whose body is like this:

myData.map(myFx)

where myData could be an RDD[Array[String]] (for example) or an 
Array[Array[String]].

Has anyone had success doing this?

Thanks,
Philip


Re: relationship of RDD[Array[String]] to Array[Array[String]]

2014-07-21 Thread andy petrella
heya,

Without a bit of gymnastics at the type level, nope. Actually RDD doesn't
share any functions with the Scala collections lib (the simple reason I could
see is that Spark's ones are lazy, while the default implementations in Scala
aren't).

However, it'd be possible by implementing an implicit converter from a
SeqLike (f.i.) to an RDD; nonetheless it'd be cumbersome because the
overlap between the two worlds isn't complete (for instance, flatMap doesn't
have the same semantics, drop is hard, etc).

Also, it'd scare me a bit to have this kind of bazooka waiting for me around
the next corner, by letting me think that an iterative-like process can be run
in a distributed world :-).

OTOH, the inverse is quite easy: an implicit conv from RDD to an Array is
simply a call to collect (take care that RDD is not covariant -- I think
it'd be related to the fact that the ClassTag is needed!?)

only my .2 ¢




 aℕdy ℙetrella
about.me/noootsab

http://about.me/noootsab


On Mon, Jul 21, 2014 at 7:01 PM, Philip Ogren philip.og...@oracle.com
wrote:

 It is really nice that Spark RDD's provide functions  that are often
 equivalent to functions found in Scala collections.  For example, I can
 call:

 myArray.map(myFx)

 and equivalently

 myRdd.map(myFx)

 Awesome!

 My question is this.  Is it possible to write code that works on either an
 RDD or a local collection without having to have parallel implementations?
  I can't tell that RDD or Array share any supertypes or traits by looking
 at the respective scaladocs. Perhaps implicit conversions could be used
 here.  What I would like to do is have a single function whose body is like
 this:

 myData.map(myFx)

 where myData could be an RDD[Array[String]] (for example) or an
 Array[Array[String]].

 Has anyone had success doing this?

 Thanks,
 Philip





Re: LiveListenerBus throws exception and weird web UI bug

2014-07-21 Thread mrm
I have the same error! Did you manage to fix it?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/LiveListenerBus-throws-exception-and-weird-web-UI-bug-tp8330p10324.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed

2014-07-21 Thread Yin Huai
Instead of using union, can you try sqlContext.parquetFile("/user/
hive/warehouse/xxx_parquet.db").registerAsTable("parquetTable")?
Then, var all = sql("select some_id, some_type, some_time from
parquetTable").map(line => (line(0), (line(1).toString,
line(2).toString.substring(0, 19))))

Thanks,

Yin


On Sun, Jul 20, 2014 at 8:58 AM, chutium teng@gmail.com wrote:

 like this:

 val sc = new SparkContext(new SparkConf().setAppName("SLA Filter"))
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext._
 val suffix = args(0)
 sqlContext.parquetFile("/user/hive/warehouse/xxx_parquet.db/xx001_" + suffix).registerAsTable("xx001")
 sqlContext.parquetFile("/user/hive/warehouse/xxx_parquet.db/xx002_" + suffix).registerAsTable("xx002")
 ...
 ...
 var xx001 = sql("select some_id, some_type, some_time from xx001").map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19)) ) )
 var xx002 = sql("select some_id, some_type, some_time from xx002").map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19)) ) )
 ...
 ...

 var all = xx001 union xx002 ... union ...

 all.groupByKey.filter( kv => FilterSLA.filterSLA(kv._2.toSeq) ).saveAsTextFile("xxx")

 filterSLA will turn the input Seq[(String, String)] into a Map, then check
 something like: does the map contain type1 and type2, and if so, is
 timestamp_type1 - timestamp_type2 > 2 days


 thanks




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10268.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



RE: Hive From Spark

2014-07-21 Thread Andrew Lee
Hi All,
Currently, if you are running the Spark HiveContext API with Hive 0.12, it won't
work due to the following 2 libraries, which are not consistent with Hive 0.12
and Hadoop. (Hive libs align with Hadoop libs, and as a common
practice, they should be consistent to be inter-operable.)
These are under discussion in the 2 JIRA tickets:
https://issues.apache.org/jira/browse/HIVE-7387
https://issues.apache.org/jira/browse/SPARK-2420
When I ran the command by tweaking the classpath and build for Spark 1.0.1-rc3,
I was able to create a table through HiveContext, however, when I fetch the data,
it breaks due to incompatible API calls in Guava. This is critical since it
needs to map the columns to the RDD schema.
Hive and Hadoop are using an older version of the guava libraries (11.0.1) whereas
Spark Hive is using guava 14.0.1+. The community isn't willing to downgrade to
11.0.1, which is the current version for Hadoop 2.2 and Hive 0.12. Be aware of the
protobuf version as well in Hive 0.12 (it uses protobuf 2.4).
scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive._

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@34bee01a

scala> hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
res0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:104
== Query Plan ==
Native command: executed by Hive

scala> hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
res1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[3] at RDD at SchemaRDD.scala:104
== Query Plan ==
Native command: executed by Hive

scala> // Queries are expressed in HiveQL
scala> hiveContext.hql("FROM src SELECT key, value").collect().foreach(println)
java.lang.NoSuchMethodError: 
com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
at 
org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
at 
org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165)
at 
org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102)
at 
org.apache.spark.util.SizeEstimator$$anonfun$visitArray$2.apply$mcVI$sp(SizeEstimator.scala:214)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:210)
at 
org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:169)
at 
org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:75)
at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:661)
at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:812)
at org.apache.spark.broadcast.HttpBroadcast.init(HttpBroadcast.scala:52)
at 
org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)
at 
org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:29)
at 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:776)
at org.apache.spark.sql.hive.HadoopTableReader.init(TableReader.scala:60)
at 
org.apache.spark.sql.hive.execution.HiveTableScan.init(HiveTableScan.scala:70)
at 
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$4.apply(HiveStrategies.scala:73)
at 
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$4.apply(HiveStrategies.scala:73)
at 
org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:280)
at 
org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:69)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:316)
at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:316)
at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:319)
at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:319)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:420)
at 

Re: Hive From Spark

2014-07-21 Thread Sean Owen
I haven't seen anyone actively 'unwilling' -- I hope not. See the
discussion at https://issues.apache.org/jira/browse/SPARK-2420 where I
sketch what a downgrade means. I think it just hasn't gotten a proper
look yet.

Contrary to what I thought earlier, the conflict does in fact cause
problems in theory, and you show it causes a problem in practice. Not
to mention it causes issues for Hive-on-Spark now.

On Mon, Jul 21, 2014 at 6:27 PM, Andrew Lee alee...@hotmail.com wrote:
 Hive and Hadoop are using an older version of guava libraries (11.0.1) where
 Spark Hive is using guava 14.0.1+.
 The community isn't willing to downgrade to 11.0.1 which is the current
 version for Hadoop 2.2 and Hive 0.12.


Re: Why spark-submit command hangs?

2014-07-21 Thread Andrew Or
Hi Sam,

Did you specify the MASTER in your spark-env.sh? I ask because I didn't see
a --master in your launch command. Also, your app seems to take in a master
(yarn-standalone). This is not exactly correct because by the time the
SparkContext is launched locally, which is the default, it is too late to
use yarn-cluster mode by definition, since the driver should be launched
within one of the containers on the worker machines.

I would suggest the following:
- change your application to not take in the Spark master as a command line
argument
- use yarn-cluster instead of yarn-standalone (which is deprecated)
- add --master yarn-cluster in your spark-submit command
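
Putting those together, the launch command would look roughly like this (a sketch,
reusing the jar, class and paths from the original post):

./bin/spark-submit --class scala.spark.WordCount --master yarn-cluster \
  --num-executors 1 --driver-memory 300M --executor-memory 300M --executor-cores 1 \
  tests/wordcount-spark-scala.jar hdfs://hostname/tmp/input hdfs://hostname/tmp/output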

Another worrying thing is the warning from your logs:
14/07/21 22:38:42 WARN spark.SparkConf: null jar passed to SparkContext
constructor

How are you creating your SparkContext?

Andrew



2014-07-21 7:47 GMT-07:00 Sam Liu liuqiyun_sp...@sina.com:

 Hi Experts,


 I setup Yarn and Spark env: all services runs on a single node. And then
 submited a WordCount job using spark-submit script with command:

 ./bin/spark-submit tests/wordcount-spark-scala.jar --class
 scala.spark.WordCount --num-executors 1 --driver-memory 300M
 --executor-memory 300M --executor-cores 1 yarn-standalone
 hdfs://hostname/tmp/input hdfs://hostname/tmp/output


 However, the command hangs and no job is submited to Yarn. Any comments?


 output:

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 14/07/21 22:38:42 WARN spark.SparkConf: null jar passed to SparkContext
 constructor
 14/07/21 22:38:43 INFO spark.SecurityManager: Changing view acls to:
 biadmin
 14/07/21 22:38:43 INFO spark.SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view permissions:
 Set(biadmin)
 14/07/21 22:38:43 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/07/21 22:38:43 INFO Remoting: Starting remoting
 14/07/21 22:38:43 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://spark@hostname:56903]
 14/07/21 22:38:43 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@hostname:56903]
 14/07/21 22:38:43 INFO spark.SparkEnv: Registering MapOutputTracker
 14/07/21 22:38:43 INFO spark.SparkEnv: Registering BlockManagerMaster
 14/07/21 22:38:43 INFO storage.DiskBlockManager: Created local directory
 at /tmp/spark-local-20140721223843-75cd
 14/07/21 22:38:43 INFO storage.MemoryStore: MemoryStore started with
 capacity 180.0 MB.
 14/07/21 22:38:43 INFO network.ConnectionManager: Bound socket to port
 57453 with id = ConnectionManagerId(hostname,57453)
 14/07/21 22:38:43 INFO storage.BlockManagerMaster: Trying to register
 BlockManager
 14/07/21 22:38:43 INFO storage.BlockManagerInfo: Registering block manager
 hostname:57453 with 180.0 MB RAM
 14/07/21 22:38:43 INFO storage.BlockManagerMaster: Registered BlockManager
 14/07/21 22:38:43 INFO spark.HttpServer: Starting HTTP Server
 14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
 14/07/21 22:38:43 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:19323
 14/07/21 22:38:43 INFO broadcast.HttpBroadcast: Broadcast server started
 at http://9.123.99.10:19323
 14/07/21 22:38:43 INFO spark.HttpFileServer: HTTP File server directory is
 /tmp/spark-e224a31b-4517-43d8-9778-4b6af07dcad2
 14/07/21 22:38:43 INFO spark.HttpServer: Starting HTTP Server
 14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
 14/07/21 22:38:43 INFO server.AbstractConnector: Started
 SocketConnector@0.0.0.0:35420
 14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
 14/07/21 22:38:43 INFO server.AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:4040
 14/07/21 22:38:43 INFO ui.SparkUI: Started SparkUI at http://hostname:4040
 14/07/21 22:38:44 WARN util.NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/07/21 22:38:44 WARN spark.SparkContext: yarn-standalone is deprecated
 as of Spark 1.0. Use yarn-cluster instead.
 14/07/21 22:38:44 INFO cluster.YarnClusterScheduler: Created
 YarnClusterScheduler
 14/07/21 22:38:44 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown
 hook for context org.apache.spark.SparkContext@610c610c




 Thanks!
 
 Sam Liu




Re: relationship of RDD[Array[String]] to Array[Array[String]]

2014-07-21 Thread Philip Ogren

Thanks Michael,

That is one solution that I had thought of.  It seems like a bit of 
overkill for the few methods I want to do this for - but I will think 
about it.  I guess I was hoping that I was missing something more 
obvious/easier.


Philip

On 07/21/2014 11:20 AM, Michael Malak wrote:

It's really more of a Scala question than a Spark question, but the standard OO 
(not Scala-specific) way is to create your own custom supertype (e.g. 
MyCollectionTrait), inherited/implemented by two concrete classes (e.g. MyRDD 
and MyArray), each of which manually forwards method calls to the corresponding 
pre-existing library implementations. Writing all those forwarding method calls 
is tedious, but Scala provides at least one bit of syntactic sugar, which 
alleviates having to type in twice the parameter lists for each method:
http://stackoverflow.com/questions/8230831/is-method-parameter-forwarding-possible-in-scala

I'm not seeing a way to utilize implicit conversions in this case. Since Scala 
is statically (albeit inferred) typed, I don't see a way around having a common 
supertype.



On Monday, July 21, 2014 11:01 AM, Philip Ogren philip.og...@oracle.com wrote:
It is really nice that Spark RDD's provide functions  that are often
equivalent to functions found in Scala collections.  For example, I can
call:

myArray.map(myFx)

and equivalently

myRdd.map(myFx)

Awesome!

My question is this.  Is it possible to write code that works on either
an RDD or a local collection without having to have parallel
implementations?  I can't tell that RDD or Array share any supertypes or
traits by looking at the respective scaladocs. Perhaps implicit
conversions could be used here.  What I would like to do is have a
single function whose body is like this:

myData.map(myFx)

where myData could be an RDD[Array[String]] (for example) or an
Array[Array[String]].

Has anyone had success doing this?

Thanks,
Philip




Re: LiveListenerBus throws exception and weird web UI bug

2014-07-21 Thread Andrew Or
Hi all,

This error happens because we receive a completed event for a particular
stage that we don't know about, i.e. a stage we haven't received a
submitted event for. The root cause of this, as Baoxu explained, is
usually because the event queue is full and the listener begins to drop
events. In this case we are dropping the submitted event. This particular
exception should be fixed in the latest master, as we now check for whether
the key exists before indexing directly into it. Unfortunately, this is not
in Spark 1.0.1, but will be fixed in Spark 1.1. There is currently no
bullet-proof workaround for this issue, but you might try to reduce the
number of concurrently running tasks (partitions) to avoid emitting too
many events. The root cause of the listener queue taking too much time to
process events is recorded in SPARK-2316, which we also intend to fix by
Spark 1.1.
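
(As a rough illustration of reducing the number of concurrently running tasks -- cut the
partition count before the heavy stage; the number 64 is arbitrary:)

// fewer partitions mean fewer task submitted/completed events per stage
val coarser = rdd.coalesce(64)        // no shuffle
// or, if the data should also be rebalanced across the new partitions:
val rebalanced = rdd.repartition(64)  // incurs a shuffle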

Andrew


2014-07-21 10:23 GMT-07:00 mrm ma...@skimlinks.com:

 I have the same error! Did you manage to fix it?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/LiveListenerBus-throws-exception-and-weird-web-UI-bug-tp8330p10324.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



RDD pipe partitionwise

2014-07-21 Thread Jaonary Rabarisoa
Dear all,

Is there any example of mapPartitions that fork external process or how to
make RDD.pipe working on every data of a partition ?
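
(Not an answer from the thread, but a minimal sketch of RDD.pipe: the external command
is forked once per partition, each element is written to its stdin one per line, and its
stdout lines come back as a new RDD. The grep command is just a placeholder:)

val nums = sc.parallelize(1 to 100, 4)   // 4 partitions => 4 forked processes
val piped = nums.pipe("grep 7")          // any command available on the workers
piped.collect().foreach(println)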

Cheers,

Jaonary


Re: gain access to persisted rdd

2014-07-21 Thread Andrew Or
Hi Maria,

If you don't have a reference to a persisted RDD, it will be automatically
unpersisted on the next GC by the ContextCleaner. This is implemented for
Scala, but should still work in Python because Python uses reference
counting to clean up objects that are no longer strongly referenced.
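
(If you did want to unpersist them explicitly rather than wait for the cleaner, a sketch
of what this looks like on the Scala side -- sc.getPersistentRDDs returns the RDDs the
context is still tracking; as far as I know it isn't exposed in PySpark:)

sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println("unpersisting RDD " + id)
  rdd.unpersist(blocking = false)
}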

Andrew


2014-07-21 8:37 GMT-07:00 mrm ma...@skimlinks.com:

 Hi,

 I am using pyspark and have persisted a list of rdds within a function, but
 I don't have a reference to them anymore. The RDD's are listed in the UI,
 under the Storage tab, and they have names associated to them (e.g. 4).
 Is
 it possible to access the RDD's to unpersist them?

 Thanks!




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/gain-access-to-persisted-rdd-tp10313.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Give more Java Heap Memory on Standalone mode

2014-07-21 Thread Andrew Or
Hi Nick and Abel,

Looks like you are requesting 8g for your executors, but only allowing 2g
on the workers. You should set SPARK_WORKER_MEMORY to at least 8g if you
intend to use that much memory in your application. Also, you shouldn't
have to set SPARK_DAEMON_JAVA_OPTS; you can just set
spark.executor.memory as you have done so in your SparkConf. As you may
have already noticed, SPARK_MEM is deprecated in favor of
spark.executor.memory and spark.driver.memory. If you are running Spark
1.0+, you can use spark-submit with the --executor-memory and
--driver-memory to set this on the command line.
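
For example, a minimal sketch (the 2g driver value is just an illustration, and the
workers are assumed to be started with SPARK_WORKER_MEMORY=8g or more):

// in the application, instead of SPARK_DAEMON_JAVA_OPTS / SPARK_MEM:
val conf = new org.apache.spark.SparkConf()
  .setAppName("SparkParallelLoad")
  .set("spark.executor.memory", "8g")   // executor heap; must fit within the worker's memory
// driver memory is best given on the command line, since the driver JVM is already
// running by the time SparkConf is read:
//   ./bin/spark-submit --executor-memory 8g --driver-memory 2g ... your-app.jar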

Andrew


2014-07-21 10:01 GMT-07:00 Nick R. Katsipoulakis kat...@cs.pitt.edu:

 Thank you Abel,

 It seems that your advice worked. Even though I receive a message that it
 is a deprecated way of defining Spark Memory (the system prompts that I
 should set spark.driver.memory), the memory is increased.

 Again, thank you,

 Nick


 On Mon, Jul 21, 2014 at 9:42 AM, Abel Coronado Iruegas 
 acoronadoirue...@gmail.com wrote:

 Hi Nick

 Maybe if you use:

  export SPARK_MEM=4g






 On Mon, Jul 21, 2014 at 11:35 AM, Nick R. Katsipoulakis 
 kat...@cs.pitt.edu wrote:

 Hello,

 Currently I work on a project in which:

 I spawn a standalone Apache Spark MLlib job in Standalone mode, from a
 running Java Process.

 In the code of the Spark Job I have the following code:

 SparkConf sparkConf = new SparkConf().setAppName("SparkParallelLoad");
 sparkConf.set("spark.executor.memory", "8g");
 JavaSparkContext sc = new JavaSparkContext(sparkConf);

 ...

 Also, in my ~/spark/conf/spark-env.sh I have the following values:

 SPARK_WORKER_CORES=1
 export SPARK_WORKER_CORES=1
 SPARK_WORKER_MEMORY=2g
 export SPARK_WORKER_MEMORY=2g
 SPARK_DAEMON_JAVA_OPTS=-Dspark.deploy.spark.executor.memory=4g
 export SPARK_DAEMON_JAVA_OPTS=-Dspark.deploy.spark.executor.memory=4g

 During runtime I receive a Java OutOfMemory exception and a Core dump.
 My dataset is less than 1 GB and I want to make sure that I cache it all in
 memory for my ML task.

 Am I increasing the JVM Heap Memory correctly? Am I doing something
 wrong?

 Thank you,

 Nick






Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed

2014-07-21 Thread chutium
Hi,

unfortunately it is not so straightforward

xxx_parquet.db

is the folder of a managed database created by Hive/Impala, so every
sub-element in it is a table in Hive/Impala. These are folders in HDFS,
each table has a different schema, and each folder holds one or more
parquet files.

that means

xx001_suffix
xx002_suffix

are folders, there are some parquet files like

xx001_suffix/parquet_file1_with_schema1

xx002_suffix/parquet_file1_with_schema2
xx002_suffix/parquet_file2_with_schema2

it seems only a union can do this job.

Nonetheless, thank you very much; maybe the only reason is that Spark is eating
up too much memory...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10335.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Caching issue with msg: RDD block could not be dropped from memory as it does not exist

2014-07-21 Thread Andrew Or
Hi Rindra,

Depending on what you're doing with your groupBy, you may end up inflating
your data quite a bit. Even if your machine has 16G, by default spark-shell
only uses 512M, and the amount used for storing blocks is only 60% of that
(spark.storage.memoryFraction), so this space becomes ~300M. This is still
many multiples of the size of your dataset, but not by orders of magnitude.
If you are running Spark 1.0+, you can increase the amount of memory used
by spark-shell by adding --driver-memory 1g as a command line argument in
local mode, or --executor-memory 1g in any other mode.

(Also, it seems that you set your log level to WARN. The cause is most
probably because the cache is not big enough, but setting the log level to
INFO will provide you with more information on the exact sizes that are
being used by the storage and the blocks).

Andrew
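
For reference, a small sketch of the knobs mentioned above, assuming Spark 1.0+ (the RDD name and storage level are illustrative): launch with, e.g., ./bin/spark-shell --driver-memory 1g, then inspect what actually got cached:

import org.apache.spark.storage.StorageLevel

val cached = rdd.persist(StorageLevel.MEMORY_ONLY)
cached.count()   // materializes the cache
sc.getRDDStorageInfo.foreach { info =>
  // reports per-RDD memory/disk usage, the same numbers shown on the Storage tab
  println(s"${info.name}: ${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
}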


2014-07-19 13:01 GMT-07:00 rindra rindra@gmail.com:

 Hi,

  I am working with a small dataset, about 13 MB, in the spark-shell. After
  doing a
  groupBy on the RDD, I wanted to cache the RDD in memory, but I keep getting
  these warnings:

  scala> rdd.cache()
  res28: rdd.type = MappedRDD[63] at repartition at <console>:28


  scala> rdd.count()
 14/07/19 12:45:18 WARN BlockManager: Block rdd_63_82 could not be dropped
 from memory as it does not exist
 14/07/19 12:45:18 WARN BlockManager: Putting block rdd_63_82 failed
 14/07/19 12:45:18 WARN BlockManager: Block rdd_63_40 could not be dropped
 from memory as it does not exist
 14/07/19 12:45:18 WARN BlockManager: Putting block rdd_63_40 failed
 res29: Long = 5

  It seems that I could not cache the data in memory even though my local
  machine has
  16 GB of RAM and the data is only 13 MB split into 100 partitions.

 How to prevent this caching issue from happening? Thanks.

 Rindra



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Caching-issue-with-msg-RDD-block-could-not-be-dropped-from-memory-as-it-does-not-exist-tp10248.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: gain access to persisted rdd

2014-07-21 Thread chutium
but at least if users want to access the persisted RDDs, they can use
sc.getPersistentRDDs in the same context.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/gain-access-to-persisted-rdd-tp10313p10337.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark streaming rate limiting from kafka

2014-07-21 Thread Bill Jay
Hi Tathagata,

I am currently creating multiple DStreams to consume from different topics.
How can I let each consumer consume from different partitions? I found the
following parameters in the Spark API:

createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
    jssc: JavaStreamingContext,
    keyTypeClass: Class[K],
    valueTypeClass: Class[V],
    keyDecoderClass: Class[U],
    valueDecoderClass: Class[T],
    kafkaParams: Map[String, String],
    topics: Map[String, Integer],
    storageLevel: StorageLevel): JavaPairReceiverInputDStream[K, V]

Create an input stream that pulls messages from a Kafka Broker.




The topics parameter is:
Map of (topic_name -> numPartitions) to consume. Each partition is
consumed in its own thread.

Does numPartitions mean the total number of partitions to consume from
topic_name, or the index of the partition? How can we specify, for each
createStream, which partition of the Kafka topic to consume? If so, I think
I will get a lot of parallelism from the source of the data. Thanks!

Bill
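
For reference, a sketch of the multiple-receiver pattern from the streaming guide, assuming the Scala KafkaUtils API in Spark 1.0 (ssc, zkQuorum, groupId, the topic name and the receiver count are illustrative). Note that the Int in the topics map is the number of consumer threads inside that one receiver, not a partition index:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 4
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, groupId,
    Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
}
// union them so downstream operations see a single DStream
val unified = ssc.union(kafkaStreams)
unified.map(_._2).count().print()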

On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 You can create multiple Kafka streams to partition your topics across them,
 which will run multiple receivers on multiple executors. This is covered in
 the Spark streaming guide.
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 And for the purpose of this thread, to answer the original question, we now
 have the ability
 https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC
 to limit the receiving rate. It's in the master branch, and will be
 available in Spark 1.1. It basically sets the limits at the receiver level
 (so it applies to all sources) on the max records per second that
 will be received by the receiver.

 TD


 On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 I also have an issue consuming from Kafka. When I consume from Kafka,
 there are always a single executor working on this job. Even I use
 repartition, it seems that there is still a single executor. Does anyone
 has an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song







Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)

2014-07-21 Thread Matt Work Coarr
I got this working by having our sysadmin update our security group to
allow incoming traffic from the local subnet on ports 1-65535.  I'm not
sure if there's a more specific range I could have used, but so far,
everything is running!

Thanks for all the responses Marcelo and Andrew!!

Matt
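
For what it's worth, one port can at least be pinned down so the rule can be a bit narrower: spark.driver.port fixes the port executors connect back to (a sketch in the driver application, the value is illustrative; several other ports, e.g. the block manager's, were still chosen randomly in Spark 1.0):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.driver.port", "51000")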


On Thu, Jul 17, 2014 at 9:10 PM, Andrew Or and...@databricks.com wrote:

 Hi Matt,

 The security group shouldn't be an issue; the ports listed in
 `spark_ec2.py` are only for communication with the outside world.

 How did you launch your application? I notice you did not launch your
 driver from your Master node. What happens if you did? Another thing is
 that there seems to be some inconsistency or missing pieces in the logs you
 posted. After an executor says driver disassociated, what happens in the
 driver logs? Is an exception thrown or something?

 It would be useful if you could also post your conf/spark-env.sh.

 Andrew


 2014-07-17 14:11 GMT-07:00 Marcelo Vanzin van...@cloudera.com:

 Hi Matt,

 I'm not very familiar with setup on ec2; the closest I can point you
 at is to look at the launch_cluster in ec2/spark_ec2.py, where the
 ports seem to be configured.


 On Thu, Jul 17, 2014 at 1:29 PM, Matt Work Coarr
 mattcoarr.w...@gmail.com wrote:
  Thanks Marcelo!  This is a huge help!!
 
  Looking at the executor logs (in a vanilla spark install, I'm finding
 them
  in $SPARK_HOME/work/*)...
 
  It launches the executor, but it looks like the
 CoarseGrainedExecutorBackend
  is having trouble talking to the driver (exactly what you said!!!).
 
  Do you know the range of random ports that is used for the
  executor-to-driver communication?  Is that range adjustable?  Any config setting or
  environment variable?
 
  I manually setup my ec2 security group to include all the ports that the
  spark ec2 script ($SPARK_HOME/ec2/spark_ec2.py) sets up in it's security
  groups.  They included (for those listed above 1):
  1
  50060
  50070
  50075
  60060
  60070
  60075
 
  Obviously I'll need to make some adjustments to my EC2 security group!
  Just
  need to figure out exactly what should be in there.  To keep things
 simple,
  I just have one security group for the master, slaves, and the driver
  machine.
 
  In listing the port ranges in my current security group I looked at the
  ports that spark_ec2.py sets up as well as the ports listed in the
 spark
  standalone mode documentation page under configuring ports for network
  security:
 
  http://spark.apache.org/docs/latest/spark-standalone.html
 
 
  Here are the relevant fragments from the executor log:
 
  Spark Executor Command: /cask/jdk/bin/java -cp
 
 ::/cask/spark/conf:/cask/spark/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/cask/spark/lib/datanucleus-api-jdo-3.
 
 
 2.1.jar:/cask/spark/lib/datanucleus-rdbms-3.2.1.jar:/cask/spark/lib/datanucleus-core-3.2.2.jar
  -XX:MaxPermSize=128m -Dspark.akka.frameSize=100 -Dspark.akka.
 
  frameSize=100 -Xms512M -Xmx512M
  org.apache.spark.executor.CoarseGrainedExecutorBackend
  akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787/user/CoarseGra
 
  inedScheduler 0 ip-10-202-8-45.ec2.internal 8
  akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker
  app-20140717195146-
 
  
 
  ...
 
  14/07/17 19:51:47 DEBUG NativeCodeLoader: Trying to load the
 custom-built
  native-hadoop library...
 
  14/07/17 19:51:47 DEBUG NativeCodeLoader: Failed to load native-hadoop
 with
  error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
 
  14/07/17 19:51:47 DEBUG NativeCodeLoader:
 
 java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
 
  14/07/17 19:51:47 WARN NativeCodeLoader: Unable to load native-hadoop
  library for your platform... using builtin-java classes where applicable
 
  14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Falling
 back
  to shell based
 
  14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Group
 mapping
  impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
 
  14/07/17 19:51:48 DEBUG Groups: Group mapping
  impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback;
  cacheTimeout=30
 
  14/07/17 19:51:48 DEBUG SparkHadoopUtil: running as user: ec2-user
 
  ...
 
 
  14/07/17 19:51:48 INFO CoarseGrainedExecutorBackend: Connecting to
 driver:
  akka.tcp://spark@ip-10-202-11-191.ec2.internal
 :46787/user/CoarseGrainedScheduler
 
  14/07/17 19:51:48 INFO WorkerWatcher: Connecting to worker
  akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker
 
  14/07/17 19:51:49 INFO WorkerWatcher: Successfully connected to
  akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker
 
  14/07/17 19:53:29 ERROR CoarseGrainedExecutorBackend: Driver
 Disassociated
  [akka.tcp://sparkExecutor@ip-10-202-8-45.ec2.internal:55670] -
  [akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787] disassociated!
  Shutting down.
 
 
  Thanks a bunch!
  Matt

launching a spark cluster in ec2 from within an application

2014-07-21 Thread M@
I would like to programmatically start a spark cluster in ec2 from another
app running in ec2, run my job and then destroy the cluster.  I can launch a
spark EMR cluster easily enough using the SDK; however, I ran into two
problems:
1) I was only able to retrieve the address of the master node from the
console, not via the SDK.
2) I was not able to connect to the master from my app after setting
"spark://public_dns:7077" as the master in the SparkConf (where public_dns
is the address listed for the cluster on the EMR console page in Amazon).  I
kept getting "all masters are unresponsive" errors.
In addition, the amazon docs only speak of running spark jobs in emr by
ssh'ing to the master, launching a spark shell and running the jobs from
there.  Is it even possible to do programmatically from another app or must
you login into the master and run jobs from the shell if you want to use
spark in amazon EMR?

The second approach I tried was simply calling the spark-ec2 script from my
app passing the same parameters that I use to launch the cluster manually
from the cli.  This failed because the ec2.connect call returns None when
called from my app (scala/java on play) whereas it works perfectly when
called from the cli.
Is there a recommended method to launch ec2 clusters dynamically from within
an app running in ec2?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/launching-a-spark-cluster-in-ec2-from-within-an-application-tp10340.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: LabeledPoint with weight

2014-07-21 Thread Xiangrui Meng
This is a useful feature but it may be hard to have it in v1.1 due to
limited time. Hopefully, we can support it in v1.2. -Xiangrui

On Mon, Jul 21, 2014 at 12:58 AM, Jiusheng Chen chenjiush...@gmail.com wrote:
 It seems MLlib doesn't support weighted training right now; training samples
 have equal importance. Weighted training can be very useful to reduce data
 size and speed up training.

 Do you have plans to support it in the future? The data format would be something
 like:

 label:weight index1:value1 index2:value2 ...



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/LabeledPoint-with-weight-tp10291.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed

2014-07-21 Thread Aaron Davidson
What's the exception you're seeing? Is it an OOM?


On Mon, Jul 21, 2014 at 11:20 AM, chutium teng@gmail.com wrote:

 Hi,

 unfortunately it is not so straightforward

 xxx_parquet.db

 is a folder of managed database created by hive/impala, so, every sub
 element in it is a table in hive/impala, they are folders in HDFS, and each
 table has different schema, and in its folder there are one or more parquet
 files.

 that means

 xx001_suffix
 xx002_suffix

 are folders, there are some parquet files like

 xx001_suffix/parquet_file1_with_schema1

 xx002_suffix/parquet_file1_with_schema2
 xx002_suffix/parquet_file2_with_schema2

 it seems only union can do this job~

 Nonetheless, thank you very much, maybe the only reason is that spark
 eating
 up too much memory...



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10335.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark job tracker.

2014-07-21 Thread abhiguruvayya
Also, I am facing one issue. If I run the program in yarn-cluster mode it
works absolutely fine, but if I change it to yarn-client mode I get the
error below.

Application application_1405471266091_0055 failed 2 times due to AM
Container for appattempt_1405471266091_0055_02 exited with exitCode:
-1000 due to: File does not exist:
/user/hadoop/.sparkStaging/application_1405471266091_0055/commons-math3-3.0.jar

I have commons-math3-3.0.jar in the classpath and I am loading it to
staging as well.

yarn.Client: Uploading file:***/commons-math3-3.0.jar to
***/user/hadoop/.sparkStaging/application_1405471266091_0055/commons-math3-3.0.jar.
from the logs.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10343.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed

2014-07-21 Thread chutium
no, something like this

14/07/20 00:19:29 ERROR cluster.YarnClientClusterScheduler: Lost executor 2
on 02.xxx: remote Akka client disassociated

...
...

14/07/20 00:21:13 WARN scheduler.TaskSetManager: Lost TID 832 (task 1.2:186)
14/07/20 00:21:13 WARN scheduler.TaskSetManager: Loss was due to
java.io.IOException
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
at
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at
parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599)
at
parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360)
at
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100)
at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172)
at
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130)
at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


ulimit is increased




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10344.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Does spark streaming fit to our application

2014-07-21 Thread srinivas
Hi,
  Our application is required to do some aggregations on data that will be
coming in as a stream for over two months. I would like to know if Spark
Streaming will be suitable for our requirement. After going through some
documentation and videos I think we can do aggregations on data based on a
window timeframe of a few minutes. I am not sure if we can cache that
data or store it in HDFS for further calculations in Spark
Streaming. Please help!!

Thanks,
-Srini. 
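
For reference, a minimal sketch of the kind of windowed aggregation described above, assuming key/value records and illustrative window sizes; each window's result can be written to HDFS for later batch processing:

import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.StreamingContext._

// events: DStream[(String, Long)] -- assumed shape of the incoming stream
val windowed = events.reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(10), Minutes(1))
windowed.saveAsTextFiles("hdfs:///aggregates/windowed")  // one directory of results per window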



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-spark-streaming-fit-to-our-application-tp10345.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


broadcast variable get cleaned by ContextCleaner unexpectedly ?

2014-07-21 Thread Nan Zhu
Hi, all  

When I run a Spark application (actually the unit test of the application in
Jenkins), I find that I always hit a FileNotFoundException when reading a
broadcast variable

The program itself works well, except in the unit test

Here is the example log:


14/07/21 19:49:13 INFO Executor: Running task ID 4
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106)
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
14/07/21 19:49:13 INFO Executor: Finished task ID 3
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms
14/07/21 19:49:13 INFO Executor: Running task ID 5
14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106)
14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)
14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
14/07/21 19:49:13 INFO Executor: Finished task ID 4
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms
14/07/21 19:49:13 INFO Executor: Running task ID 6
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106)
14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596
14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver
14/07/21 19:49:13 INFO Executor: Finished task ID 5
14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on executor localhost: localhost (PROCESS_LOCAL)
14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes in 0 ms
14/07/21 19:49:13 INFO Executor: Running task ID 7
14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5)
14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost (progress: 6/106)
14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0
14/07/21 19:49:13 ERROR Executor: Exception in task ID 6
java.io.FileNotFoundException: http://172.31.34.174:52070/broadcast_0
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1624)
    at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196)
    at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89)
    at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at

Re: spark sql left join gives KryoException: Buffer overflow

2014-07-21 Thread Michael Armbrust

 When SPARK-2211 is done, will spark sql automatically choose join
 algorithms?
  Is there some way to manually hint the optimizer?


Ideally we will select the best algorithm for you.  We are also considering
ways to allow the user to hint.


Re: Spark Streaming timing considerations

2014-07-21 Thread Tathagata Das
You will have to use some function that converts between the dstreamTime (ms since
epoch, same format as returned by System.currentTimeMillis) and your
application-level time.

TD
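
For reference, a minimal sketch of that conversion, assuming "records" is a DStream of objects carrying a relative timestamp in milliseconds (the field name and the offset variable are illustrative):

val appEpochOffsetMs = System.currentTimeMillis()   // when the application clock started
val aligned = records.transform { (rdd, batchTime) =>
  // batchTime.milliseconds is ms since the Unix epoch; shift the relative
  // application timestamps onto the same scale before comparing
  rdd.filter(r => appEpochOffsetMs + r.timestampMs <= batchTime.milliseconds)
}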


On Mon, Jul 21, 2014 at 9:47 AM, Sean Owen so...@cloudera.com wrote:

 Uh, right. I mean:

 1405944367 = Mon, 21 Jul 2014 12:06:07 GMT

 On Mon, Jul 21, 2014 at 5:47 PM, Sean Owen so...@cloudera.com wrote:
  That is just standard Unix time.
 
  1405944367000 = Sun, 09 Aug 46522 05:56:40 GMT
 
 
  On Mon, Jul 21, 2014 at 5:43 PM, Laeeq Ahmed laeeqsp...@yahoo.com
 wrote:
 
 
  Hi TD,
 
  Thanks for the help.
 
  The only problem left here is that the dstreamTime contains some extra
 information, which seems to be a date, i.e. 1405944367000 ms, whereas my application
 timestamps are just in seconds, which I converted to ms, e.g. 2300, 2400, 2500,
 etc. So the filter doesn't take effect.

  I was thinking of adding that extra info to my Time(4000), but I am not
 really sure what it is.
 



Re: Uber jar with SBT

2014-07-21 Thread Tathagata Das
Just to confirm, are you interested in submitting the Spark job inside the
cluster in Spark standalone mode (that is, one of the workers will be
running the driver)? spark-submit does not support that fully yet.
You can probably use the instructions present in Spark 0.9.1 to do that.

Regarding spark-submit's behavior, that is the expected behavior.
Spark-submit waits for the driver program to terminate.

TD
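
For the original uber-jar question, a rough build.sbt sketch using sbt-assembly, with the Spark artifacts marked as provided so they are not bundled (versions and the exact sbt-assembly syntax depend on your setup and are illustrative here; note that marking Spark as provided also means the jar must be run through spark-submit or with the Spark assembly on the classpath, not with plain java -jar):

import AssemblyKeys._

assemblySettings

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
)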


On Sat, Jul 19, 2014 at 7:34 AM, boci boci.b...@gmail.com wrote:

 Hi!

  I'm using Java 7; I found the problem. I did not call start and awaitTermination
  on the streaming context; now it works, BUT
  spark-submit never returns (it runs in the foreground and receives the
  Kafka streams)... what am I missing?
  (I want to send the job to a standalone cluster worker process)

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Sat, Jul 19, 2014 at 3:32 PM, Sean Owen so...@cloudera.com wrote:

 Are you building / running with Java 6? I imagine your .jar files has
 more than 65536 files, and Java 6 has various issues with jars this
 large. If possible, use Java 7 everywhere.

 https://issues.apache.org/jira/browse/SPARK-1520

 On Sat, Jul 19, 2014 at 2:30 PM, boci boci.b...@gmail.com wrote:
  Hi Guys,
 
  I try to create spark uber jar with sbt but I have a lot of problem... I
  want to use the following:
  - Spark streaming
  - Kafka
  - Elasticsearch
  - HBase
 
  the current jar size is circa 60M and it's not working.
  - When I deploy with spark-submit: It's running and exit without any
 error
  - When I try to start in local[*] mode, it says:
   Exception in thread main java.lang.NoClassDefFoundError:
  org/apache/spark/Logging
  = but I start with java -cp /.../spark-assembly-1.0.1-hadoop2.2.0.jar
 -jar
  my.jar
 
  Any idea how I can solve this? (Which libs need to be set to provided and which are
  required at runtime... later I want to run this jar in a YARN cluster.)
 
  b0c1
 
 --
  Skype: boci13, Hangout: boci.b...@gmail.com





Re: Is there anyone who use streaming join to filter spam as guide mentioned?

2014-07-21 Thread Tathagata Das
Could you share your code snippet so that we can take a look?

TD



On Mon, Jul 21, 2014 at 7:23 AM, hawkwang wanghao.b...@gmail.com wrote:

 Hello guys,

 I'm just trying to use Spark Streaming features.
 I noticed that there is a join example for filtering spam, so I just wanted to
 try it.
 But nothing happens after the join; the output JavaPairDStream content is the
 same as before.
 So, are there any examples that I can refer to?

 Thanks for any suggestions.

 Regards,
 Hawk



Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

2014-07-21 Thread Nan Zhu
Hi, TD,   

Thanks for the reply

I tried to reproduce this in a simpler program, but no luck

However, the program has been very simple, just load some files from HDFS and 
write them to HBase….

---

It seems that the issue only appears when I run the unit test in Jenkins (it does not
fail every time; usually it succeeds about 1 in 10 times)

I once suspected that it’s related to some concurrency issue, but even if I
disable the parallel tests in build.sbt, the problem is still there

---

Best,  

--  
Nan Zhu


On Monday, July 21, 2014 at 5:40 PM, Tathagata Das wrote:

 The ContextCleaner cleans up data and metadata related to RDDs and broadcast 
 variables, only when those variables are not in scope and get 
 garbage-collected by the JVM. So if the broadcast variable in question is 
 probably somehow going out of scope even before the job using the broadcast 
 variable is in progress.  
  
 Could you reproduce this behavior reliably in a simple code snippet that you 
 can share with us?
  
 TD
  
  
  
 On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu zhunanmcg...@gmail.com 
 (mailto:zhunanmcg...@gmail.com) wrote:
  Hi, all  
   
  When I run some Spark application (actually unit test of the application in 
  Jenkins ), I found that I always hit the FileNotFoundException when reading 
  broadcast variable   
   
  The program itself works well, except the unit test
   
  Here is the example log:  
   
   
  14/07/21 19:49:13 INFO Executor: Running task ID 4 14/07/21 19:49:13 INFO 
  DAGScheduler: Completed ResultTask(0, 2) 14/07/21 19:49:13 INFO 
  TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106) 
  14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
  hdfstest_customers 14/07/21 19:49:13 INFO Executor: Serialized size of 
  result for 3 is 596 14/07/21 19:49:13 INFO Executor: Sending result for 3 
  directly to driver 14/07/21 19:49:13 INFO BlockManager: Found block 
  broadcast_0 locally 14/07/21 19:49:13 INFO Executor: Finished task ID 3 
  14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on 
  executor localhost: localhost (PROCESS_LOCAL) 14/07/21 19:49:13 INFO 
  TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms 14/07/21 
  19:49:13 INFO Executor: Running task ID 5 14/07/21 19:49:13 INFO 
  BlockManager: Removing broadcast 0 14/07/21 19:49:13 INFO DAGScheduler: 
  Completed ResultTask(0, 3) 14/07/21 19:49:13 INFO ContextCleaner: Cleaned 
  broadcast 0 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms 
  on localhost (progress: 4/106) 14/07/21 19:49:13 INFO BlockManager: Found 
  block broadcast_0 locally 14/07/21 19:49:13 INFO BlockManager: Removing 
  block broadcast_0 14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of 
  size 202564 dropped from memory (free 886623436) 14/07/21 19:49:13 INFO 
  ContextCleaner: Cleaned shuffle 0 14/07/21 19:49:13 INFO 
  ShuffleBlockManager: Deleted all files for shuffle 0 14/07/21 19:49:13 INFO 
  HadoopRDD: Input split: 
  hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5 14/07/21 
  19:49:13 INFO HadoopRDD: Input split: 
  hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5 14/07/21 
  19:49:13 INFO TableOutputFormat: Created table instance for 
  hdfstest_customers 14/07/21 19:49:13 INFO Executor: Serialized size of 
  result for 4 is 596 14/07/21 19:49:13 INFO Executor: Sending result for 4 
  directly to driver 14/07/21 19:49:13 INFO Executor: Finished task ID 4 
  14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on 
  executor localhost: localhost (PROCESS_LOCAL) 14/07/21 19:49:13 INFO 
  TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms 14/07/21 
  19:49:13 INFO Executor: Running task ID 6 14/07/21 19:49:13 INFO 
  DAGScheduler: Completed ResultTask(0, 4) 14/07/21 19:49:13 INFO 
  TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106) 
  14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
  hdfstest_customers 14/07/21 19:49:13 INFO Executor: Serialized size of 
  result for 5 is 596 14/07/21 19:49:13 INFO Executor: Sending result for 5 
  directly to driver 14/07/21 19:49:13 INFO Executor: Finished task ID 5 
  14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on 
  executor localhost: localhost (PROCESS_LOCAL) 14/07/21 19:49:13 INFO 
  TaskSetManager: Serialized task 0.0:7 as 11885 bytes in 0 ms 14/07/21 
  19:49:13 INFO Executor: Running task ID 7 14/07/21 19:49:13 INFO 
  DAGScheduler: Completed ResultTask(0, 5) 14/07/21 19:49:13 INFO 
  TaskSetManager: Finished TID 5 in 77 ms on localhost (progress: 6/106) 
  14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0 
  14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0 
  14/07/21 19:49:13 ERROR Executor: Exception in task ID 6 
  

Spark Partitioner vs Cassandra Partitioner

2014-07-21 Thread Marcelo Elias Del Valle
Hi,

I am new to Spark, have used hadoop for some time and just entered the
mailing list.

I am considering using spark in my application, reading data from Cassandra
in Python and writing the mapped data back to Cassandra or to ES after it.

The first question I have is: Is it possible to use
https://github.com/datastax/spark-cassandra-connector with pyspark? I
noticed there is an example of cassandra input format in the master branch,
but I guess it will only work in the last release.

The second question is about how Spark does M/R over NoSQL tools like
Cassandra. If I understood it correctly, By using spark cassandra connector
an RDD is provided and I can read data from Cassandra, and use Spark to M/R
it.

However, when I do that, I still need HDFS to store intermediate results.
Correct me if I am wrong, but MAP results are stored in local filesystem,
then a partitioner is used to shuffle data to Spark nodes and then data is
reduced.

I would like to understand why doing that using a tool like Cassandra, for
example. Cassandra has partitioners itself, so I could just write the MAP
output (using batch inserts) to an intermediate column family and, after
map phase is complete, reduce the data. No need for shuffling, as Cassandra
does that very well.

Do you agree with my understanding? I wonder if I can do that using Spark,
if this could be a good feature in future or if you have good reasons to
think it would not perform well or something like that.

Thanks in advance, I look forward for answers.

Best regards,
Marcelo Valle.
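
For reference, a rough sketch of what reading and writing through the connector looks like in Scala (keyspace, table and column names are illustrative; as far as I know there is no first-class PySpark binding for the DataStax connector at this point, which is why the Python side only has the Cassandra InputFormat example):

import com.datastax.spark.connector._
import org.apache.spark.SparkContext._

val rows   = sc.cassandraTable("my_keyspace", "events")   // RDD[CassandraRow]
val counts = rows.map(row => (row.getString("user_id"), 1L))
                 .reduceByKey(_ + _)
counts.saveToCassandra("my_keyspace", "user_counts",
  SomeColumns("user_id", "count"))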


Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

2014-07-21 Thread Nan Zhu
Hi, TD,   

I think I got more insights to the problem

in the Jenkins test file, I mistakenly passed a wrong value to spark.cores.max,
which is much larger than the expected value

(I passed the master address as local[6], and spark.cores.max as 200)

If I set a more consistent value, everything goes well,

But I would not expect this to cause the problem even if spark.cores.max is too
large?

Best,  

--  
Nan Zhu


On Monday, July 21, 2014 at 6:11 PM, Nan Zhu wrote:

 Hi, TD,   
  
 Thanks for the reply
  
 I tried to reproduce this in a simpler program, but no luck
  
 However, the program has been very simple, just load some files from HDFS and 
 write them to HBase….
  
 ---
  
 It seems that the issue only appears when I run the unit test in Jenkins (not 
 fail every time, in usual, it will success in 1/10 times)
  
 I once suspected that it’s related to some concurrency issue, but even I 
 disable the parallel test in built.sbt, the problem is still there
  
 ---
  
 Best,  
  
 --  
 Nan Zhu
  
  
 On Monday, July 21, 2014 at 5:40 PM, Tathagata Das wrote:
  
  The ContextCleaner cleans up data and metadata related to RDDs and 
  broadcast variables, only when those variables are not in scope and get 
  garbage-collected by the JVM. So if the broadcast variable in question is 
  probably somehow going out of scope even before the job using the broadcast 
  variable is in progress.  
   
  Could you reproduce this behavior reliably in a simple code snippet that 
  you can share with us?
   
  TD
   
   
   
  On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu zhunanmcg...@gmail.com 
  (mailto:zhunanmcg...@gmail.com) wrote:
   Hi, all  

   When I run some Spark application (actually unit test of the application 
   in Jenkins ), I found that I always hit the FileNotFoundException when 
   reading broadcast variable   

   The program itself works well, except the unit test

   Here is the example log:  


   14/07/21 19:49:13 INFO Executor: Running task ID 4 14/07/21 19:49:13 INFO 
   DAGScheduler: Completed ResultTask(0, 2) 14/07/21 19:49:13 INFO 
   TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106) 
   14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
   hdfstest_customers 14/07/21 19:49:13 INFO Executor: Serialized size of 
   result for 3 is 596 14/07/21 19:49:13 INFO Executor: Sending result for 3 
   directly to driver 14/07/21 19:49:13 INFO BlockManager: Found block 
   broadcast_0 locally 14/07/21 19:49:13 INFO Executor: Finished task ID 3 
   14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on 
   executor localhost: localhost (PROCESS_LOCAL) 14/07/21 19:49:13 INFO 
   TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms 14/07/21 
   19:49:13 INFO Executor: Running task ID 5 14/07/21 19:49:13 INFO 
   BlockManager: Removing broadcast 0 14/07/21 19:49:13 INFO DAGScheduler: 
   Completed ResultTask(0, 3) 14/07/21 19:49:13 INFO ContextCleaner: Cleaned 
   broadcast 0 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 
   ms on localhost (progress: 4/106) 14/07/21 19:49:13 INFO BlockManager: 
   Found block broadcast_0 locally 14/07/21 19:49:13 INFO BlockManager: 
   Removing block broadcast_0 14/07/21 19:49:13 INFO MemoryStore: Block 
   broadcast_0 of size 202564 dropped from memory (free 886623436) 14/07/21 
   19:49:13 INFO ContextCleaner: Cleaned shuffle 0 14/07/21 19:49:13 INFO 
   ShuffleBlockManager: Deleted all files for shuffle 0 14/07/21 19:49:13 
   INFO HadoopRDD: Input split: 
   hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5 14/07/21 
   19:49:13 INFO HadoopRDD: Input split: 
   hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5 14/07/21 
   19:49:13 INFO TableOutputFormat: Created table instance for 
   hdfstest_customers 14/07/21 19:49:13 INFO Executor: Serialized size of 
   result for 4 is 596 14/07/21 19:49:13 INFO Executor: Sending result for 4 
   directly to driver 14/07/21 19:49:13 INFO Executor: Finished task ID 4 
   14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on 
   executor localhost: localhost (PROCESS_LOCAL) 14/07/21 19:49:13 INFO 
   TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms 14/07/21 
   19:49:13 INFO Executor: Running task ID 6 14/07/21 19:49:13 INFO 
   DAGScheduler: Completed ResultTask(0, 4) 14/07/21 19:49:13 INFO 
   TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106) 
   14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
   hdfstest_customers 14/07/21 19:49:13 INFO Executor: Serialized size of 
   result for 5 is 596 14/07/21 19:49:13 INFO Executor: Sending result for 5 
   directly to driver 14/07/21 19:49:13 INFO Executor: Finished task ID 5 
   14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on 
   

Re: Launching with m3.2xlarge instances: /mnt and /mnt2 mounted on 7gb drive

2014-07-21 Thread Matei Zaharia
Actually the script in the master branch is also broken (it's pointing to an 
older AMI). Try 1.0.1 for launching clusters.

On Jul 20, 2014, at 2:25 PM, Chris DuBois chris.dub...@gmail.com wrote:

 I pulled the latest last night. I'm on commit 4da01e3.
 
 
 On Sun, Jul 20, 2014 at 2:08 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Is this with the 1.0.0 scripts? I believe it's fixed in 1.0.1.
 
 Matei
 
 On Jul 20, 2014, at 1:22 AM, Chris DuBois chris.dub...@gmail.com wrote:
 
  Using the spark-ec2 script with m3.2xlarge instances seems to not have /mnt 
  and /mnt2 pointing to the 80gb SSDs that come with that instance. Does 
  anybody know whether extra steps are required when using this instance type?
 
  Thanks,
  Chris
 
 



Re: DROP IF EXISTS still throws exception about table does not exist?

2014-07-21 Thread Nan Zhu
Ah, I see,   

thanks, Yin  

--  
Nan Zhu


On Monday, July 21, 2014 at 5:00 PM, Yin Huai wrote:

 Hi Nan,
  
 It is basically a log entry because your table does not exist. It is not a 
 real exception.  
  
 Thanks,
  
 Yin
  
  
 On Mon, Jul 21, 2014 at 7:10 AM, Nan Zhu zhunanmcg...@gmail.com 
 (mailto:zhunanmcg...@gmail.com) wrote:
  a related JIRA: https://issues.apache.org/jira/browse/SPARK-2605  
   
  --  
  Nan Zhu
   
   
  On Monday, July 21, 2014 at 10:10 AM, Nan Zhu wrote:
   
   Hi, all  

    When I try hiveContext.hql("drop table if exists abc") where abc is a
    non-existent table

    I still receive an exception about the non-existent table even though "if exists" is
    there

   the same statement runs well in hive shell  

   Some feedback from Hive community is here: 
   https://issues.apache.org/jira/browse/HIVE-7458  

    “You are doing hiveContext.hql("DROP TABLE IF EXISTS hivetesting") in the
    Scala shell of the Spark project.

    What is this shell doing? It queries the remote metastore on a non-existing
    table (see your provided stack).
    The remote metastore throws
    NoSuchObjectException(message:default.hivetesting table not found) because
    the Spark code calls
    HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:854) on a
    non-existing table. It's the right behavior.
    You should check in the Spark code why a query is done on a non-existing table.


   I think Spark does not handle well the IF EXISTS part of this query. 
   Maybe you could fill a ticket on Spark JIRA.

   BUT, it's not a bug in HIVE IMHO.”

    My question is: the DDL is executed by Hive itself, isn’t it?

   Best,  

   --  
   Nan Zhu

   
  



unable to create rdd with pyspark newAPIHadoopRDD

2014-07-21 Thread umeshdangat
Hello,
 I have only just started playing around with spark to see if it fits my
needs. I was trying to read some data from elasticsearch as an rdd, so that
I could perform some python based analytics on it. I am unable to create the
rdd object as of now, failing with a serialization error.

Working off the spark repo commit in master:
abeacffb7bcdfa3eeb1e969aa546029a7b464eaa.

Steps I am doing as mentioned in patch:
https://github.com/apache/spark/pull/455

IPYTHON=1
SPARK_CLASSPATH=/Users/umeshdangat/Downloads/elasticsearch-hadoop-2.0.0/dist/elasticsearch-hadoop-mr-2.0.0.jar
./bin/pyspark

from pyspark import SparkContext
sc = SparkContext('local[2]')
conf = {'es.resource': 'twitter/tweet'} #index/type
rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
"org.apache.hadoop.io.NullWritable",
"org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)

Stack Trace:
Py4JJavaError Traceback (most recent call last)
/Users/umeshdangat/Documents/spark/ipython-input-4-ee964756398b in
module()
 1 rdd =
sc.newAPIHadoopRDD(org.elasticsearch.hadoop.mr.EsInputFormat,
org.apache.hadoop.io.NullWritable,
org.elasticsearch.hadoop.mr.LinkedMapWritable, conf=conf)

/Users/umeshdangat/Documents/spark/python/pyspark/context.pyc in
newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter,
valueConverter, conf)
426 jconf = self._dictToJavaMap(conf)
427 jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc,
inputFormatClass, keyClass,
-- 428valueClass,
keyConverter, valueConverter, jconf)
429 return RDD(jrdd, self, PickleSerializer())
430 

/Users/umeshdangat/Documents/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
535 answer = self.gateway_client.send_command(command)
536 return_value = get_return_value(answer, self.gateway_client,
-- 537 self.target_id, self.name)
538 
539 for temp_arg in temp_args:

/Users/umeshdangat/Documents/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
-- 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
2.0 in stage 1.0 (TID 2) had a not serializable result:
scala.collection.convert.Wrappers$MapWrapper
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1045)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1029)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1027)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1027)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:632)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:632)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:632)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1230)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-create-rdd-with-pyspark-newAPIHadoopRDD-tp10358.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

2014-07-21 Thread Tathagata Das
That is definitely weird. spark.cores.max should not affect things when
running in local mode.

And, I am trying to think of scenarios that could cause a broadcast
variable used in the current job to fall out of scope, but they all seem
very far-fetched. So I am really curious to see the code where this could
be happening.

Either way, you could turn off the behavior by using
spark.cleaner.referenceTracking=false

TD


On Mon, Jul 21, 2014 at 3:52 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

  Hi, TD,

 I think I got more insights to the problem

 in the Jenkins test file, I mistakenly pass a wrong value to spark.cores.max,
 which is much larger than the expected value

 (I passed master address as local[6], and spark.core.max as 200)

 If I set a more consistent value, everything goes well,

 But I do not think it will bring this problem even the spark.cores.max is
 too large?

 Best,

 --
 Nan Zhu

 On Monday, July 21, 2014 at 6:11 PM, Nan Zhu wrote:

  Hi, TD,

 Thanks for the reply

 I tried to reproduce this in a simpler program, but no luck

 However, the program has been very simple, just load some files from HDFS
 and write them to HBase….

 ---

 It seems that the issue only appears when I run the unit test in Jenkins
 (not fail every time, in usual, it will success in 1/10 times)

 I once suspected that it’s related to some concurrency issue, but even I
 disable the parallel test in built.sbt, the problem is still there

 ---

 Best,

 --
 Nan Zhu

 On Monday, July 21, 2014 at 5:40 PM, Tathagata Das wrote:

 The ContextCleaner cleans up data and metadata related to RDDs and
 broadcast variables, only when those variables are not in scope and get
 garbage-collected by the JVM. So if the broadcast variable in question is
 probably somehow going out of scope even before the job using the broadcast
 variable is in progress.

 Could you reproduce this behavior reliably in a simple code snippet that
 you can share with us?

 TD



 On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

  Hi, all

 When I run some Spark application (actually unit test of the application in
 Jenkins ), I found that I always hit the FileNotFoundException when
 reading broadcast variable

 The program itself works well, except the unit test

 Here is the example log:


 14/07/21 19:49:13 INFO Executor: Running task ID 4
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost 
 (progress: 3/106)
 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
 hdfstest_customers
 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
 14/07/21 19:49:13 INFO Executor: Finished task ID 3
 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on 
 executor localhost: localhost (PROCESS_LOCAL)
 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes 
 in 0 ms
 14/07/21 19:49:13 INFO Executor: Running task ID 5
 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
 14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost 
 (progress: 4/106)
 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
 14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from 
 memory (free 886623436)
 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
 14/07/21 19:49:13 INFO HadoopRDD: Input split: 
 hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
 14/07/21 
 19:49:13 INFO HadoopRDD: Input split: 
 hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
 14/07/21 
 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596
 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver
 14/07/21 19:49:13 INFO Executor: Finished task ID 4
 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on 
 executor localhost: localhost (PROCESS_LOCAL)
 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes 
 in 0 ms
 14/07/21 19:49:13 INFO Executor: Running task ID 6
 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4)
 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost 
 (progress: 5/106)
 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for 
 hdfstest_customers
 14/07/21 19:49:13 

RE: error from DecisonTree Training:

2014-07-21 Thread Jack Yang
So this is a bug that is still unsolved (for Java)?

From: Jack Yang [mailto:j...@uow.edu.au]
Sent: Friday, 18 July 2014 4:52 PM
To: user@spark.apache.org
Subject: error from DecisonTree Training:

Hi All,
I got an error while using DecisionTreeModel (my program is written in Java, 
spark 1.0.1, scala 2.10.1).
I have read a local file, loaded it as RDD, and then sent to decisionTree for 
training. See below for details:

JavaRDD<LabeledPoint> Points = lines.map(new ParsePoint()).cache();
LogisticRegressionModel model = 
LogisticRegressionWithSGD.train(Points.rdd(),iterations, stepSize);   // until 
here it is working
Strategy strategy = new Strategy( );
DecisionTree decisionTree = new DecisionTree(strategy);
DecisionTreeModel decisionTreeModel = decisionTree.train(Points.rdd());


The error is : java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast 
to [Lorg.apache.spark.mllib.regression.LabeledPoint;

Any thoughts?

Best regards,
Jack



Re: Launching with m3.2xlarge instances: /mnt and /mnt2 mounted on 7gb drive

2014-07-21 Thread Xiangrui Meng
You can also try a different region. I tested us-west-2 yesterday, and
it worked well. -Xiangrui

On Sun, Jul 20, 2014 at 4:35 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Actually the script in the master branch is also broken (it's pointing to an
 older AMI). Try 1.0.1 for launching clusters.

 On Jul 20, 2014, at 2:25 PM, Chris DuBois chris.dub...@gmail.com wrote:

 I pulled the latest last night. I'm on commit 4da01e3.


 On Sun, Jul 20, 2014 at 2:08 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Is this with the 1.0.0 scripts? I believe it's fixed in 1.0.1.

 Matei

 On Jul 20, 2014, at 1:22 AM, Chris DuBois chris.dub...@gmail.com wrote:

  Using the spark-ec2 script with m3.2xlarge instances seems to not have
  /mnt and /mnt2 pointing to the 80gb SSDs that come with that instance. Does
  anybody know whether extra steps are required when using this instance 
  type?
 
  Thanks,
  Chris





Re: error from DecisonTree Training:

2014-07-21 Thread Xiangrui Meng
This is a known issue:
https://issues.apache.org/jira/browse/SPARK-2197 . Joseph is working
on it. -Xiangrui
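
Until that fix lands, a hedged workaround sketch is to express the same training
through the Scala MLlib API, which sidesteps the Object-array cast behind the
exception. The Gini impurity and maxDepth of 5 below are illustrative assumptions,
not values taken from the original post:

    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.tree.DecisionTree
    import org.apache.spark.mllib.tree.configuration.Algo
    import org.apache.spark.mllib.tree.impurity.Gini
    import org.apache.spark.rdd.RDD

    // points: the same parsed data as in the Java snippet, typed as a Scala RDD[LabeledPoint]
    def trainTree(points: RDD[LabeledPoint]) = {
      // Simple classification overload: train(input, algo, impurity, maxDepth)
      DecisionTree.train(points, Algo.Classification, Gini, 5)
    }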

On Mon, Jul 21, 2014 at 4:20 PM, Jack Yang j...@uow.edu.au wrote:
 So this is a bug unsolved (for java) yet?



 From: Jack Yang [mailto:j...@uow.edu.au]
 Sent: Friday, 18 July 2014 4:52 PM
 To: user@spark.apache.org
 Subject: error from DecisonTree Training:



 Hi All,

 I got an error while using DecisionTreeModel (my program is written in Java,
 spark 1.0.1, scala 2.10.1).

 I have read a local file, loaded it as RDD, and then sent to decisionTree
 for training. See below for details:



 JavaRDD<LabeledPoint> Points = lines.map(new ParsePoint()).cache();

 LogisticRegressionModel model =
 LogisticRegressionWithSGD.train(Points.rdd(),iterations, stepSize);   //
 until here it is working

 Strategy strategy = new Strategy( ….);

 DecisionTree decisionTree = new DecisionTree(strategy);

 DecisionTreeModel decisionTreeModel = decisionTree.train(Points.rdd());





 The error is : java.lang.ClassCastException: [Ljava.lang.Object; cannot be
 cast to [Lorg.apache.spark.mllib.regression.LabeledPoint;



 Any thoughts?



 Best regards,

 Jack




Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?

2014-07-21 Thread Nan Zhu
Ah, sorry, sorry, my mistake: I sent some wrong information.  

It is not "spark.cores.max" but the minPartitions in sc.textFile().  


Best,

--  
Nan Zhu


On Monday, July 21, 2014 at 7:17 PM, Tathagata Das wrote:

 That is definitely weird. spark.cores.max should not affect things when 
 running in local mode.  
  
 And I am trying to think of scenarios that could cause a broadcast variable 
 used in the current job to fall out of scope, but they all seem very 
 far-fetched. So I am really curious to see the code where this could be 
 happening.  
  
 Either ways, you could turn off the behavior by using 
 spark.cleaner.referenceTracking=false
  
 TD
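
 For reference, a minimal sketch of setting that flag when building the context
 (the master URL and app name below are placeholders, not taken from this thread):

     import org.apache.spark.{SparkConf, SparkContext}

     val conf = new SparkConf()
       .setMaster("local[6]")                            // placeholder master
       .setAppName("broadcast-cleaner-repro")            // placeholder app name
       .set("spark.cleaner.referenceTracking", "false")  // turn off ContextCleaner-driven cleanup
     val sc = new SparkContext(conf)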
  
  
 On Mon, Jul 21, 2014 at 3:52 PM, Nan Zhu zhunanmcg...@gmail.com 
 (mailto:zhunanmcg...@gmail.com) wrote:
  Hi, TD,   
   
  I think I got more insights to the problem
   
  in the Jenkins test file, I mistakenly passed a wrong value to 
  spark.cores.max, one much larger than the expected value   
   
  (I passed the master address as local[6], and spark.cores.max as 200)
   
  If I set a more consistent value, everything goes well.  
   
  But I would not expect this problem even if spark.cores.max is 
  too large?  
   
  Best,  
   
  --  
  Nan Zhu
   
   
  On Monday, July 21, 2014 at 6:11 PM, Nan Zhu wrote:
   
   Hi, TD,   

   Thanks for the reply

   I tried to reproduce this in a simpler program, but no luck  

   However, the program has been very simple, just load some files from HDFS 
   and write them to HBase….

   ---

    It seems that the issue only appears when I run the unit test in Jenkins 
    (it does not fail every time; usually it succeeds in about 1 out of 10 runs)  

    I once suspected that it’s related to some concurrency issue, but even if I 
    disable the parallel test in build.sbt, the problem is still there  

   ---

   Best,  

   --  
   Nan Zhu


   On Monday, July 21, 2014 at 5:40 PM, Tathagata Das wrote:

 The ContextCleaner cleans up data and metadata related to RDDs and 
 broadcast variables, but only when those variables are no longer in scope and get 
 garbage-collected by the JVM. So the broadcast variable in question 
 is probably somehow going out of scope even while the job using the 
 broadcast variable is still in progress.  
 
Could you reproduce this behavior reliably in a simple code snippet 
that you can share with us?
 
TD
 
 
 
On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu zhunanmcg...@gmail.com 
(mailto:zhunanmcg...@gmail.com) wrote:
 Hi, all  
  
 When I run some Spark application (actually unit test of the 
 application in Jenkins ), I found that I always hit the 
 FileNotFoundException when reading broadcast variable   
  
 The program itself works well, except the unit test
  
 Here is the example log:  
  
  
  14/07/21 19:49:13 INFO Executor: Running task ID 4
  14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2)
  14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106)
  14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers
  14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596
  14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver
  14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
  14/07/21 19:49:13 INFO Executor: Finished task ID 3
  14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
  14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms
  14/07/21 19:49:13 INFO Executor: Running task ID 5
  14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0
  14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3)
  14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0
  14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106)
  14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally
  14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0
  14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436)
  14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0
  14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0
  14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5
  14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5
  14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers 

Re: How to map each line to (line number, line)?

2014-07-21 Thread Andrew Ash
I'm not sure if you guys ever picked a preferred method for doing this, but
I just encountered it and came up with this method that's working
reasonably well on a small dataset.  It should be quite easily
generalizable to non-String RDDs.

def addRowNumber(r: RDD[String]): RDD[Tuple2[Long, String]] = {
  val sc = r.sparkContext
  val partitionSizes = r.mapPartitionsWithIndex( (index, rows) =>
    Iterator( (index, rows.size) ) ).collect
  val partitionGlobalStartIndex =
    partitionSizes.sortBy(_._1).map(_._2).scanLeft(0L)(_ + _)

  val startIndexes = sc.broadcast(partitionGlobalStartIndex)
  r.mapPartitionsWithIndex( (partitionIndex, rows) => {
    val partitionStartIndex = startIndexes.value(partitionIndex)
    rows.zipWithIndex map { case (row, rowIndex) =>
      (partitionStartIndex + rowIndex, row)
    }
  })
}
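
A short usage sketch of the function above, assuming a local SparkContext; the
input path and partition count are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("row-numbers"))
    val lines = sc.textFile("hdfs:///path/to/input.txt", 4)
    val numbered = addRowNumber(lines)                   // RDD[(Long, String)]
    numbered.take(5).foreach { case (rowNum, line) => println(rowNum + "\t" + line) }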


On Wed, Jan 1, 2014 at 4:05 AM, Guillaume Pitel guillaume.pi...@exensa.com
wrote:

  I'm not very comfortable with the idea of generating a rdd from the range
 (it might take a lot of memory), dispatching it to the nodes, then zipping.

 You should try and compare the two approaches and give us the performance
 comparison.

 Guillaume

 Why not use a zipped RDD?

 http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD


  I do not know why no one else suggested this. Of course it has 3 extra
 loops (one for counting rdd, one for generating the range, one for
 zipping). Apart from this performance problem, any other caveats?



  I have used something like this in the past.

   val index = sc.parallelize(Range.Long(0, rdd.count, 1),
 rdd.partitions.size)
   val rddWithIndex = rdd.zip(index)

  If that doesn't work, then you could try zipPartitions as well, since
 it has slightly more relaxed constraints.


 --
  *Guillaume PITEL, Président*
 +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53

  eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05



RE: error from DecisonTree Training:

2014-07-21 Thread Jack Yang
That is nice.
Thanks Xiangrui.

-Original Message-
From: Xiangrui Meng [mailto:men...@gmail.com] 
Sent: Tuesday, 22 July 2014 9:31 AM
To: user@spark.apache.org
Subject: Re: error from DecisonTree Training:

This is a known issue:
https://issues.apache.org/jira/browse/SPARK-2197 . Joseph is working on it. 
-Xiangrui

On Mon, Jul 21, 2014 at 4:20 PM, Jack Yang j...@uow.edu.au wrote:
 So this is a bug unsolved (for java) yet?



 From: Jack Yang [mailto:j...@uow.edu.au]
 Sent: Friday, 18 July 2014 4:52 PM
 To: user@spark.apache.org
 Subject: error from DecisonTree Training:



 Hi All,

 I got an error while using DecisionTreeModel (my program is written in 
 Java, spark 1.0.1, scala 2.10.1).

 I have read a local file, loaded it as RDD, and then sent to 
 decisionTree for training. See below for details:



 JavaRDD<LabeledPoint> Points = lines.map(new ParsePoint()).cache();

 LogisticRegressionModel model =
 LogisticRegressionWithSGD.train(Points.rdd(),iterations, stepSize);   //
 until here it is working

 Strategy strategy = new Strategy( ….);

 DecisionTree decisionTree = new DecisionTree(strategy);

 DecisionTreeModel decisionTreeModel = 
 decisionTree.train(Points.rdd());





 The error is : java.lang.ClassCastException: [Ljava.lang.Object; 
 cannot be cast to [Lorg.apache.spark.mllib.regression.LabeledPoint;



 Any thoughts?



 Best regards,

 Jack




Re: LiveListenerBus throws exception and weird web UI bug

2014-07-21 Thread 余根茂(木艮)
Hi all, 

 Here is my fix: https://github.com/apache/spark/pull/1356. It is not pretty, 
but it works well.  Any suggestions?

 

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/LiveListenerBus-throws-exception-and-weird-web-UI-bug-tp8330p10324.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

 



Joining by timestamp.

2014-07-21 Thread durga
Hi

I have peculiar problem,

I have two data sets (large ones) . 
Data set1:

((timestamp),iterable[Any]) = {
(2014-07-10T00:02:45.045+,ArrayBuffer((2014-07-10T00:02:45.045+,98.4859,22)))
(2014-07-10T00:07:32.618+,ArrayBuffer((2014-07-10T00:07:32.618+,75.4737,22)))
}

DataSet2:
((timestamp),iterable[Any]) ={
(2014-07-10T00:03:16.952+,ArrayBuffer((2014-07-10T00:03:16.952+,99.6148,23)))
(2014-07-10T00:08:11.329+,ArrayBuffer((2014-07-10T00:08:11.329+,80.9017,23)))
}

I need to join them. But the catch is that the two timestamps are not the same;
they can differ by approximately +/- 4 minutes.

Those records need to be joined.

Any idea is very much appreciated.

What I am thinking right now:

Keep a file descriptor for the sorted dataset2.
Read the sorted records of dataset1.
For each record, check for any dataset2 record matching the criteria;
if it matches, emit (record1, record2),
if not, continue reading dataset2 records until one matches.

I know this only works for very small files; that's the reason I need help.

Thanks,
D.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-07-21 Thread hsy...@gmail.com
I have the same problem


On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote:

 Hi,
 Everyone.  I have the following piece of code. When I run it,
 the error below occurred; it seems that the SparkContext is not
 serializable, but I do not try to use the SparkContext except for the broadcast.
 [In fact, this code is in MLlib; I just try to broadcast the
  centerArrays.]

 It succeeds in the reduceByKey operation, but fails at the collect
 operation, which confused me.


 INFO DAGScheduler: Failed to run collect at KMeans.scala:235
 [error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
 not serializable: java.io.NotSerializableException:
 org.apache.spark.SparkContext
 org.apache.spark.SparkException: Job aborted: Task not serializable:
 java.io.NotSerializableException: org.apache.spark.SparkContext
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)




 private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {

   @transient val sc = data.sparkContext   // I try to add the transient annotation here, but it doesn't work

   // Initialize each run's center to a random point
   val seed = new XORShiftRandom().nextInt()
   val sample = data.takeSample(true, runs, seed).toSeq
   val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))

   // On each step, sample 2 * k points on average for each run with probability proportional
   // to their squared distance from that run's current centers
   for (step <- 0 until initializationSteps) {
     val centerArrays = sc.broadcast(centers.map(_.toArray))
     val sumCosts = data.flatMap { point =>
       for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
     }.reduceByKey(_ + _).collectAsMap()
     // can pass at this point
     val chosen = data.mapPartitionsWithIndex { (index, points) =>
       val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
       for {
         p <- points
         r <- 0 until runs
         if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
       } yield (r, p)
     }.collect()
     // failed at this point.
     for ((r, p) <- chosen) {
       centers(r) += p
     }
   }
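
 As a hedged aside, the usual first step for this kind of error is to copy
 everything the closure needs into local vals, so the lambda captures plain
 values instead of the enclosing object (which may transitively hold the
 SparkContext). A sketch of that pattern applied to the cost computation above;
 this is not the actual MLlib fix, just the general technique:

     // Copy fields out of `this` so the closure does not drag the enclosing object along.
     val localRuns = runs
     val bcCenters = sc.broadcast(centers.map(_.toArray))
     val sumCosts = data.flatMap { point =>
       for (r <- 0 until localRuns) yield (r, KMeans.pointCost(bcCenters.value(r), point))
     }.reduceByKey(_ + _).collectAsMap()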







saveAsSequenceFile for DStream

2014-07-21 Thread Barnaby
First of all, I do not know Scala yet, but I am learning.

I'm doing a proof of concept by streaming content from a socket, counting
the words, and writing the result to a Tachyon disk. A different script will read the
file stream and print out the results.

    val lines = ssc.socketTextStream(args(0), args(1).toInt,
      StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
    ssc.start()
    ssc.awaitTermination()

I already did a proof of concept to write and read sequence files, but there
doesn't seem to be a saveAsSequenceFiles() method on DStream. What is the
best way to write out each RDD of the stream so that the timestamps are in the
filenames and there is minimal overhead in reading the data back in as
objects? See below.

My simple successful proof was the following:
val rdd = sc.parallelize(Array(("a", 2), ("b", 3), ("c", 1)))
rdd.saveAsSequenceFile("tachyon://.../123.sf2")
val rdd2 = sc.sequenceFile[String, Int]("tachyon://.../123.sf2")

How can I do something similar with streaming?
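
There is no saveAsSequenceFiles() on DStream, but foreachRDD hands you each
batch's RDD together with its Time, which can be folded into the output path. A
hedged sketch (the Tachyon URL mirrors the one above; wordCounts is the
DStream[(String, Int)] from the snippet):

    import org.apache.spark.SparkContext._   // brings saveAsSequenceFile into scope for pair RDDs

    wordCounts.foreachRDD { (rdd, time) =>
      // One directory of sequence files per batch, with the batch timestamp in the name
      rdd.saveAsSequenceFile("tachyon://localhost:19998/files/WordCounts-" + time.milliseconds)
    }

Reading the batches back should then work with the same sequenceFile[String, Int]
call as in the proof above, pointed at a glob such as WordCounts-*.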




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Joining by timestamp.

2014-07-21 Thread Cheng, Hao
This is a very interesting problem. Spark SQL supports non-equi joins, but they 
are very inefficient with large tables.

One possible solution is to partition both tables, with partition key 
(cast(ds as bigint) / 240); then, for each partition in dataset1, you 
can probably write SQL like select * from dataset1 inner join dataset2 on 
abs(cast(dataset1.ds as bigint) - cast(dataset2.ds as bigint)) < 240.

We assume the dataset2 side always consists of the 3 adjacent partitions, with 
partition keys key - 1, key, and key + 1 (which means the dataset2 slice with maximum ds 
240 seconds greater than dataset1 and minimum timestamp 240 seconds less than 
dataset1).

After you finish iterating over every partition in dataset1, you should get the result.
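
The same bucketing idea can also be expressed directly with the core RDD API. A
hedged sketch, assuming the timestamps have already been parsed to epoch seconds
(Long) and using a generic payload type in place of the ArrayBuffers:

    import org.apache.spark.SparkContext._   // pair-RDD functions such as join
    import org.apache.spark.rdd.RDD

    // Approximate join of records whose timestamps differ by at most windowSec seconds.
    def approxJoin[A, B](ds1: RDD[(Long, A)], ds2: RDD[(Long, B)],
                         windowSec: Long = 240L): RDD[((Long, A), (Long, B))] = {
      // Key dataset1 by its windowSec-second bucket.
      val left = ds1.map { case (t, v) => (t / windowSec, (t, v)) }
      // Replicate each dataset2 record into its own bucket plus the two adjacent ones,
      // so any pair within the tolerance shares at least one bucket key.
      val right = ds2.flatMap { case (t, v) =>
        val b = t / windowSec
        Seq(b - 1, b, b + 1).map(bucket => (bucket, (t, v)))
      }
      // Equi-join on the bucket, then keep only the pairs that really fall within the tolerance.
      left.join(right).values.filter { case ((t1, _), (t2, _)) => math.abs(t1 - t2) <= windowSec }
    }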

BTW, not sure if you really want the stream sql: 
https://github.com/thunderain-project/StreamSQL


-Original Message-
From: durga [mailto:durgak...@gmail.com] 
Sent: Tuesday, July 22, 2014 8:41 AM
To: u...@spark.incubator.apache.org
Subject: Joining by timestamp.

Hi

I have peculiar problem,

I have two data sets (large ones) . 
Data set1:

((timestamp),iterable[Any]) = {
(2014-07-10T00:02:45.045+,ArrayBuffer((2014-07-10T00:02:45.045+,98.4859,22)))
(2014-07-10T00:07:32.618+,ArrayBuffer((2014-07-10T00:07:32.618+,75.4737,22)))
}

DataSet2:
((timestamp),iterable[Any]) ={
(2014-07-10T00:03:16.952+,ArrayBuffer((2014-07-10T00:03:16.952+,99.6148,23)))
(2014-07-10T00:08:11.329+,ArrayBuffer((2014-07-10T00:08:11.329+,80.9017,23)))
}

I need to join them , But the catch is , both time stamps are not same , they 
can be approximately 4mins +/-.

those records needs to be joined

Any idea is very much appreciated.

I am thinking right now.

file descriptor for sorted Dataset2.
Read the sorted records of dataset1 .
 for each record , check for any record matching with the criteria , 
if match emit the record1,record2
if not matching continue reading record2 until it matches.

I know this works for a very small files , That's the reason I need help.

Thanks,
D.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark streaming rate limiting from kafka

2014-07-21 Thread Tobias Pfeiffer
Bill,

numPartitions means the number of Spark partitions that the data received
from Kafka will be split to. It has nothing to do with Kafka partitions, as
far as I know.

If you create multiple Kafka consumers, it doesn't seem like you can
specify which consumer will consume which Kafka partitions. Instead, Kafka
(at least with the interface that is exposed by the Spark Streaming API)
will do something called rebalance and assign Kafka partitions to consumers
evenly, you can see this in the client logs.

When using multiple Kafka consumers with auto.offset.reset = true, please
expect to run into this one:
https://issues.apache.org/jira/browse/SPARK-2383

Tobias
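
As a hedged sketch of the multiple-receiver approach TD describes further down
in this thread (and in the linked streaming guide), parallelism on the receiving
side comes from creating several Kafka streams and unioning them; the ZooKeeper
quorum, group id, topic name and counts below are placeholders:

    import org.apache.spark.streaming.kafka.KafkaUtils

    val numStreams = 4                                   // one receiver (and one core) per stream
    val kafkaStreams = (1 to numStreams).map { _ =>
      KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("my-topic" -> 1))
    }
    val unified = ssc.union(kafkaStreams)                // downstream operators see a single DStream
    val repartitioned = unified.repartition(8)           // spread processing across more cores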


On Tue, Jul 22, 2014 at 3:40 AM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 Hi Tathagata,

 I am currently creating multiple DStreams to consume from different topics.
 How can I let each consumer consume from different partitions? I find the
 following parameters in the Spark API:

 createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
     jssc: JavaStreamingContext,
     keyTypeClass: Class[K], valueTypeClass: Class[V],
     keyDecoderClass: Class[U], valueDecoderClass: Class[T],
     kafkaParams: Map[String, String],
     topics: Map[String, Integer],
     storageLevel: StorageLevel): JavaPairReceiverInputDStream[K, V]

 Create an input stream that pulls messages from a Kafka broker.




 The topics parameter is:
 "Map of (topic_name -> numPartitions) to consume. Each partition is
 consumed in its own thread."

 Does numPartitions mean the total number of partitions to consume from
 topic_name or the index of the partition? How can we specify for each
 createStream which partition of the Kafka topic to consume? I think if so,
 I will get a lot of parallelism from the source of the data. Thanks!

 Bill

 On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 You can create multiple kafka stream to partition your topics across
 them, which will run multiple receivers or multiple executors. This is
 covered in the Spark streaming guide.
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 And for the purpose of this thread, to answer the original question, we now
 have the ability
 https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC
 to limit the receiving rate. Its in the master branch, and will be
 available in Spark 1.1. It basically sets the limits at the receiver level
 (so applies to all sources) on what is the max records per second that can
 will be received by the receiver.

 TD


 On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Bill,

 are you saying, after repartition(400), you have 400 partitions on one
 host and the other hosts receive nothing of the data?

 Tobias


 On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 I also have an issue consuming from Kafka. When I consume from Kafka,
 there is always a single executor working on this job. Even if I use
 repartition, it seems that there is still a single executor. Does anyone
 have an idea how to add parallelism to this job?



 On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com
 wrote:

 Thanks Luis and Tobias.


 On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com
 wrote:

 * Is there a way to control how far Kafka Dstream can read on
 topic-partition (via offset for example). By setting this to a small
 number, it will force DStream to read less data initially.


 Please see the post at

 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E
 Kafka's auto.offset.reset parameter may be what you are looking for.

 Tobias




 --
 Chen Song








Understanding Spark

2014-07-21 Thread omergul123
Hi,

I'm new to the world of big data and I'm trying to understand the use
cases of several projects. Of course one of them is Spark.

I want to know the proper way of examining my data that resides
on my MySQL server.

Say I'm saving every page view of a user, with the timestamp, in a
table called views (id, user_id, page, created_at). Let's assume there are
millions of rows in this table. In the Spark examples, there are some text
files which are analysed. So in the case of data that resides in MySQL, what
should my approach be? By analysing the data, you can think of generating
page recommendations for similar users.
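
One hedged starting point for the MySQL side is JdbcRDD, which partitions a query
over a numeric column range. Everything below (driver class, connection URL,
credentials, id bounds and partition count) is a placeholder rather than a
recommendation from this list:

    import java.sql.{DriverManager, ResultSet}
    import org.apache.spark.rdd.JdbcRDD

    val views = new JdbcRDD(
      sc,
      () => {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://dbhost:3306/mydb", "user", "password")
      },
      // The two '?' placeholders receive each partition's slice of the id range.
      "SELECT user_id, page, created_at FROM views WHERE id >= ? AND id <= ?",
      1L, 10000000L, 20,
      (rs: ResultSet) => (rs.getLong("user_id"), rs.getString("page"), rs.getTimestamp("created_at"))
    )

From there the views RDD can be grouped, joined and aggregated like any other RDD.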

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-Spark-tp10373.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


defaultMinPartitions in textFile

2014-07-21 Thread Wang, Jensen
Hi,
I started to use spark on yarn recently and found a problem while 
tuning my program.

When SparkContext is initialized as sc and ready to read text file from hdfs, 
the textFile(path, defaultMinPartitions) method is called.
I traced down the second parameter in the spark source code and finally found 
this:
   conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))  
in  CoarseGrainedSchedulerBackend.scala

I do not specify the property spark.default.parallelism anywhere, so 
getInt will return the larger of totalCoreCount and 2.

When I submit the application using spark-submit and specify the parameter: 
--num-executors  2   --executor-cores 6, I suppose the totalCoreCount will be
2*6 = 12, so defaultMinPartitions will be 12.

But when I print the value of defaultMinPartitions in my program, I still get 2 
in return. How does this happen, or where did I make a mistake?


RE: data locality

2014-07-21 Thread Haopu Wang
Sandy,

 

I just tried the standalone cluster and didn't have chance to try Yarn yet.

So if I understand correctly, there is *no* special handling of task 
assignment according to the HDFS blocks' locations when Spark is running as a 
*standalone* cluster.

Please correct me if I'm wrong. Thank you for your patience!

 



From: Sandy Ryza [mailto:sandy.r...@cloudera.com] 
Sent: 22 July 2014 9:47
To: user@spark.apache.org
Subject: Re: data locality

 

This currently only works for YARN.  The standalone default is to place an 
executor on every node for every job.

 

The total number of executors is specified by the user.

 

-Sandy

 

On Fri, Jul 18, 2014 at 2:00 AM, Haopu Wang hw...@qilinsoft.com wrote:

Sandy,

 

Do you mean the “preferred location” is working for standalone cluster also? 
Because I check the code of SparkContext and see comments as below:

 

  // This is used only by YARN for now, but should be relevant to other cluster 
types (Mesos,

  // etc) too. This is typically generated from 
InputFormatInfo.computePreferredLocations. It

  // contains a map from hostname to a list of input format splits on the host.

  private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = 
Map()

 

BTW, even with the preferred hosts, how does Spark decide how many total 
executors to use for this application?

 

Thanks again!

 



From: Sandy Ryza [mailto:sandy.r...@cloudera.com] 
Sent: Friday, July 18, 2014 3:44 PM
To: user@spark.apache.org
Subject: Re: data locality

 

Hi Haopu,

 

Spark will ask HDFS for file block locations and try to assign tasks based on 
these.

 

There is a snag.  Spark schedules its tasks inside of executor processes that 
stick around for the lifetime of a Spark application.  Spark requests executors 
before it runs any jobs, i.e. before it has any information about where the 
input data for the jobs is located.  If the executors occupy significantly 
fewer nodes than exist in the cluster, it can be difficult for Spark to achieve 
data locality.  The workaround for this is an API that allows passing in a set 
of preferred locations when instantiating a Spark context.  This API is 
currently broken in Spark 1.0, and will likely be changed to something a little 
simpler in a future release.

 

val locData = InputFormatInfo.computePreferredLocations(
  Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))

 

val sc = new SparkContext(conf, locData)

 

-Sandy

 

 

On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote:

I have a standalone spark cluster and a HDFS cluster which share some of nodes.

 

When reading HDFS file, how does spark assign tasks to nodes? Will it ask HDFS 
the location for each file block in order to get a right worker node?

 

How about a spark cluster on Yarn?

 

Thank you very much!

 

 

 



Re: Question about initial message in graphx

2014-07-21 Thread Ankur Dave
On Mon, Jul 21, 2014 at 8:05 PM, Bin WU bw...@connect.ust.hk wrote:

 I am not sure how to specify different initial values to each node in the
 graph. Moreover, I am wondering why initial message is necessary. I think
 we can instead initialize the graph and then pass it into Pregel interface?


Indeed it's not necessary, and a future update to Pregel will probably
remove it: https://github.com/apache/spark/pull/1217

But the initial message parameter doesn't prevent you from specifying
different initial *values* for each vertex. Pregel respects the initial
vertex values of the provided graph; the initial message just ensures that
it will run vprog at least once per vertex.

If you don't want an initial message, one option is to set aside a special
message value for it and check for this in vprog. For example, if the
message type is Int, you could change it to Option[Int] and use None as the
initial message.
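
A minimal sketch of that Option[Int] idea; the max-propagation logic is purely
illustrative and not from the thread:

    import org.apache.spark.graphx._

    def maxPropagation(graph: Graph[Int, Int]): Graph[Int, Int] =
      Pregel(graph, None: Option[Int]) (
        // vertex program: ignore the synthetic initial None, otherwise fold in the message
        (id, attr, msg) => msg.map(m => math.max(attr, m)).getOrElse(attr),
        // send program: real messages are always Some
        triplet =>
          if (triplet.srcAttr > triplet.dstAttr) Iterator((triplet.dstId, Some(triplet.srcAttr)))
          else Iterator.empty,
        // merge: only real messages reach here, but handle None defensively anyway
        (a, b) => Some(math.max(a.getOrElse(Int.MinValue), b.getOrElse(Int.MinValue)))
      )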

Ankur http://www.ankurdave.com/


new error for me

2014-07-21 Thread Nathan Kronenfeld
Does anyone know what this error means:
14/07/21 23:07:22 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
14/07/21 23:07:22 INFO TaskSetManager: Starting task 3.0:0 as TID 1620 on
executor 27: r104u05.oculus.local (PROCESS_LOCAL)
14/07/21 23:07:22 INFO TaskSetManager: Serialized task 3.0:0 as 8620 bytes
in 1 ms
14/07/21 23:07:36 INFO BlockManagerInfo: Added taskresult_1620 in memory on
r104u05.oculus.local:50795 (size: 64.9 MB, free: 18.3 GB)
14/07/21 23:07:36 INFO SendingConnection: Initiating connection to [r104u05.
oculus.local/192.168.0.105:50795]
14/07/21 23:07:57 INFO ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@1d86a150
java.nio.channels.CancelledKeyException
at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77)
at
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:265)
at
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:115)
14/07/21 23:07:57 WARN SendingConnection: Error finishing connection to
r104u05.oculus.local/192.168.0.105:50795
java.net.ConnectException: Connection timed out
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
at
org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318)
at
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:202)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on
connection to ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Notifying
org.apache.spark.network.ConnectionManager$MessageStatus@13ad274d
14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on
connection to ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 WARN TaskSetManager: Lost TID 1620 (task 3.0:0)
14/07/21 23:07:57 WARN TaskSetManager: Lost result for TID 1620 on host
r104u05.oculus.local

I've never seen this one before, and now it's coming up consistently.

Thanks,
 -Nathan


Re: defaultMinPartitions in textFile

2014-07-21 Thread Ye Xianjin
well, I think you miss this line of code in SparkContext.scala
line 1242-1243(master):
 /** Default min number of partitions for Hadoop RDDs when not given by user */
  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

so the defaultMinPartitions will be 2 unless the defaultParallelism is less 
than 2...
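
So for more input splits the practical options are to pass minPartitions to
textFile explicitly or to repartition afterwards. A hedged sketch with
illustrative numbers (e.g. 12 for 2 executors * 6 cores):

    // defaultMinPartitions is capped at 2 by design, so ask for more splits explicitly
    val lines = sc.textFile("hdfs:///path/to/input", 12)
    // or reshuffle an already-loaded RDD
    val wider = lines.repartition(12)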


--  
Ye Xianjin
Sent with Sparrow (http://www.sparrowmailapp.com/?sig)


On Tuesday, July 22, 2014 at 10:18 AM, Wang, Jensen wrote:

 Hi,  
 I started to use spark on yarn recently and found a problem while 
 tuning my program.
   
 When SparkContext is initialized as sc and ready to read text file from hdfs, 
 the textFile(path, defaultMinPartitions) method is called.
 I traced down the second parameter in the spark source code and finally found 
 this:
conf.getInt(spark.default.parallelism, math.max(totalCoreCount.get(), 
 2))  in  CoarseGrainedSchedulerBackend.scala
   
 I do not specify the property “spark.default.parallelism” anywhere so the 
 getInt will return value from the larger one between totalCoreCount and 2.
   
 When I submit the application using spark-submit and specify the parameter: 
 --num-executors  2   --executor-cores 6, I suppose the totalCoreCount will be 
  
 2*6 = 12, so defaultMinPartitions will be 12.
   
 But when I print the value of defaultMinPartitions in my program, I still get 
 2 in return,  How does this happen, or where do I make a mistake?
  
  
  




RE: defaultMinPartitions in textFile

2014-07-21 Thread Wang, Jensen
Yes, Great.  I thought it was math.max instead of math.min on that line. Thank 
you!

From: Ye Xianjin [mailto:advance...@gmail.com]
Sent: Tuesday, July 22, 2014 11:37 AM
To: user@spark.apache.org
Subject: Re: defaultMinPartitions in textFile

well, I think you miss this line of code in SparkContext.scala
line 1242-1243(master):
 /** Default min number of partitions for Hadoop RDDs when not given by user */
  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
so the defaultMinPartitions will be 2 unless the defaultParallelism is less 
than 2...

--
Ye Xianjin
Sent with Sparrowhttp://www.sparrowmailapp.com/?sig


On Tuesday, July 22, 2014 at 10:18 AM, Wang, Jensen wrote:

Hi,

I started to use spark on yarn recently and found a problem while 
tuning my program.



When SparkContext is initialized as sc and ready to read text file from hdfs, 
the textFile(path, defaultMinPartitions) method is called.

I traced down the second parameter in the spark source code and finally found 
this:

   conf.getInt(spark.default.parallelism, math.max(totalCoreCount.get(), 2))  
in  CoarseGrainedSchedulerBackend.scala



I do not specify the property “spark.default.parallelism” anywhere so the 
getInt will return value from the larger one between totalCoreCount and 2.



When I submit the application using spark-submit and specify the parameter: 
--num-executors  2   --executor-cores 6, I suppose the totalCoreCount will be

2*6 = 12, so defaultMinPartitions will be 12.



But when I print the value of defaultMinPartitions in my program, I still get 2 
in return,  How does this happen, or where do I make a mistake?



RE: Joining by timestamp.

2014-07-21 Thread durga
Hi Chen,
I am new to Spark as well as Spark SQL. Could you please explain how
I would create a table and run a query on top of it? That would be super
helpful.

Thanks,
D.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367p10381.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Joining by timestamp.

2014-07-21 Thread Cheng, Hao
Actually it's just a pseudo-algorithm I described; you can implement it with the Spark 
API. Hope the algorithm is helpful.

-Original Message-
From: durga [mailto:durgak...@gmail.com] 
Sent: Tuesday, July 22, 2014 11:56 AM
To: u...@spark.incubator.apache.org
Subject: RE: Joining by timestamp.

Hi Chen,
I am new to the Spark as well as SparkSQL , could you please explain how would 
I create a table and run query on top of it.That would be super helpful.

Thanks,
D.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367p10381.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Joining by timestamp.

2014-07-21 Thread durga
Hi Chen,

Thank you very much for your reply. I do not think I understand how I can do
the join using the Spark API. If you have time, could you please write some
code?

Thanks again,
D.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367p10383.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Joining by timestamp.

2014-07-21 Thread Cheng, Hao
Durga, you can start from the documents
  http://spark.apache.org/docs/latest/quick-start.html 
  http://spark.apache.org/docs/latest/programming-guide.html


-Original Message-
From: durga [mailto:durgak...@gmail.com] 
Sent: Tuesday, July 22, 2014 12:45 PM
To: u...@spark.incubator.apache.org
Subject: RE: Joining by timestamp.

Hi Chen,

Thank you very much for your reply. I think I do not understand how can I do 
the join using spark api. If you have time , could you please write some code . 

Thanks again,
D.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367p10383.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.