Re: Spark job tracker.
Hello Marcelo Vanzin, Can you explain a bit more on this? I tried using client mode, but can you explain how I can use this port to write the log or output to it? Thanks in advance!
Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$
Hi, Kevin, I tried it on spark1.0.0 and it works fine. It's a bug in spark1.0.1... Thanks, Victor
Re: TreeNodeException: No function to evaluate expression. type: AttributeReference, tree: id#0 on GROUP BY
Aha, that makes sense. Thanks for the response! I guess error messages are one of the areas where Spark could use some love (:

On Fri, Jul 18, 2014 at 9:41 PM, Michael Armbrust mich...@databricks.com wrote: Sorry for the non-obvious error message. It is not valid SQL to include attributes in the select clause unless they are also in the group by clause or are inside of an aggregate function.

On Jul 18, 2014 5:12 AM, Martin Gammelsæter martingammelsae...@gmail.com wrote: Hi again! I am having problems when using GROUP BY on both SQLContext and HiveContext (same problem). My code (simplified as much as possible) can be seen here: http://pastebin.com/33rjW67H In short, I'm getting data from a Cassandra store with Datastax' new driver (which works great by the way, recommended!), and mapping it to a Spark SQL table through a Product class (Dokument in the source). Regular SELECTs and such work fine, but once I try to do a GROUP BY, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:25 failed 4 times, most recent failure: Exception failure in TID 63 on host 192.168.121.132: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: No function to evaluate expression. type: AttributeReference, tree: id#0
org.apache.spark.sql.catalyst.expressions.AttributeReference.eval(namedExpressions.scala:158)
org.apache.spark.sql.catalyst.expressions.MutableProjection.apply(Projection.scala:64)
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:195)
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7$$anon$1.next(Aggregate.scala:174)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)
org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:750)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1096)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

What am I doing wrong? -- Best regards, Martin Gammelsæter -- Mvh. Martin Gammelsæter 92209139
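To make the rule concrete, a minimal sketch (hypothetical table and column names, assuming a table registered with a SQLContext and its sql method in scope):

// invalid: `name` is in the SELECT clause but neither grouped nor aggregated,
// which is what triggers the AttributeReference error above
sql("SELECT id, name, COUNT(id) FROM dokument GROUP BY id")

// valid: every selected attribute is in GROUP BY or inside an aggregate
sql("SELECT id, COUNT(id) FROM dokument GROUP BY id")
sql("SELECT id, name, COUNT(id) FROM dokument GROUP BY id, name")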
Re: spark sql left join gives KryoException: Buffer overflow
Hi Michael, Thanks for the suggestion. In my query, both tables are too large to use a broadcast join. When SPARK-2211 is done, will Spark SQL automatically choose join algorithms? Is there some way to manually hint the optimizer?

2014-07-19 5:23 GMT+08:00 Michael Armbrust mich...@databricks.com: Unfortunately, this is a query where we just don't have an efficient implementation yet. You might try switching the table order. Here is the JIRA for doing something more efficient: https://issues.apache.org/jira/browse/SPARK-2212

On Fri, Jul 18, 2014 at 7:05 AM, Pei-Lun Lee pl...@appier.com wrote: Hi, We have a query with a left join and got this error: Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure in TID 5 on host ip-10-33-132-101.us-west-2.compute.internal: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1 It looks like Spark SQL tried to do a broadcast join, collecting one of the tables to the master, but it is too large. How do we explicitly control join behavior like this? -- Pei-Lun Lee
LabeledPoint with weight
It seems MLlib right now doesn't support weighted training; all training samples have equal importance. Weighted training can be very useful to reduce data size and speed up training. Do you have a plan to support it in the future? The data format would be something like: label:weight index1:value1 index2:value2 ...
Can't see anything on the storage panel of application UI
Hi, I'm running LogisticRegression from MLlib, but I can't see the RDD information in the storage panel. http://apache-spark-user-list.1001560.n3.nabble.com/file/n10296/a.png http://apache-spark-user-list.1001560.n3.nabble.com/file/n10296/b.png
Re: Can't see anything on the storage panel of application UI
I am getting the same issue. Spark version: 1.0

On 21/07/14 4:16 PM, binbinbin915 binbinbin...@live.cn wrote: Hi, I'm running LogisticRegression from MLlib, but I can't see the RDD information in the storage panel. http://apache-spark-user-list.1001560.n3.nabble.com/file/n10296/a.png http://apache-spark-user-list.1001560.n3.nabble.com/file/n10296/b.png
Re: the default GraphX graph-partition strategy on multicore machine?
Thanks so much, Ankur :)) Excuse me, but I am wondering (for a chosen partition strategy for my application): 1.1) how to check the size of each partition? Is there any API, or log file? 1.2) how to check the processing cost of each partition (time, memory, etc)? 2.1) and the global communication cost of my application upon the chosen partition strategy?

On Jul 18, 2014, at 9:18 PM, Ankur Dave ankurd...@gmail.com wrote: Sorry, I didn't read your vertex replication example carefully, so my previous answer is wrong. Here's the correct one: On Fri, Jul 18, 2014 at 9:13 AM, Yifan LI iamyifa...@gmail.com wrote: I don't understand. For instance, we have 3 edge partition tables (EA: a -> b, a -> c; EB: a -> d, a -> e; EC: d -> c) and 2 vertex partition tables (VA: a, b, c; VB: d, e). Will the whole vertex table VA be replicated to all these 3 edge partitions, since each of them refers to some vertices in VA? Vertices can be replicated individually without requiring the entire vertex partition to be replicated. In this case, here's what will get replicated to each partition:

EA: a (from VA), b (from VA), c (from VA)
EB: a (from VA), d (from VB), e (from VB)
EC: c (from VA), d (from VB)

Ankur
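On 1.1, there doesn't seem to be a dedicated API; a rough sketch that counts edges per partition by hand (assuming a loaded GraphX graph named graph):

val edgeCounts = graph.edges
  .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size)))
  .collect()
edgeCounts.foreach { case (part, n) => println("partition " + part + ": " + n + " edges") }

For 1.2, the per-task durations on the web UI's stage pages map one-to-one to partitions, which gives a first approximation of per-partition processing time.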
Re: DynamoDB input source
Hi, I am invoking the spark-shell (Spark 1.0.0) with:

spark-shell --jars \
libs/aws-java-sdk-1.3.26.jar,\
libs/httpclient-4.1.1.jar,\
libs/httpcore-nio-4.1.jar,\
libs/gson-2.1.jar,\
libs/httpclient-cache-4.1.1.jar,\
libs/httpmime-4.1.1.jar,\
libs/hive-dynamodb-handler-0.11.0.jar,\
libs/httpcore-4.1.jar,\
libs/joda-time-2.1.jar

and entering the following in the shell:

import org.apache.hadoop.io.Text
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.mapred.JobConf

var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "...")
jobConf.set("dynamodb.endpoint", "dynamodb.eu-west-1.amazonaws.com")
jobConf.set("dynamodb.regionid", "eu-west-1")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1")
jobConf.set("dynamodb.awsAccessKeyId", "...")
jobConf.set("dynamodb.awsSecretAccessKey", "...")
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

var users = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
users.count()

This raises an NPE in FileSplit (as below). Any suggestions on what I might pursue to correct this would be very welcome. ian

14/07/20 23:56:03 INFO deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
14/07/20 23:56:03 INFO JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/07/20 23:56:03 INFO AbstractDynamoDBInputFormat: Throughput percent: 1.0
14/07/20 23:56:03 INFO EndpointProvider: Using endpoint for DynamoDB: dynamodb.eu-west-1.amazonaws.com
14/07/20 23:56:03 INFO DynamoDBClient: Describe Table Output: {Table: {TableName: ..., KeySchema: {HashKeyElement: {AttributeName: id, AttributeType: S, }, }, TableStatus: ACTIVE, CreationDateTime: Wed May 07 14:38:30 BST 2014, ProvisionedThroughput: {ReadCapacityUnits: 4, WriteCapacityUnits: 4, }, TableSizeBytes: 2473, ItemCount: 14, }, }
14/07/20 23:56:03 INFO SparkContext: Starting job: count at <console>:21
14/07/20 23:56:03 INFO DAGScheduler: Got job 0 (count at <console>:21) with 1 output partitions (allowLocal=false)
14/07/20 23:56:03 INFO DAGScheduler: Final stage: Stage 0 (count at <console>:21)
14/07/20 23:56:03 INFO DAGScheduler: Parents of final stage: List()
14/07/20 23:56:03 INFO DAGScheduler: Missing parents: List()
14/07/20 23:56:03 INFO DAGScheduler: Submitting Stage 0 (HadoopRDD[0] at hadoopRDD at <console>:18), which has no missing parents
14/07/20 23:56:03 ERROR DAGSchedulerActorSupervisor: eventProcesserActor failed due to the error null; shutting down SparkContext
14/07/20 23:56:04 INFO SparkUI: Stopped Spark web UI at http://10.0.1.7:4040
14/07/20 23:56:04 INFO DAGScheduler: Stopping DAGScheduler
14/07/20 23:56:05 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
14/07/20 23:56:05 INFO ConnectionManager: Selector thread was interrupted!
14/07/20 23:56:05 INFO ConnectionManager: ConnectionManager stopped
14/07/20 23:56:05 INFO MemoryStore: MemoryStore cleared
14/07/20 23:56:05 INFO BlockManager: BlockManager stopped
14/07/20 23:56:05 INFO BlockManagerMasterActor: Stopping BlockManagerMaster
14/07/20 23:56:05 INFO BlockManagerMaster: BlockManagerMaster stopped
14/07/20 23:56:05 INFO SparkContext: Successfully stopped SparkContext
14/07/20 23:56:05 ERROR OneForOneStrategy: java.lang.NullPointerException
at org.apache.hadoop.mapreduce.lib.input.FileSplit.write(FileSplit.java:80)
at org.apache.hadoop.mapred.FileSplit.write(FileSplit.java:85)
at org.apache.hadoop.dynamodb.split.DynamoDBSplit.write(DynamoDBSplit.java:63)
at org.apache.hadoop.io.ObjectWritable.writeObject(ObjectWritable.java:202)
at org.apache.hadoop.io.ObjectWritable.writeObject(ObjectWritable.java:128)
at org.apache.hadoop.io.ObjectWritable.write(ObjectWritable.java:82)
at org.apache.spark.SerializableWritable.writeObject(SerializableWritable.scala:35)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
DROP IF EXISTS still throws exception about table does not exist?
Hi, all. When I try hiveContext.hql("DROP TABLE IF EXISTS abc") where abc is a non-existent table, I still receive an exception about the non-existent table, even though IF EXISTS is there. The same statement runs well in the Hive shell. Some feedback from the Hive community is here: https://issues.apache.org/jira/browse/HIVE-7458 "You are doing hiveContext.hql("DROP TABLE IF EXISTS hivetesting") in the Scala shell of the Spark project. What is this shell doing? A query to the remote metastore on a non-existing table (see your provided stack). The remote metastore throws NoSuchObjectException(message:default.hivetesting table not found) because Spark code calls HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:854) on a non-existing table. It's the right behavior. You should check in the Spark code why a query is done on a non-existing table. I think Spark does not handle well the IF EXISTS part of this query. Maybe you could file a ticket on Spark JIRA. BUT, it's not a bug in HIVE IMHO." My question is: isn't the DDL executed by Hive itself? Best, -- Nan Zhu
Re: NullPointerException When Reading Avro Sequence Files
For those curious, I used the JavaSparkContext and got access to an AvroSequenceFile (a wrapper around SequenceFile) using the following:

file = sc.newAPIHadoopFile("hdfs path to my file", AvroSequenceFileInputFormat.class, AvroKey.class, AvroValue.class, new Configuration())
Re: java.lang.OutOfMemoryError: GC overhead limit exceeded
Hi Yifan, This works for me:

export SPARK_JAVA_OPTS="-Xms10g -Xmx40g -XX:MaxPermSize=10g"
export ADD_JARS=/home/abel/spark/MLI/target/MLI-assembly-1.0.jar
export SPARK_MEM=40g
./spark-shell

Regards

On Mon, Jul 21, 2014 at 7:48 AM, Yifan LI iamyifa...@gmail.com wrote: Hi, I am trying to load the GraphX example dataset (LiveJournal, 1.08GB) through the Scala shell on my standalone multicore machine (8 cpus, 16GB mem), but an OutOfMemory error was returned when the code below was run:

val graph = GraphLoader.edgeListFile(sc, path, minEdgePartitions = 16).partitionBy(PartitionStrategy.RandomVertexCut)

I guess I should set some JVM parameters, like -Xmx5120m? But how do I do this for the Scala shell? I used bin/spark-shell directly to start Spark, and everything seems to work correctly in the web UI. Or should I set these parameters somewhere else (spark-1.0.1)? Best, Yifan LI
Is there anyone who uses streaming join to filter spam as the guide mentioned?
Hello guys, I'm just trying out Spark Streaming's features. I noticed that there is a join example for filtering spam, so I wanted to try it. But nothing happens after the join; the output JavaPairDStream content is the same as before. So, are there any examples that I can refer to? Thanks for any suggestions. Regards, Hawk
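For reference, a sketch in Scala of the join-based filtering pattern from the guide (the stream name, its (key, value) element type, and the input file are assumed). Note that a plain inner join keeps only the keys that match the spam list, so the output can easily look unchanged or empty; to drop spam you want something like an outer join plus a filter:

import org.apache.spark.SparkContext._ // pair-RDD implicits (Spark 1.0)

// static RDD of known-spam keys (hypothetical input file)
val spamInfoRDD = ssc.sparkContext.textFile("spamKeys.txt").map(key => (key, true))

val cleaned = clickstream.transform { rdd =>
  rdd.leftOuterJoin(spamInfoRDD)
     .filter { case (_, (_, isSpam)) => isSpam.isEmpty } // keep keys that did NOT match the spam list
     .map { case (key, (value, _)) => (key, value) }
}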
Why spark-submit command hangs?
Hi Experts, I setup a Yarn and Spark env: all services run on a single node. I then submitted a WordCount job using the spark-submit script with the command:

./bin/spark-submit tests/wordcount-spark-scala.jar --class scala.spark.WordCount --num-executors 1 --driver-memory 300M --executor-memory 300M --executor-cores 1 yarn-standalone hdfs://hostname/tmp/input hdfs://hostname/tmp/output

However, the command hangs and no job is submitted to Yarn. Any comments? Output:

Spark assembly has been built with Hive, including Datanucleus jars on classpath
14/07/21 22:38:42 WARN spark.SparkConf: null jar passed to SparkContext constructor
14/07/21 22:38:43 INFO spark.SecurityManager: Changing view acls to: biadmin
14/07/21 22:38:43 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(biadmin)
14/07/21 22:38:43 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/07/21 22:38:43 INFO Remoting: Starting remoting
14/07/21 22:38:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@hostname:56903]
14/07/21 22:38:43 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@hostname:56903]
14/07/21 22:38:43 INFO spark.SparkEnv: Registering MapOutputTracker
14/07/21 22:38:43 INFO spark.SparkEnv: Registering BlockManagerMaster
14/07/21 22:38:43 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140721223843-75cd
14/07/21 22:38:43 INFO storage.MemoryStore: MemoryStore started with capacity 180.0 MB.
14/07/21 22:38:43 INFO network.ConnectionManager: Bound socket to port 57453 with id = ConnectionManagerId(hostname,57453)
14/07/21 22:38:43 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/07/21 22:38:43 INFO storage.BlockManagerInfo: Registering block manager hostname:57453 with 180.0 MB RAM
14/07/21 22:38:43 INFO storage.BlockManagerMaster: Registered BlockManager
14/07/21 22:38:43 INFO spark.HttpServer: Starting HTTP Server
14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/21 22:38:43 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:19323
14/07/21 22:38:43 INFO broadcast.HttpBroadcast: Broadcast server started at http://9.123.99.10:19323
14/07/21 22:38:43 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-e224a31b-4517-43d8-9778-4b6af07dcad2
14/07/21 22:38:43 INFO spark.HttpServer: Starting HTTP Server
14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/21 22:38:43 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:35420
14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/21 22:38:43 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/07/21 22:38:43 INFO ui.SparkUI: Started SparkUI at http://hostname:4040
14/07/21 22:38:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/07/21 22:38:44 WARN spark.SparkContext: yarn-standalone is deprecated as of Spark 1.0. Use yarn-cluster instead.
14/07/21 22:38:44 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler
14/07/21 22:38:44 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@610c610c

Thanks! Sam Liu
Re: java.lang.OutOfMemoryError: GC overhead limit exceeded
Thanks, Abel. Best, Yifan LI

On Jul 21, 2014, at 4:16 PM, Abel Coronado Iruegas acoronadoirue...@gmail.com wrote: Hi Yifan, This works for me:

export SPARK_JAVA_OPTS="-Xms10g -Xmx40g -XX:MaxPermSize=10g"
export ADD_JARS=/home/abel/spark/MLI/target/MLI-assembly-1.0.jar
export SPARK_MEM=40g
./spark-shell

Regards

On Mon, Jul 21, 2014 at 7:48 AM, Yifan LI iamyifa...@gmail.com wrote: Hi, I am trying to load the GraphX example dataset (LiveJournal, 1.08GB) through the Scala shell on my standalone multicore machine (8 cpus, 16GB mem), but an OutOfMemory error was returned when the code below was run:

val graph = GraphLoader.edgeListFile(sc, path, minEdgePartitions = 16).partitionBy(PartitionStrategy.RandomVertexCut)

I guess I should set some JVM parameters, like -Xmx5120m? But how do I do this for the Scala shell? I used bin/spark-shell directly to start Spark, and everything seems to work correctly in the web UI. Or should I set these parameters somewhere else (spark-1.0.1)? Best, Yifan LI
Is deferred execution of multiple RDDs ever coming?
Hello fellow Sparkians. In https://groups.google.com/d/msg/spark-users/eXV7Bp3phsc/gVgm-MdeEkwJ, Matei suggested that Spark might get deferred grouping and forced execution of multiple jobs in an efficient way. His code sample:

rdd.reduceLater(reduceFunction1) // returns Future[ResultType1]
rdd.reduceLater(reduceFunction2) // returns Future[ResultType2]
SparkContext.force() // executes all the later operations as part of a single optimized job

This would be immensely useful. If you ever want to do a thing where you do two passes over the data and save two different results to disk, you either have to cache the RDD, which can be slow or deprive the processing code of memory, or recompute the whole thing twice. If Spark were smart enough to let you group together these operations and fork an RDD (say, an RDD.partition method), you could very easily implement these n-pass operations across RDDs and have Spark execute them efficiently.

Our use case for a feature like this is processing many records, attaching metadata to the records during processing about our confidence in the data points, and then writing the data to one spot and the metadata to another spot. I've also wanted this for taking a dataset, profiling it for partition size or anomalously sized partitions, and then using the profiling result to repartition the data before saving it to disk, which I think is impossible to do without caching right now. This use case is a bit more interesting because information from earlier in the DAG needs to influence later stages, so I suspect the answer will be "cache the thing". I explicitly don't want to cache it because I'm not really doing an iterative algorithm where I'm willing to pay the heap and time penalties; I'm just doing an operation which needs run-time information without a collect call. This suggests that something like a repartition with a lazily evaluated accumulator might work as well, but I haven't been able to figure out a solution even with this primitive and the current APIs.

So, does anyone know if this feature might land, and if not, where to start implementing it? What would the Python story for Futures be?
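For what it's worth, the closest thing in the current API appears to be the async actions (AsyncRDDActions), which let several jobs be in flight at once but only share the scan if the RDD is cached — exactly the cost being discussed. A sketch, reusing the rdd from the example above:

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.apache.spark.SparkContext._ // async-action implicits (Spark 1.0)

val cached = rdd.cache()
val f1 = cached.countAsync()   // job 1 submitted, returns a FutureAction
val f2 = cached.collectAsync() // job 2 submitted concurrently
val n    = Await.result(f1, Duration.Inf)
val data = Await.result(f2, Duration.Inf)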
Re: Spark Streaming with long batch / window duration
So I think I may end up using Hourglass (https://engineering.linkedin.com/datafu/datafus-hourglass-incremental-data-processing-hadoop), a Hadoop framework for incremental data processing. It would be very cool if Spark (not Streaming) could support something like this.
gain access to persisted rdd
Hi, I am using pyspark and have persisted a list of RDDs within a function, but I don't have a reference to them anymore. The RDDs are listed in the UI, under the Storage tab, and they have names associated with them (e.g. 4). Is it possible to access the RDDs to unpersist them? Thanks!
Give more Java Heap Memory on Standalone mode
Hello, Currently I work on a project in which I spawn a standalone Apache Spark MLlib job, in standalone mode, from a running Java process. In the code of the Spark job I have the following:

SparkConf sparkConf = new SparkConf().setAppName("SparkParallelLoad");
sparkConf.set("spark.executor.memory", "8g");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
...

Also, in my ~/spark/conf/spark-env.sh I have the following values:

SPARK_WORKER_CORES=1
export SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_MEMORY=2g
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.spark.executor.memory=4g"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.spark.executor.memory=4g"

During runtime I receive a Java OutOfMemory exception and a core dump. My dataset is less than 1 GB and I want to make sure that I cache it all in memory for my ML task. Am I increasing the JVM heap memory correctly? Am I doing something wrong? Thank you, Nick
Re: spark1.0.1 spark sql error java.lang.NoClassDefFoundError: Could not initialize class $line11.$read$
Hi Victor, Instead of importing sqlContext.createSchemaRDD, can you explicitly call sqlContext.createSchemaRDD(rdd) to create a SchemaRDD? For example, you have a case class Record:

case class Record(data_date: String, mobile: String, create_time: String)

Then, you create an RDD[Record] and let's call it mobile. Instead of using mobile.registerAsTable("mobile"), can you try the following snippet and see if it works?

val mobileSchemaRDD = sqlContext.createSchemaRDD(mobile)
mobileSchemaRDD.registerAsTable("mobile")

Thanks, Yin

On Sun, Jul 20, 2014 at 11:10 PM, Victor Sheng victorsheng...@gmail.com wrote: Hi, Kevin, I tried it on spark1.0.0 and it works fine. It's a bug in spark1.0.1... Thanks, Victor
Re: Spark Streaming timing considerations
Hi TD, Thanks for the help. The only problem left here is that the dstreamTime contains some extra information, which seems to be a date, i.e. 1405944367000 ms, whereas my application timestamps are just in seconds, which I converted to ms, e.g. 2300, 2400, 2500 etc. So the filter doesn't take effect. I was thinking to add that extra info to my Time(4000), but I am not really sure what it is.

val keyAndValues = eegStreams.map(x => {
  val token = x.split(" ")
  ((token(0).toDouble * 1000).toLong, token(1).toDouble)
})
val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform((windowedRDD, dstreamTime) => {
  val currentAppTimeWindowStart = dstreamTime - Time(4000) // define the window over the timestamp that you want to process
  val currentAppTimeWindowEnd = dstreamTime
  val filteredRDD = windowedRDD.filter(r => Time(r._1) > currentAppTimeWindowStart && Time(r._1) <= currentAppTimeWindowEnd)
  filteredRDD
})

The sample input is as under:

AppTimestamp Datapoints
0 -145.934066
0.003906 0.19536
0.007812 0.19536
0.011719 0.19536
0.015625 0.19536
0.019531 0.976801
0.023438 0.586081
0.027344 -1.758242
0.03125 -1.367521
0.035156 2.930403
0.039062 4.102564
0.042969 3.711844
0.046875 2.148962
0.050781 -4.102564
0.054688 -1.758242
0.058594 3.711844
0.0625 9.181929
0.066406 11.135531
0.070312 4.884005
0.074219 0.976801
0.078125 4.493284
0.082031 11.135531
0.085938 12.698413
0.089844 15.824176
0.09375 21.684982
0.097656 22.466422
0.101562 18.949939
0.105469 14.652015
0.109375 11.135531
0.113281 1.758242
0.117188 -6.056166
0.121094 -0.976801
0.125 0.19536
0.128906 -6.837607
0.132812 -8.400488
0.136719 -14.261294
0.140625 -24.810745
0.144531 -25.592186
0.148438 -19.73138
0.152344 -18.559219
0.15625 -25.201465

Regards, Laeeq

On Thursday, July 17, 2014 8:58 PM, Tathagata Das tathagata.das1...@gmail.com wrote: You have to define what is the range of records that needs to be filtered out in every windowed RDD, right? For example, when the DStream.window has data from times 0 - 8 seconds by DStream time, you only want to filter out data that falls into, say, 4 - 8 seconds by application time. This latter is the application-level time window that you need to define in the transform function. What may help is that there is another version of transform which allows you to get the current DStream time (that is, it will give the value 8) from which you can calculate the app-time window 4 - 8.

val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform((windowedRDD: RDD[...], dstreamTime: Time) => {
  val currentAppTimeWindowStart = dstreamTime - appTimeWindowSize // define the window over the timestamp that you want to process
  val currentAppTimeWindowEnd = dstreamTime
  val filteredRDD = windowedRDD.filter(r => r._1 <= currentAppTimeWindowEnd && r._1 > currentAppTimeWindowStart) // filter and retain only the records that fall in the current app-time window
  filteredRDD
})

Hope this helps! TD

On Thu, Jul 17, 2014 at 5:54 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi TD, I have been able to filter the first WindowedRDD, but I am not sure how to make a generic filter. The larger window is 8 seconds and I want to fetch 4 seconds based on the application timestamp. I have seen an earlier post which suggests a timestamp-based window, but I am not sure how to build it in the following example.

val transformed = keyAndValues.window(Seconds(8), Seconds(4)).transform(windowedRDD => {
  //val timeStampBasedWindow = ??? // define the window over the timestamp that you want to process
  val filteredRDD = windowedRDD.filter(_._1 < 4) // filter and retain only the records that fall in the timestamp-based window
  filteredRDD
})

Consider the input tuples as (1,23), (1.2,34) . . . (3.8,54), (4,413) . . . where the key is the timestamp.

Regards, Laeeq

On Saturday, July 12, 2014 8:29 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, Thanks, I will try to implement it. Regards, Laeeq

On Saturday, July 12, 2014 4:37 AM, Tathagata Das tathagata.das1...@gmail.com wrote: This is not in the current streaming API. Queue stream is useful for testing with generated RDDs, but not for actual data. For an actual data stream, the slack time can be implemented by doing DStream.window on a larger window that takes the slack time into consideration, and then filtering out the required application-time-based window of data. For example, if you want a slack time of 1 minute and batches of 10 seconds, then do a window operation of 70 seconds, then in each RDD filter out the records with the desired application time and process them. TD

On Fri, Jul 11, 2014 at 7:44 AM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi, In the
Re: Spark Streaming timing considerations
That is just standard Unix time. 1405944367000 = Sun, 09 Aug 46522 05:56:40 GMT On Mon, Jul 21, 2014 at 5:43 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi TD, Thanks for the help. The only problem left here is that the dstreamTime contains some extra information which seems date i.e. 1405944367000 ms whereas my application timestamps are just in sec which I converted to ms. e.g. 2300, 2400, 2500 etc. So the filter doesn't take effect. I was thinking to add that extra info to my Time(4000). But I am not really sure what it is?
Re: Spark Streaming timing considerations
Uh, right. I mean: 1405944367 = Mon, 21 Jul 2014 12:06:07 GMT On Mon, Jul 21, 2014 at 5:47 PM, Sean Owen so...@cloudera.com wrote: That is just standard Unix time. 1405944367000 = Sun, 09 Aug 46522 05:56:40 GMT On Mon, Jul 21, 2014 at 5:43 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi TD, Thanks for the help. The only problem left here is that the dstreamTime contains some extra information which seems date i.e. 1405944367000 ms whereas my application timestamps are just in sec which I converted to ms. e.g. 2300, 2400, 2500 etc. So the filter doesn't take effect. I was thinking to add that extra info to my Time(4000). But I am not really sure what it is?
Re: Give more Java Heap Memory on Standalone mode
Thank you Abel, it seems that your advice worked. Even though I receive a message that this is a deprecated way of defining Spark memory (the system prompts that I should set spark.driver.memory), the memory is increased. Again, thank you, Nick

On Mon, Jul 21, 2014 at 9:42 AM, Abel Coronado Iruegas acoronadoirue...@gmail.com wrote: Hi Nick, Maybe if you use: export SPARK_MEM=4g

On Mon, Jul 21, 2014 at 11:35 AM, Nick R. Katsipoulakis kat...@cs.pitt.edu wrote: Hello, Currently I work on a project in which I spawn a standalone Apache Spark MLlib job, in standalone mode, from a running Java process. In the code of the Spark job I have the following:

SparkConf sparkConf = new SparkConf().setAppName("SparkParallelLoad");
sparkConf.set("spark.executor.memory", "8g");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
...

Also, in my ~/spark/conf/spark-env.sh I have the following values:

SPARK_WORKER_CORES=1
export SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_MEMORY=2g
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.spark.executor.memory=4g"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.spark.executor.memory=4g"

During runtime I receive a Java OutOfMemory exception and a core dump. My dataset is less than 1 GB and I want to make sure that I cache it all in memory for my ML task. Am I increasing the JVM heap memory correctly? Am I doing something wrong? Thank you, Nick
relationship of RDD[Array[String]] to Array[Array[String]]
It is really nice that Spark RDD's provide functions that are often equivalent to functions found in Scala collections. For example, I can call: myArray.map(myFx) and equivalently myRdd.map(myFx) Awesome! My question is this. Is it possible to write code that works on either an RDD or a local collection without having to have parallel implementations? I can't tell that RDD or Array share any supertypes or traits by looking at the respective scaladocs. Perhaps implicit conversions could be used here. What I would like to do is have a single function whose body is like this: myData.map(myFx) where myData could be an RDD[Array[String]] (for example) or an Array[Array[String]]. Has anyone had success doing this? Thanks, Philip
Re: relationship of RDD[Array[String]] to Array[Array[String]]
It's really more of a Scala question than a Spark question, but the standard OO (not Scala-specific) way is to create your own custom supertype (e.g. MyCollectionTrait), inherited/implemented by two concrete classes (e.g. MyRDD and MyArray), each of which manually forwards method calls to the corresponding pre-existing library implementations. Writing all those forwarding method calls is tedious, but Scala provides at least one bit of syntactic sugar, which alleviates having to type in twice the parameter lists for each method: http://stackoverflow.com/questions/8230831/is-method-parameter-forwarding-possible-in-scala I'm not seeing a way to utilize implicit conversions in this case. Since Scala is statically (albeit inferred) typed, I don't see a way around having a common supertype. On Monday, July 21, 2014 11:01 AM, Philip Ogren philip.og...@oracle.com wrote: It is really nice that Spark RDD's provide functions that are often equivalent to functions found in Scala collections. For example, I can call: myArray.map(myFx) and equivalently myRdd.map(myFx) Awesome! My question is this. Is it possible to write code that works on either an RDD or a local collection without having to have parallel implementations? I can't tell that RDD or Array share any supertypes or traits by looking at the respective scaladocs. Perhaps implicit conversions could be used here. What I would like to do is have a single function whose body is like this: myData.map(myFx) where myData could be an RDD[Array[String]] (for example) or an Array[Array[String]]. Has anyone had success doing this? Thanks, Philip
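A minimal sketch of that wrapper approach (the type and class names are made up), forwarding just map:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

trait MyCollection[A] {
  def map[B: ClassTag](f: A => B): MyCollection[B]
}

class MyArray[A](val underlying: Array[A]) extends MyCollection[A] {
  def map[B: ClassTag](f: A => B): MyCollection[B] =
    new MyArray(underlying.map(f)) // forwards to scala.Array.map
}

class MyRDDColl[A](val underlying: RDD[A]) extends MyCollection[A] {
  def map[B: ClassTag](f: A => B): MyCollection[B] =
    new MyRDDColl(underlying.map(f)) // forwards to org.apache.spark.rdd.RDD.map
}

With this, myData.map(myFx) compiles whether myData wraps an Array or an RDD; the cost is writing a forwarder for every method you need.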
Re: relationship of RDD[Array[String]] to Array[Array[String]]
heya, Without a bit of gymnastics at the type level, nope. Actually RDD doesn't share any functions with the Scala lib (the simple reason I could see is that Spark's ones are lazy, while the default implementations in Scala aren't). However, it'd be possible by implementing an implicit converter from a SeqLike (f.i.) to an RDD; nonetheless it'd be cumbersome, because the overlap between the two worlds isn't entire (for instance, flatMap doesn't have the same semantics, drop is hard, etc). Also, it'd scare me a bit to have this kind of bazooka waiting for me around the next corner, by letting me think that an iterative-like process can be run in a distributed world :-). OTOH, the inverse is quite easy: an implicit conv from RDD to an Array is simply a call to collect (take care that RDD is not covariant -- I think it'd be related to the fact that the ClassTag is needed!?) only my .2 ¢ aℕdy ℙetrella about.me/noootsab

On Mon, Jul 21, 2014 at 7:01 PM, Philip Ogren philip.og...@oracle.com wrote: It is really nice that Spark RDD's provide functions that are often equivalent to functions found in Scala collections. For example, I can call: myArray.map(myFx) and equivalently myRdd.map(myFx) Awesome! My question is this. Is it possible to write code that works on either an RDD or a local collection without having to have parallel implementations? I can't tell that RDD or Array share any supertypes or traits by looking at the respective scaladocs. Perhaps implicit conversions could be used here. What I would like to do is have a single function whose body is like this: myData.map(myFx) where myData could be an RDD[Array[String]] (for example) or an Array[Array[String]]. Has anyone had success doing this? Thanks, Philip
Re: LiveListenerBus throws exception and weird web UI bug
I have the same error! Did you manage to fix it?
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
Instead of using union, can you try sqlContext.parquetFile("/user/hive/warehouse/xxx_parquet.db").registerAsTable("parquetTable")? Then,

var all = sql("select some_id, some_type, some_time from parquetTable").map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19))))

Thanks, Yin

On Sun, Jul 20, 2014 at 8:58 AM, chutium teng@gmail.com wrote: like this:

val sc = new SparkContext(new SparkConf().setAppName("SLA Filter"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val suffix = args(0)
sqlContext.parquetFile("/user/hive/warehouse/xxx_parquet.db/xx001_" + suffix).registerAsTable("xx001")
sqlContext.parquetFile("/user/hive/warehouse/xxx_parquet.db/xx002_" + suffix).registerAsTable("xx002")
... ...
var xx001 = sql("select some_id, some_type, some_time from xx001").map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19))))
var xx002 = sql("select some_id, some_type, some_time from xx002").map(line => (line(0), (line(1).toString, line(2).toString.substring(0, 19))))
... ...
var all = xx001 union xx002 ... union ...
all.groupByKey.filter(kv => FilterSLA.filterSLA(kv._2.toSeq)).saveAsTextFile("xxx")

filterSLA will turn the input Seq[(String, String)] into a Map, then check something like: if the map contains type1 and type2, whether timestamp_type1 - timestamp_type2 > 2 days

thanks
RE: Hive From Spark
Hi All, Currently, if you are running the Spark HiveContext API with Hive 0.12, it won't work due to the following 2 libraries, which are not consistent with Hive 0.12 and Hadoop as well. (Hive libs align with Hadoop libs, and as a common practice, they should be consistent to work interoperably.) These are under discussion in the 2 JIRA tickets:

https://issues.apache.org/jira/browse/HIVE-7387
https://issues.apache.org/jira/browse/SPARK-2420

When I ran the command by tweaking the classpath and build for Spark 1.0.1-rc3, I was able to create a table through HiveContext; however, when I fetch the data, it breaks due to incompatible API calls in Guava. This is critical since it needs to map the columns to the RDD schema. Hive and Hadoop are using an older version of the guava libraries (11.0.1) where Spark Hive is using guava 14.0.1+. The community isn't willing to downgrade to 11.0.1, which is the current version for Hadoop 2.2 and Hive 0.12. Be aware of the protobuf version as well in Hive 0.12 (it uses protobuf 2.4).

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive._

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@34bee01a

scala> hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:104
== Query Plan ==
Native command: executed by Hive

scala> hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
res1: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:104
== Query Plan ==
Native command: executed by Hive

scala> // Queries are expressed in HiveQL
scala> hiveContext.hql("FROM src SELECT key, value").collect().foreach(println)
java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
at org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
at org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165)
at org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102)
at org.apache.spark.util.SizeEstimator$$anonfun$visitArray$2.apply$mcVI$sp(SizeEstimator.scala:214)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:210)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:169)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:75)
at org.apache.spark.storage.MemoryStore.putValues(MemoryStore.scala:92)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:661)
at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:812)
at org.apache.spark.broadcast.HttpBroadcast.<init>(HttpBroadcast.scala:52)
at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:35)
at org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast(HttpBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:776)
at org.apache.spark.sql.hive.HadoopTableReader.<init>(TableReader.scala:60)
at org.apache.spark.sql.hive.execution.HiveTableScan.<init>(HiveTableScan.scala:70)
at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$4.apply(HiveStrategies.scala:73)
at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$$anonfun$4.apply(HiveStrategies.scala:73)
at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:280)
at org.apache.spark.sql.hive.HiveStrategies$HiveTableScans$.apply(HiveStrategies.scala:69)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:316)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:316)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:319)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:319)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:420)
at
Re: Hive From Spark
I haven't seen anyone actively 'unwilling' -- I hope not. See discussion at https://issues.apache.org/jira/browse/SPARK-2420 where I sketch what a downgrade means. I think it just hasn't gotten a looking over. Contrary to what I thought earlier, the conflict does in fact cause problems in theory, and you show it causes a problem in practice. Not to mention it causes issues for Hive-on-Spark now. On Mon, Jul 21, 2014 at 6:27 PM, Andrew Lee alee...@hotmail.com wrote: Hive and Hadoop are using an older version of guava libraries (11.0.1) where Spark Hive is using guava 14.0.1+. The community isn't willing to downgrade to 11.0.1 which is the current version for Hadoop 2.2 and Hive 0.12.
Re: Why spark-submit command hangs?
Hi Sam, Did you specify the MASTER in your spark-env.sh? I ask because I didn't see a --master in your launch command. Also, your app seems to take in a master (yarn-standalone). This is not exactly correct because by the time the SparkContext is launched locally, which is the default, it is too late to use yarn-cluster mode by definition, since the driver should launched within one of the containers on the worker machines. I would suggest the following: - change your application to not take in the Spark master as a command line argument - use yarn-cluster instead of yarn-standalone (which is deprecated) - add --master yarn-cluster in your spark-submit command Another worrying thing is the warning from your logs: 14/07/21 22:38:42 WARN spark.SparkConf: null jar passed to SparkContext constructor How are you creating your SparkContext? Andrew 2014-07-21 7:47 GMT-07:00 Sam Liu liuqiyun_sp...@sina.com: Hi Experts, I setup Yarn and Spark env: all services runs on a single node. And then submited a WordCount job using spark-submit script with command: ./bin/spark-submit tests/wordcount-spark-scala.jar --class scala.spark.WordCount --num-executors 1 --driver-memory 300M --executor-memory 300M --executor-cores 1 yarn-standalone hdfs://hostname/tmp/input hdfs://hostname/tmp/output However, the command hangs and no job is submited to Yarn. Any comments? output: Spark assembly has been built with Hive, including Datanucleus jars on classpath 14/07/21 22:38:42 WARN spark.SparkConf: null jar passed to SparkContext constructor 14/07/21 22:38:43 INFO spark.SecurityManager: Changing view acls to: biadmin 14/07/21 22:38:43 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(biadmin) 14/07/21 22:38:43 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/07/21 22:38:43 INFO Remoting: Starting remoting 14/07/21 22:38:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@hostname:56903] 14/07/21 22:38:43 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@hostname:56903] 14/07/21 22:38:43 INFO spark.SparkEnv: Registering MapOutputTracker 14/07/21 22:38:43 INFO spark.SparkEnv: Registering BlockManagerMaster 14/07/21 22:38:43 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140721223843-75cd 14/07/21 22:38:43 INFO storage.MemoryStore: MemoryStore started with capacity 180.0 MB. 
14/07/21 22:38:43 INFO network.ConnectionManager: Bound socket to port 57453 with id = ConnectionManagerId(hostname,57453) 14/07/21 22:38:43 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/07/21 22:38:43 INFO storage.BlockManagerInfo: Registering block manager hostname:57453 with 180.0 MB RAM 14/07/21 22:38:43 INFO storage.BlockManagerMaster: Registered BlockManager 14/07/21 22:38:43 INFO spark.HttpServer: Starting HTTP Server 14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/07/21 22:38:43 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:19323 14/07/21 22:38:43 INFO broadcast.HttpBroadcast: Broadcast server started at http://9.123.99.10:19323 14/07/21 22:38:43 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-e224a31b-4517-43d8-9778-4b6af07dcad2 14/07/21 22:38:43 INFO spark.HttpServer: Starting HTTP Server 14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/07/21 22:38:43 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:35420 14/07/21 22:38:43 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/07/21 22:38:43 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/07/21 22:38:43 INFO ui.SparkUI: Started SparkUI at http://hostname:4040 14/07/21 22:38:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/21 22:38:44 WARN spark.SparkContext: yarn-standalone is deprecated as of Spark 1.0. Use yarn-cluster instead. 14/07/21 22:38:44 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler 14/07/21 22:38:44 INFO yarn.ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@610c610c Thanks! Sam Liu
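Putting those suggestions together, the launch command would look something like this (a sketch, reusing the values from the original command; spark-submit expects options before the application jar and application arguments after it):

./bin/spark-submit \
  --class scala.spark.WordCount \
  --master yarn-cluster \
  --num-executors 1 \
  --driver-memory 300M \
  --executor-memory 300M \
  --executor-cores 1 \
  tests/wordcount-spark-scala.jar \
  hdfs://hostname/tmp/input hdfs://hostname/tmp/output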
Re: relationship of RDD[Array[String]] to Array[Array[String]]
Thanks Michael, That is one solution that I had thought of. It seems like a bit of overkill for the few methods I want to do this for - but I will think about it. I guess I was hoping that I was missing something more obvious/easier. Philip On 07/21/2014 11:20 AM, Michael Malak wrote: It's really more of a Scala question than a Spark question, but the standard OO (not Scala-specific) way is to create your own custom supertype (e.g. MyCollectionTrait), inherited/implemented by two concrete classes (e.g. MyRDD and MyArray), each of which manually forwards method calls to the corresponding pre-existing library implementations. Writing all those forwarding method calls is tedious, but Scala provides at least one bit of syntactic sugar, which alleviates having to type in twice the parameter lists for each method: http://stackoverflow.com/questions/8230831/is-method-parameter-forwarding-possible-in-scala I'm not seeing a way to utilize implicit conversions in this case. Since Scala is statically (albeit inferred) typed, I don't see a way around having a common supertype. On Monday, July 21, 2014 11:01 AM, Philip Ogren philip.og...@oracle.com wrote: It is really nice that Spark RDD's provide functions that are often equivalent to functions found in Scala collections. For example, I can call: myArray.map(myFx) and equivalently myRdd.map(myFx) Awesome! My question is this. Is it possible to write code that works on either an RDD or a local collection without having to have parallel implementations? I can't tell that RDD or Array share any supertypes or traits by looking at the respective scaladocs. Perhaps implicit conversions could be used here. What I would like to do is have a single function whose body is like this: myData.map(myFx) where myData could be an RDD[Array[String]] (for example) or an Array[Array[String]]. Has anyone had success doing this? Thanks, Philip
Re: LiveListenerBus throws exception and weird web UI bug
Hi all, This error happens because we receive a completed event for a particular stage that we don't know about, i.e. a stage we haven't received a submitted event for. The root cause of this, as Baoxu explained, is usually because the event queue is full and the listener begins to drop events. In this case we are dropping the submitted event. This particular exception should be fixed in the latest master, as we now check for whether the key exists before indexing directly into it. Unfortunately, this is not in Spark 1.0.1, but will be fixed in Spark 1.1. There is currently no bullet-proof workaround for this issue, but you might try to reduce the number of concurrently running tasks (partitions) to avoid emitting too many events. The root cause of the listener queue taking too much time to process events is recorded in SPARK-2316, which we also intend to fix by Spark 1.1. Andrew

2014-07-21 10:23 GMT-07:00 mrm ma...@skimlinks.com: I have the same error! Did you manage to fix it?
RDD pipe partitionwise
Dear all, Is there any example of mapPartitions that forks an external process, or of how to make RDD.pipe work on all the data of a partition? Cheers, Jaonary
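For RDD.pipe, a minimal sketch: the command is forked once per partition, each element is written to the process's stdin as one line, and each line of the process's stdout becomes one element of the result.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("PipeExample").setMaster("local[4]"))
val rdd = sc.parallelize(1 to 100, 4)
// "wc -l" runs once per partition, counting the lines fed to its stdin,
// so this prints one count (25) per partition
rdd.pipe("wc -l").collect().foreach(println)

For the mapPartitions route, you would fork the process yourself inside the partition function (e.g. with java.lang.ProcessBuilder or scala.sys.process) and stream the partition's iterator through it; pipe is essentially that pattern packaged up.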
Re: gain access to persisted rdd
Hi Maria, If you don't have a reference to a persisted RDD, it will be automatically unpersisted on the next GC by the ContextCleaner. This is implemented for Scala, but should still work in Python, because Python uses reference counting to clean up objects that are no longer strongly referenced. Andrew

2014-07-21 8:37 GMT-07:00 mrm ma...@skimlinks.com: Hi, I am using pyspark and have persisted a list of RDDs within a function, but I don't have a reference to them anymore. The RDDs are listed in the UI, under the Storage tab, and they have names associated with them (e.g. 4). Is it possible to access the RDDs to unpersist them? Thanks!
Re: Give more Java Heap Memory on Standalone mode
Hi Nick and Abel, Looks like you are requesting 8g for your executors, but only allowing 2g on the workers. You should set SPARK_WORKER_MEMORY to at least 8g if you intend to use that much memory in your application. Also, you shouldn't have to set SPARK_DAEMON_JAVA_OPTS; you can just set spark.executor.memory as you have done in your SparkConf. As you may have already noticed, SPARK_MEM is deprecated in favor of spark.executor.memory and spark.driver.memory. If you are running Spark 1.0+, you can use spark-submit with --executor-memory and --driver-memory to set this on the command line. Andrew

2014-07-21 10:01 GMT-07:00 Nick R. Katsipoulakis kat...@cs.pitt.edu: Thank you Abel, it seems that your advice worked. Even though I receive a message that this is a deprecated way of defining Spark memory (the system prompts that I should set spark.driver.memory), the memory is increased. Again, thank you, Nick

On Mon, Jul 21, 2014 at 9:42 AM, Abel Coronado Iruegas acoronadoirue...@gmail.com wrote: Hi Nick, Maybe if you use: export SPARK_MEM=4g

On Mon, Jul 21, 2014 at 11:35 AM, Nick R. Katsipoulakis kat...@cs.pitt.edu wrote: Hello, Currently I work on a project in which I spawn a standalone Apache Spark MLlib job, in standalone mode, from a running Java process. In the code of the Spark job I have the following:

SparkConf sparkConf = new SparkConf().setAppName("SparkParallelLoad");
sparkConf.set("spark.executor.memory", "8g");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
...

Also, in my ~/spark/conf/spark-env.sh I have the following values:

SPARK_WORKER_CORES=1
export SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_MEMORY=2g
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.spark.executor.memory=4g"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.spark.executor.memory=4g"

During runtime I receive a Java OutOfMemory exception and a core dump. My dataset is less than 1 GB and I want to make sure that I cache it all in memory for my ML task. Am I increasing the JVM heap memory correctly? Am I doing something wrong? Thank you, Nick
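For example, something like (values and names illustrative):

./bin/spark-submit --class YourApp --driver-memory 2g --executor-memory 8g your-app.jar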
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
Hi, unfortunately it is not so straightforward. xxx_parquet.db is a folder of a managed database created by hive/impala, so every sub-element in it is a table in hive/impala. They are folders in HDFS, each table has a different schema, and in its folder there are one or more parquet files. That means xx001_suffix and xx002_suffix are folders, and there are parquet files in them like:

xx001_suffix/parquet_file1_with_schema1
xx002_suffix/parquet_file1_with_schema2
xx002_suffix/parquet_file2_with_schema2

It seems only union can do this job~ Nonetheless, thank you very much. Maybe the only reason is that Spark is eating up too much memory...
Re: Caching issue with msg: RDD block could not be dropped from memory as it does not exist
Hi Rindra, Depending on what you're doing with your groupBy, you may end up inflating your data quite a bit. Even if your machine has 16 GB, by default spark-shell only uses 512 MB, and the amount used for storing blocks is only 60% of that (spark.storage.memoryFraction), so this space becomes ~300 MB. This is still many multiples of the size of your dataset, but not orders of magnitude larger. If you are running Spark 1.0+, you can increase the amount of memory used by spark-shell by adding --driver-memory 1g as a command line argument in local mode, or --executor-memory 1g in any other mode. (Also, it seems that you set your log level to WARN. The cause is most probably that the cache is not big enough, but setting the log level to INFO will provide you with more information on the exact sizes that are being used by the storage and the blocks.) Andrew 2014-07-19 13:01 GMT-07:00 rindra rindra@gmail.com: Hi, I am working with a small dataset, about 13 MB, in the spark-shell. After doing a groupBy on the RDD, I wanted to cache the RDD in memory, but I keep getting these warnings: scala> rdd.cache() res28: rdd.type = MappedRDD[63] at repartition at <console>:28 scala> rdd.count() 14/07/19 12:45:18 WARN BlockManager: Block rdd_63_82 could not be dropped from memory as it does not exist 14/07/19 12:45:18 WARN BlockManager: Putting block rdd_63_82 failed 14/07/19 12:45:18 WARN BlockManager: Block rdd_63_40 could not be dropped from memory as it does not exist 14/07/19 12:45:18 WARN BlockManager: Putting block rdd_63_40 failed res29: Long = 5 It seems that I could not cache the data in memory even though my local machine has 16 GB RAM and the data is only 13 MB with 100 partitions. How can I prevent this caching issue from happening? Thanks. Rindra -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Caching-issue-with-msg-RDD-block-could-not-be-dropped-from-memory-as-it-does-not-exist-tp10248.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
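A rough back-of-the-envelope check of the sizes mentioned above (the constants are the Spark 1.0-era defaults, not measured values):

    // Default spark-shell heap and the fraction reserved for cached blocks:
    val driverHeapBytes = 512L * 1024 * 1024   // 512 MB default heap
    val memoryFraction  = 0.6                  // spark.storage.memoryFraction
    val storageBytes    = (driverHeapBytes * memoryFraction).toLong
    println(storageBytes / (1024 * 1024))      // ~307 MB for block storage

A groupBy can inflate a 13 MB input well past its on-disk size once the values become deserialized JVM objects, which is why raising --driver-memory helps.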
Re: gain access to persisted rdd
But at least if users want to access the persisted RDDs, they can use sc.getPersistentRDDs in the same context. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/gain-access-to-persisted-rdd-tp10313p10337.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
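A minimal sketch of that approach, assuming a live SparkContext sc (Scala API; getPersistentRDDs is not exposed in PySpark as of 1.0):

    // getPersistentRDDs returns Map[Int, RDD[_]] keyed by RDD id, so the
    // RDDs can be unpersisted without holding a direct reference to them.
    sc.getPersistentRDDs.foreach { case (id, rdd) =>
      println(s"Unpersisting RDD $id (name: ${rdd.name})")
      rdd.unpersist(blocking = false)
    }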
Re: spark streaming rate limiting from kafka
Hi Tathagata, I am currently creating multiple DStreams to consume from different topics. How can I let each consumer consume from different partitions? I found the following method in the Spark API:

    createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
        jssc: JavaStreamingContext,
        keyTypeClass: Class[K], valueTypeClass: Class[V],
        keyDecoderClass: Class[U], valueDecoderClass: Class[T],
        kafkaParams: Map[String, String],
        topics: Map[String, Integer],
        storageLevel: StorageLevel): JavaPairReceiverInputDStream[K, V]

Create an input stream that pulls messages from a Kafka broker. The topics parameter is: "Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread." Does numPartitions mean the total number of partitions to consume from topic_name, or the index of the partition? How can we specify for each createStream call which partition of the Kafka topic to consume? If so, I think I will get a lot of parallelism from the source of the data. Thanks! Bill On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das tathagata.das1...@gmail.com wrote: You can create multiple kafka streams to partition your topics across them, which will run multiple receivers on multiple executors. This is covered in the Spark streaming guide. http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving And for the purpose of this thread, to answer the original question, we now have the ability https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC to limit the receiving rate. It's in the master branch, and will be available in Spark 1.1. It basically sets a limit at the receiver level (so it applies to all sources) on the max records per second that will be received by the receiver. TD On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, are you saying that after repartition(400), you have 400 partitions on one host and the other hosts receive none of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there is always a single executor working on this job. Even if I use repartition, it seems that there is still a single executor. Does anyone have an idea how to add parallelism to this job? On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far the Kafka DStream can read on a topic-partition (via offset, for example)? By setting this to a small number, it would force the DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
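A minimal sketch of the receiver-parallelism approach TD describes, with hypothetical ZooKeeper quorum, group id, and topic name (the per-topic number is a consumer thread count, not a partition index):

    import org.apache.spark.streaming.kafka.KafkaUtils

    // Each createStream call is a separate receiver; Spark spreads the
    // receivers over the executors, and Kafka's consumer-group rebalancing
    // assigns topic partitions across them.
    val numStreams = 4
    val kafkaStreams = (1 to numStreams).map { _ =>
      KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("mytopic" -> 1))
    }
    val unified = ssc.union(kafkaStreams)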
Re: can't get jobs to run on cluster (enough memory and cpus are available on worker)
I got this working by having our sysadmin update our security group to allow incoming traffic from the local subnet on ports 1-65535. I'm not sure if there's a more specific range I could have used, but so far, everything is running! Thanks for all the responses Marcelo and Andrew!! Matt On Thu, Jul 17, 2014 at 9:10 PM, Andrew Or and...@databricks.com wrote: Hi Matt, The security group shouldn't be an issue; the ports listed in `spark_ec2.py` are only for communication with the outside world. How did you launch your application? I notice you did not launch your driver from your Master node. What happens if you do? Another thing is that there seems to be some inconsistency or missing pieces in the logs you posted. After an executor says the driver is disassociated, what happens in the driver logs? Is an exception thrown or something? It would be useful if you could also post your conf/spark-env.sh. Andrew 2014-07-17 14:11 GMT-07:00 Marcelo Vanzin van...@cloudera.com: Hi Matt, I'm not very familiar with setup on ec2; the closest I can point you at is to look at launch_cluster in ec2/spark_ec2.py, where the ports seem to be configured. On Thu, Jul 17, 2014 at 1:29 PM, Matt Work Coarr mattcoarr.w...@gmail.com wrote: Thanks Marcelo! This is a huge help!! Looking at the executor logs (in a vanilla spark install, I'm finding them in $SPARK_HOME/work/*)... It launches the executor, but it looks like the CoarseGrainedExecutorBackend is having trouble talking to the driver (exactly what you said!!!). Do you know the range of random ports used for executor-to-driver communication? Is that range adjustable? Any config setting or environment variable? I manually set up my ec2 security group to include all the ports that the spark ec2 script ($SPARK_HOME/ec2/spark_ec2.py) sets up in its security groups. They included (for those listed above 1): 1 50060 50070 50075 60060 60070 60075 Obviously I'll need to make some adjustments to my EC2 security group! Just need to figure out exactly what should be in there. To keep things simple, I just have one security group for the master, slaves, and the driver machine. In listing the port ranges in my current security group, I looked at the ports that spark_ec2.py sets up as well as the ports listed in the spark standalone mode documentation page under configuring ports for network security: http://spark.apache.org/docs/latest/spark-standalone.html Here are the relevant fragments from the executor log: Spark Executor Command: /cask/jdk/bin/java -cp ::/cask/spark/conf:/cask/spark/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/cask/spark/lib/datanucleus-api-jdo-3.2.1.jar:/cask/spark/lib/datanucleus-rdbms-3.2.1.jar:/cask/spark/lib/datanucleus-core-3.2.2.jar -XX:MaxPermSize=128m -Dspark.akka.frameSize=100 -Dspark.akka.frameSize=100 -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787/user/CoarseGrainedScheduler 0 ip-10-202-8-45.ec2.internal 8 akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker app-20140717195146- ... 14/07/17 19:51:47 DEBUG NativeCodeLoader: Trying to load the custom-built native-hadoop library... 14/07/17 19:51:47 DEBUG NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path 14/07/17 19:51:47 DEBUG NativeCodeLoader: java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib 14/07/17 19:51:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Falling back to shell based 14/07/17 19:51:47 DEBUG JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping 14/07/17 19:51:48 DEBUG Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=30 14/07/17 19:51:48 DEBUG SparkHadoopUtil: running as user: ec2-user ... 14/07/17 19:51:48 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@ip-10-202-11-191.ec2.internal :46787/user/CoarseGrainedScheduler 14/07/17 19:51:48 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker 14/07/17 19:51:49 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@ip-10-202-8-45.ec2.internal:7101/user/Worker 14/07/17 19:53:29 ERROR CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@ip-10-202-8-45.ec2.internal:55670] - [akka.tcp://spark@ip-10-202-11-191.ec2.internal:46787] disassociated! Shutting down. Thanks a bunch! Matt
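On the adjustable-port question above, a hedged sketch: the driver's listening port can be pinned via spark.driver.port so the security group only needs one known port open on the driver side (the port number is illustrative; in Spark 1.0 the executor side still picks random ports, so this narrows the range rather than eliminating it):

    import org.apache.spark.SparkConf

    // Executors connect back to the driver at spark.driver.port;
    // fixing it makes the security-group rule predictable.
    val conf = new SparkConf()
      .setAppName("MyApp") // illustrative
      .set("spark.driver.port", "46787")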
launching a spark cluster in ec2 from within an application
I would like to programmatically start a spark cluster in ec2 from another app running in ec2, run my job, and then destroy the cluster. I can launch a spark EMR cluster easily enough using the SDK; however, I ran into two problems: 1) I was only able to retrieve the address of the master node from the console, not via the SDK. 2) I was not able to connect to the master from my app after setting spark://public_dns:7077 as the master in the SparkConf (where public_dns is the address listed for the cluster on the EMR console page in amazon). I kept getting "all masters are unresponsive" errors. In addition, the amazon docs only speak of running spark jobs in EMR by ssh'ing to the master, launching a spark shell, and running the jobs from there. Is it even possible to do this programmatically from another app, or must you log into the master and run jobs from the shell if you want to use spark in amazon EMR? The second approach I tried was simply calling the spark-ec2 script from my app, passing the same parameters that I use to launch the cluster manually from the cli. This failed because the ec2.connect call returns None when called from my app (scala/java on play), whereas it works perfectly when called from the cli. Is there a recommended method to launch ec2 clusters dynamically from within an app running in ec2? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/launching-a-spark-cluster-in-ec2-from-within-an-application-tp10340.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: LabeledPoint with weight
This is a useful feature, but it may be hard to have it in v1.1 due to limited time. Hopefully, we can support it in v1.2. -Xiangrui On Mon, Jul 21, 2014 at 12:58 AM, Jiusheng Chen chenjiush...@gmail.com wrote: It seems MLlib right now doesn't support weighted training; training samples have equal importance. Weighted training can be very useful to reduce data size and speed up training. Do you have a plan to support it in the future? The data format would be something like: label:weight index1:value1 index2:value2 ... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/LabeledPoint-with-weight-tp10291.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
What's the exception you're seeing? Is it an OOM? On Mon, Jul 21, 2014 at 11:20 AM, chutium teng@gmail.com wrote: Hi, unfortunately it is not so straightforward. xxx_parquet.db is the folder of a managed database created by hive/impala, so every sub-element in it is a table in hive/impala. The tables are folders in HDFS, each table has a different schema, and each folder holds one or more parquet files. That means xx001_suffix and xx002_suffix are folders, and inside them there are parquet files like xx001_suffix/parquet_file1_with_schema1 xx002_suffix/parquet_file1_with_schema2 xx002_suffix/parquet_file2_with_schema2 It seems only union can do this job. Nonetheless, thank you very much; maybe the only reason is that spark is eating up too much memory... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10335.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark job tracker.
Also, I am facing one issue. If I run the program in yarn-cluster mode it works absolutely fine, but if I change it to yarn-client mode I get the error below. Application application_1405471266091_0055 failed 2 times due to AM Container for appattempt_1405471266091_0055_02 exited with exitCode: -1000 due to: File does not exist: /user/hadoop/.sparkStaging/application_1405471266091_0055/commons-math3-3.0.jar I have commons-math3-3.0.jar in the class path and I am uploading it to staging as well, per the logs: yarn.Client: Uploading file:***/commons-math3-3.0.jar to ***/user/hadoop/.sparkStaging/application_1405471266091_0055/commons-math3-3.0.jar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p10343.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark 1.0.1 SQL on 160 G parquet file (snappy compressed, made by cloudera impala), 23 core and 60G mem / node, yarn-client mode, always failed
no, something like this 14/07/20 00:19:29 ERROR cluster.YarnClientClusterScheduler: Lost executor 2 on 02.xxx: remote Akka client disassociated ... ... 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Lost TID 832 (task 1.2:186) 14/07/20 00:21:13 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840) at java.io.DataInputStream.readFully(DataInputStream.java:195) at java.io.DataInputStream.readFully(DataInputStream.java:169) at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:599) at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:360) at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:100) at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:172) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:130) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:122) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) ulimit is increased -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SQL-on-160-G-parquet-file-snappy-compressed-made-by-cloudera-impala-23-core-and-60G-mem-d-tp10254p10344.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Does spark streaming fit to our application
Hi, Our application is required to do some aggregations on data that will be coming in as a stream for over two months. I would like to know if spark streaming will be suitable for our requirement. After going through some documentation and videos, I think we can do aggregations on the data based on a window timeframe of a few minutes. I am not sure if we can cache that data, or whether we can store the data in hdfs for further calculations in spark streaming. Please help!! Thanks, -Srini. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-spark-streaming-fit-to-our-application-tp10345.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
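For the windowed-aggregation part of the question, a minimal sketch with illustrative durations, a hypothetical conf (a SparkConf) and a hypothetical events DStream (checkpointing is needed for long-running windowed state, and each window's results can be written to HDFS for later jobs):

    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("hdfs:///checkpoints/agg-app") // hypothetical path

    // Count events per key over a sliding 5-minute window, every minute.
    val counts = events.map(e => (e, 1L))
      .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(5), Minutes(1))

    // Persist each batch of window results to HDFS for further calculations.
    counts.saveAsTextFiles("hdfs:///aggregates/counts")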
broadcast variable get cleaned by ContextCleaner unexpectedly ?
Hi, all When I run some Spark application (actually the unit tests of the application in Jenkins), I found that I always hit a FileNotFoundException when reading a broadcast variable. The program itself works well, except in the unit test. Here is the example log: 14/07/21 19:49:13 INFO Executor: Running task ID 4 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 2) 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 2 in 95 ms on localhost (progress: 3/106) 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers 14/07/21 19:49:13 INFO Executor: Serialized size of result for 3 is 596 14/07/21 19:49:13 INFO Executor: Sending result for 3 directly to driver 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally 14/07/21 19:49:13 INFO Executor: Finished task ID 3 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL) 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:5 as 11885 bytes in 0 ms 14/07/21 19:49:13 INFO Executor: Running task ID 5 14/07/21 19:49:13 INFO BlockManager: Removing broadcast 0 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 3) 14/07/21 19:49:13 INFO ContextCleaner: Cleaned broadcast 0 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 3 in 97 ms on localhost (progress: 4/106) 14/07/21 19:49:13 INFO BlockManager: Found block broadcast_0 locally 14/07/21 19:49:13 INFO BlockManager: Removing block broadcast_0 14/07/21 19:49:13 INFO MemoryStore: Block broadcast_0 of size 202564 dropped from memory (free 886623436) 14/07/21 19:49:13 INFO ContextCleaner: Cleaned shuffle 0 14/07/21 19:49:13 INFO ShuffleBlockManager: Deleted all files for shuffle 0 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:25+5 14/07/21 19:49:13 INFO HadoopRDD: Input split: hdfs://172.31.34.184:9000/etltest/hdfsData/customer.csv:20+5 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers 14/07/21 19:49:13 INFO Executor: Serialized size of result for 4 is 596 14/07/21 19:49:13 INFO Executor: Sending result for 4 directly to driver 14/07/21 19:49:13 INFO Executor: Finished task ID 4 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL) 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:6 as 11885 bytes in 0 ms 14/07/21 19:49:13 INFO Executor: Running task ID 6 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 4) 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 4 in 80 ms on localhost (progress: 5/106) 14/07/21 19:49:13 INFO TableOutputFormat: Created table instance for hdfstest_customers 14/07/21 19:49:13 INFO Executor: Serialized size of result for 5 is 596 14/07/21 19:49:13 INFO Executor: Sending result for 5 directly to driver 14/07/21 19:49:13 INFO Executor: Finished task ID 5 14/07/21 19:49:13 INFO TaskSetManager: Starting task 0.0:7 as TID 7 on executor localhost: localhost (PROCESS_LOCAL) 14/07/21 19:49:13 INFO TaskSetManager: Serialized task 0.0:7 as 11885 bytes in 0 ms 14/07/21 19:49:13 INFO Executor: Running task ID 7 14/07/21 19:49:13 INFO DAGScheduler: Completed ResultTask(0, 5) 14/07/21 19:49:13 INFO TaskSetManager: Finished TID 5 in 77 ms on localhost (progress: 6/106) 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0 14/07/21 19:49:13 INFO HttpBroadcast: Started reading broadcast variable 0 14/07/21 19:49:13 ERROR Executor: Exception in task ID 6 
java.io.FileNotFoundException: http://172.31.34.174:52070/broadcast_0 at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1624) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:196) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89) at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at
Re: spark sql left join gives KryoException: Buffer overflow
When SPARK-2211 is done, will Spark SQL automatically choose join algorithms? Is there some way to manually hint the optimizer? Ideally we will select the best algorithm for you. We are also considering ways to allow the user to hint.
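Until then, a hedged workaround sketch for the buffer overflow itself: enlarging Kryo's serialization buffer (the size is illustrative; in Spark 1.0 the default for spark.kryoserializer.buffer.mb is 2):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", "64") // MB; raise until the overflow stops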
Re: Spark Streaming timing considerations
You will have to use some function that converts between the dstreamTime (ms since epoch, the same format as returned by System.currentTimeMillis) and your application-level time. TD On Mon, Jul 21, 2014 at 9:47 AM, Sean Owen so...@cloudera.com wrote: Uh, right. I mean: 1405944367 = Mon, 21 Jul 2014 12:06:07 GMT On Mon, Jul 21, 2014 at 5:47 PM, Sean Owen so...@cloudera.com wrote: That is just standard Unix time. 1405944367000 = Sun, 09 Aug 46522 05:56:40 GMT On Mon, Jul 21, 2014 at 5:43 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi TD, Thanks for the help. The only problem left is that the dstreamTime contains some extra information, which seems to be a date, i.e. 1405944367000 ms, whereas my application timestamps are just in seconds, which I converted to ms, e.g. 2300, 2400, 2500 etc. So the filter doesn't take effect. I was thinking of adding that extra info to my Time(4000). But I am not really sure what it is?
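A minimal sketch of such a conversion, with a hypothetical application start time and record-timestamp accessor (neither is from the original thread):

    val appStartMillis = 1405944367000L // hypothetical: epoch ms at which app time 0 occurred

    val filtered = stream.transform { (rdd, batchTime) =>
      // batchTime.milliseconds is epoch ms; shift it into application time.
      val appTimeMs = batchTime.milliseconds - appStartMillis
      rdd.filter(record => recordTimestampMs(record) <= appTimeMs) // recordTimestampMs: your own accessor
    }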
Re: Uber jar with SBT
Just to confirm, are you interested in submitting the spark job inside the cluster of the spark standalone mode (that is, one of the workers will be running the driver)? spark-submit does not support that fully yet. You can probably use the instructions present in Spark 0.9.1 to do that. Regarding spark-submit's behavior, that is the expected behavior: spark-submit waits for the driver program to terminate. TD On Sat, Jul 19, 2014 at 7:34 AM, boci boci.b...@gmail.com wrote: Hi! I'm using java7. I found the problem: I was not calling start and awaitTermination on the streaming context. Now it works, BUT spark-submit never returns (it runs in the foreground and receives the kafka streams)... what did I miss? (I want to send the job to a standalone cluster worker process) b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Sat, Jul 19, 2014 at 3:32 PM, Sean Owen so...@cloudera.com wrote: Are you building / running with Java 6? I imagine your .jar file has more than 65536 files, and Java 6 has various issues with jars this large. If possible, use Java 7 everywhere. https://issues.apache.org/jira/browse/SPARK-1520 On Sat, Jul 19, 2014 at 2:30 PM, boci boci.b...@gmail.com wrote: Hi Guys, I'm trying to create a spark uber jar with sbt but I have a lot of problems... I want to use the following: - Spark streaming - Kafka - Elasticsearch - HBase The current jar size is roughly 60 MB and it's not working. - When I deploy with spark-submit: it runs and exits without any error - When I try to start it in local[*] mode, it says: Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/Logging = but I start with java -cp /.../spark-assembly-1.0.1-hadoop2.2.0.jar -jar my.jar Any idea how to solve this? (Which libs need to be set provided, and which are required at run time? Later I want to run this jar on a yarn cluster.) b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com
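On the provided question, a hedged build.sbt sketch for an sbt-assembly uber jar, assuming Spark 1.0.x (artifact versions are illustrative): Spark itself is marked "provided" because spark-submit supplies it at runtime, while connector and client libraries must be bundled.

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"            % "1.0.1" % "provided",
      "org.apache.spark" %% "spark-streaming"       % "1.0.1" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1",  // bundle into the jar
      "org.elasticsearch" % "elasticsearch"         % "1.1.1"   // version illustrative
    )

Note that a jar built this way will throw NoClassDefFoundError for Spark classes (e.g. org/apache/spark/Logging) when run with plain java -jar, since the provided classes are absent; launch it through spark-submit or put the Spark assembly on the classpath.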
Re: Is there anyone who use streaming join to filter spam as guide mentioned?
Could you share your code snippet so that we can take a look? TD On Mon, Jul 21, 2014 at 7:23 AM, hawkwang wanghao.b...@gmail.com wrote: Hello guys, I'm just trying to use spark streaming features. I noticed that there is a join example for filtering spam, so I just wanted to try it. But nothing happens after the join; the output JavaPairDStream content is the same as before. So, are there any examples that I can refer to? Thanks for any suggestions. Regards, Hawk
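While waiting for the snippet, a minimal sketch of the guide's spam-filtering join, with hypothetical names (the keys of the stream and of the static RDD must match, and the result must be consumed by an output operation before anything visible happens):

    // Static spam information, keyed the same way as the stream below.
    val spamInfoRDD = ssc.sparkContext.textFile("hdfs:///data/spam-keys") // hypothetical path
      .map(key => (key, true))

    // Join each batch against the static RDD and drop spam records.
    val cleaned = tweets.map(t => (t, 1))
      .transform(rdd => rdd.join(spamInfoRDD))
      .filter { case (_, (_, isSpam)) => !isSpam }

    cleaned.print() // without an output operation, nothing is computed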
Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?
Hi, TD, Thanks for the reply. I tried to reproduce this in a simpler program, but no luck. However, the program has been very simple: just load some files from HDFS and write them to HBase... --- It seems that the issue only appears when I run the unit test in Jenkins (it does not fail every time; usually it succeeds in about 1 of 10 runs). I once suspected that it's related to some concurrency issue, but even when I disable parallel test execution in build.sbt, the problem is still there. --- Best, -- Nan Zhu On Monday, July 21, 2014 at 5:40 PM, Tathagata Das wrote: The ContextCleaner cleans up data and metadata related to RDDs and broadcast variables, only when those variables are not in scope and get garbage-collected by the JVM. So the broadcast variable in question is probably somehow going out of scope even before the job using the broadcast variable is in progress. Could you reproduce this behavior reliably in a simple code snippet that you can share with us? TD On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, all When I run some Spark application (actually the unit tests of the application in Jenkins), I found that I always hit a FileNotFoundException when reading a broadcast variable. The program itself works well, except in the unit test.
Spark Partitioner vs Cassandra Partitioner
Hi, I am new to Spark, have used hadoop for some time, and just joined the mailing list. I am considering using spark in my application, reading data from Cassandra in Python and writing mapped data back to Cassandra or to ES after it. The first question I have is: Is it possible to use https://github.com/datastax/spark-cassandra-connector with pyspark? I noticed there is an example of a cassandra input format in the master branch, but I guess it will only work in the latest release. The second question is about how Spark does M/R over NoSQL tools like Cassandra. If I understood it correctly, by using the spark cassandra connector an RDD is provided and I can read data from Cassandra and use Spark to M/R it. However, when I do that, I still need HDFS to store intermediate results. Correct me if I am wrong, but MAP results are stored in the local filesystem, then a partitioner is used to shuffle data to Spark nodes, and then data is reduced. I would like to understand why it should work that way when using a tool like Cassandra, for example. Cassandra has partitioners itself, so I could just write the MAP output (using batch inserts) to an intermediate column family and, after the map phase is complete, reduce the data. No need for shuffling, as Cassandra does that very well. Do you agree with my understanding? I wonder if I can do that using Spark, if this could be a good feature in the future, or if you have good reasons to think it would not perform well or something like that. Thanks in advance, I look forward to answers. Best regards, Marcelo Valle.
Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?
Hi, TD, I think I got more insight into the problem. In the Jenkins test file, I mistakenly passed a wrong value to spark.cores.max, which is much larger than the expected value (I passed the master address as local[6], and spark.cores.max as 200). If I set a more consistent value, everything goes well. But I would not expect it to cause this problem even if spark.cores.max is too large? Best, -- Nan Zhu On Monday, July 21, 2014 at 6:11 PM, Nan Zhu wrote: Hi, TD, Thanks for the reply. I tried to reproduce this in a simpler program, but no luck. However, the program has been very simple: just load some files from HDFS and write them to HBase... It seems that the issue only appears when I run the unit test in Jenkins (it does not fail every time; usually it succeeds in about 1 of 10 runs). I once suspected that it's related to some concurrency issue, but even when I disable parallel test execution in build.sbt, the problem is still there. Best, -- Nan Zhu On Monday, July 21, 2014 at 5:40 PM, Tathagata Das wrote: The ContextCleaner cleans up data and metadata related to RDDs and broadcast variables, only when those variables are not in scope and get garbage-collected by the JVM. So the broadcast variable in question is probably somehow going out of scope even before the job using the broadcast variable is in progress. Could you reproduce this behavior reliably in a simple code snippet that you can share with us? TD On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, all When I run some Spark application (actually the unit tests of the application in Jenkins), I found that I always hit a FileNotFoundException when reading a broadcast variable. The program itself works well, except in the unit test.
Re: Launching with m3.2xlarge instances: /mnt and /mnt2 mounted on 7gb drive
Actually the script in the master branch is also broken (it's pointing to an older AMI). Try 1.0.1 for launching clusters. On Jul 20, 2014, at 2:25 PM, Chris DuBois chris.dub...@gmail.com wrote: I pulled the latest last night. I'm on commit 4da01e3. On Sun, Jul 20, 2014 at 2:08 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Is this with the 1.0.0 scripts? I believe it's fixed in 1.0.1. Matei On Jul 20, 2014, at 1:22 AM, Chris DuBois chris.dub...@gmail.com wrote: Using the spark-ec2 script with m3.2xlarge instances seems to not have /mnt and /mnt2 pointing to the 80gb SSDs that come with that instance. Does anybody know whether extra steps are required when using this instance type? Thanks, Chris
Re: DROP IF EXISTS still throws exception about table does not exist?
Ah, I see, thanks, Yin -- Nan Zhu On Monday, July 21, 2014 at 5:00 PM, Yin Huai wrote: Hi Nan, It is basically a log entry because your table does not exist. It is not a real exception. Thanks, Yin On Mon, Jul 21, 2014 at 7:10 AM, Nan Zhu zhunanmcg...@gmail.com wrote: a related JIRA: https://issues.apache.org/jira/browse/SPARK-2605 -- Nan Zhu On Monday, July 21, 2014 at 10:10 AM, Nan Zhu wrote: Hi, all, When I try hiveContext.hql("drop table if exists abc") where abc is a non-existent table, I still receive an exception about the non-existent table, even though "if exists" is there. The same statement runs well in the hive shell. Some feedback from the Hive community is here: https://issues.apache.org/jira/browse/HIVE-7458 "You are doing hiveContext.hql("DROP TABLE IF EXISTS hivetesting") in the Scala shell of the Spark project. What is this shell doing? It queries the remote metastore on a non-existing table (see your provided stack). The remote metastore throws NoSuchObjectException(message:default.hivetesting table not found) because Spark code calls HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:854) on a non-existing table. It's the right behavior. You should check in the Spark code why a query is done on a non-existing table. I think Spark does not handle the IF EXISTS part of this query well. Maybe you could file a ticket on Spark JIRA. BUT, it's not a bug in HIVE IMHO." My question is: the DDL is executed by Hive itself, isn't it? Best, -- Nan Zhu
unable to create rdd with pyspark newAPIHadoopRDD
Hello, I have only just started playing around with spark to see if it fits my needs. I was trying to read some data from elasticsearch as an rdd, so that I could perform some python-based analytics on it. I am unable to create the rdd object as of now, failing with a serialization error. Working off the spark repo at commit abeacffb7bcdfa3eeb1e969aa546029a7b464eaa in master. Steps I am doing, as mentioned in the patch https://github.com/apache/spark/pull/455:

    IPYTHON=1 SPARK_CLASSPATH=/Users/umeshdangat/Downloads/elasticsearch-hadoop-2.0.0/dist/elasticsearch-hadoop-mr-2.0.0.jar ./bin/pyspark

    from pyspark import SparkContext
    sc = SparkContext('local[2]')
    conf = {'es.resource': 'twitter/tweet'}  # index/type
    rdd = sc.newAPIHadoopRDD('org.elasticsearch.hadoop.mr.EsInputFormat',
                             'org.apache.hadoop.io.NullWritable',
                             'org.elasticsearch.hadoop.mr.LinkedMapWritable',
                             conf=conf)

Stack Trace: Py4JJavaError Traceback (most recent call last) <ipython-input-4-ee964756398b> in <module>() ----> 1 rdd = sc.newAPIHadoopRDD('org.elasticsearch.hadoop.mr.EsInputFormat', 'org.apache.hadoop.io.NullWritable', 'org.elasticsearch.hadoop.mr.LinkedMapWritable', conf=conf) /Users/umeshdangat/Documents/spark/python/pyspark/context.pyc in newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf) 426 jconf = self._dictToJavaMap(conf) 427 jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass, --> 428 valueClass, keyConverter, valueConverter, jconf) 429 return RDD(jrdd, self, PickleSerializer()) 430 /Users/umeshdangat/Documents/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 535 answer = self.gateway_client.send_command(command) 536 return_value = get_return_value(answer, self.gateway_client, --> 537 self.target_id, self.name) 538 539 for temp_arg in temp_args: /Users/umeshdangat/Documents/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. --> 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 1.0 (TID 2) had a not serializable result: scala.collection.convert.Wrappers$MapWrapper at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1045) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1029) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1027) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1027) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:632) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:632) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:632) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1230) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-create-rdd-with-pyspark-newAPIHadoopRDD-tp10358.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?
That is definitely weird. spark.cores.max should not affect things when running in local mode. And I am trying to think of scenarios that could cause a broadcast variable used in the current job to fall out of scope, but they all seem very far-fetched. So I am really curious to see the code where this could be happening. Either way, you could turn off the behavior by setting spark.cleaner.referenceTracking=false TD On Mon, Jul 21, 2014 at 3:52 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, TD, I think I got more insight into the problem. In the Jenkins test file, I mistakenly passed a wrong value to spark.cores.max, which is much larger than the expected value (I passed the master address as local[6], and spark.cores.max as 200). If I set a more consistent value, everything goes well. But I would not expect it to cause this problem even if spark.cores.max is too large? Best, -- Nan Zhu On Monday, July 21, 2014 at 6:11 PM, Nan Zhu wrote: Hi, TD, Thanks for the reply. I tried to reproduce this in a simpler program, but no luck. However, the program has been very simple: just load some files from HDFS and write them to HBase... --- It seems that the issue only appears when I run the unit test in Jenkins (it does not fail every time; usually it succeeds in about 1 of 10 runs). I once suspected that it's related to some concurrency issue, but even when I disable parallel test execution in build.sbt, the problem is still there. --- Best, -- Nan Zhu On Monday, July 21, 2014 at 5:40 PM, Tathagata Das wrote: The ContextCleaner cleans up data and metadata related to RDDs and broadcast variables, only when those variables are not in scope and get garbage-collected by the JVM. So the broadcast variable in question is probably somehow going out of scope even before the job using the broadcast variable is in progress. Could you reproduce this behavior reliably in a simple code snippet that you can share with us? 
TD On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, all When I run some Spark application (actually the unit tests of the application in Jenkins), I found that I always hit a FileNotFoundException when reading a broadcast variable. The program itself works well, except in the unit test.
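A minimal sketch of the switch TD mentions, useful for isolating whether the ContextCleaner is the culprit (master and app name are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // With reference tracking off, broadcasts and RDDs are no longer
    // cleaned when garbage-collected; they live until the context stops.
    val conf = new SparkConf()
      .setMaster("local[6]")
      .setAppName("jenkins-test")
      .set("spark.cleaner.referenceTracking", "false")
    val sc = new SparkContext(conf)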
RE: error from DecisonTree Training:
So is this a bug unsolved (for java) yet? From: Jack Yang [mailto:j...@uow.edu.au] Sent: Friday, 18 July 2014 4:52 PM To: user@spark.apache.org Subject: error from DecisonTree Training: Hi All, I got an error while using DecisionTreeModel (my program is written in Java, spark 1.0.1, scala 2.10.1). I have read a local file, loaded it as an RDD, and then sent it to decisionTree for training. See below for details: JavaRDD<LabeledPoint> Points = lines.map(new ParsePoint()).cache(); LogisticRegressionModel model = LogisticRegressionWithSGD.train(Points.rdd(), iterations, stepSize); // until here it is working Strategy strategy = new Strategy( ... ); DecisionTree decisionTree = new DecisionTree(strategy); DecisionTreeModel decisionTreeModel = decisionTree.train(Points.rdd()); The error is: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.mllib.regression.LabeledPoint; Any thoughts? Best regards, Jack
Re: Launching with m3.2xlarge instances: /mnt and /mnt2 mounted on 7gb drive
You can also try a different region. I tested us-west-2 yesterday, and it worked well. -Xiangrui On Sun, Jul 20, 2014 at 4:35 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Actually the script in the master branch is also broken (it's pointing to an older AMI). Try 1.0.1 for launching clusters. On Jul 20, 2014, at 2:25 PM, Chris DuBois chris.dub...@gmail.com wrote: I pulled the latest last night. I'm on commit 4da01e3. On Sun, Jul 20, 2014 at 2:08 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Is this with the 1.0.0 scripts? I believe it's fixed in 1.0.1. Matei On Jul 20, 2014, at 1:22 AM, Chris DuBois chris.dub...@gmail.com wrote: Using the spark-ec2 script with m3.2xlarge instances seems to not have /mnt and /mnt2 pointing to the 80gb SSDs that come with that instance. Does anybody know whether extra steps are required when using this instance type? Thanks, Chris
Re: error from DecisonTree Training:
This is a known issue: https://issues.apache.org/jira/browse/SPARK-2197 . Joseph is working on it. -Xiangrui On Mon, Jul 21, 2014 at 4:20 PM, Jack Yang j...@uow.edu.au wrote: So is this a bug unsolved (for java) yet? From: Jack Yang [mailto:j...@uow.edu.au] Sent: Friday, 18 July 2014 4:52 PM To: user@spark.apache.org Subject: error from DecisonTree Training: Hi All, I got an error while using DecisionTreeModel (my program is written in Java, spark 1.0.1, scala 2.10.1). I have read a local file, loaded it as an RDD, and then sent it to decisionTree for training. See below for details: JavaRDD<LabeledPoint> Points = lines.map(new ParsePoint()).cache(); LogisticRegressionModel model = LogisticRegressionWithSGD.train(Points.rdd(), iterations, stepSize); // until here it is working Strategy strategy = new Strategy( ... ); DecisionTree decisionTree = new DecisionTree(strategy); DecisionTreeModel decisionTreeModel = decisionTree.train(Points.rdd()); The error is: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.mllib.regression.LabeledPoint; Any thoughts? Best regards, Jack
Re: broadcast variable get cleaned by ContextCleaner unexpectedly ?
Ah, sorry, sorry, my brain just crashed... I sent some wrong information: it is not "spark.cores.max" but the minPartitions in sc.textFile(). Best, -- Nan Zhu On Monday, July 21, 2014 at 7:17 PM, Tathagata Das wrote: That is definitely weird. spark.cores.max should not affect things when running in local mode. And I am trying to think of scenarios that could cause a broadcast variable used in the current job to fall out of scope, but they all seem very far-fetched. So I am really curious to see the code where this could be happening. Either way, you could turn off the behavior by setting spark.cleaner.referenceTracking=false TD On Mon, Jul 21, 2014 at 3:52 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, TD, I think I got more insight into the problem. In the Jenkins test file, I mistakenly passed a wrong value to spark.cores.max, which is much larger than the expected value (I passed the master address as local[6], and spark.cores.max as 200). If I set a more consistent value, everything goes well. But I would not expect it to cause this problem even if spark.cores.max is too large? Best, -- Nan Zhu On Monday, July 21, 2014 at 6:11 PM, Nan Zhu wrote: Hi, TD, Thanks for the reply. I tried to reproduce this in a simpler program, but no luck. However, the program has been very simple: just load some files from HDFS and write them to HBase... --- It seems that the issue only appears when I run the unit test in Jenkins (it does not fail every time; usually it succeeds in about 1 of 10 runs). I once suspected that it's related to some concurrency issue, but even when I disable parallel test execution in build.sbt, the problem is still there. --- Best, -- Nan Zhu On Monday, July 21, 2014 at 5:40 PM, Tathagata Das wrote: The ContextCleaner cleans up data and metadata related to RDDs and broadcast variables, only when those variables are not in scope and get garbage-collected by the JVM. So the broadcast variable in question is probably somehow going out of scope even before the job using the broadcast variable is in progress. Could you reproduce this behavior reliably in a simple code snippet that you can share with us? 
TD On Mon, Jul 21, 2014 at 2:29 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Hi, all When I run some Spark application (actually the unit tests of the application in Jenkins), I found that I always hit a FileNotFoundException when reading a broadcast variable. The program itself works well, except in the unit test.
Re: How to map each line to (line number, line)?
I'm not sure if you guys ever picked a preferred method for doing this, but I just encountered it and came up with this method, which is working reasonably well on a small dataset. It should be quite easy to generalize to non-String RDDs.

def addRowNumber(r: RDD[String]): RDD[(Long, String)] = {
  val sc = r.sparkContext
  // One pass to collect the number of rows in each partition.
  val partitionSizes = r.mapPartitionsWithIndex(
    (index, rows) => Iterator((index, rows.size))).collect
  // Prefix-sum the sizes to get each partition's global start index.
  val partitionGlobalStartIndex = partitionSizes.sortBy(_._1).map(_._2).scanLeft(0)(_ + _)
  val startIndexes = sc.broadcast(partitionGlobalStartIndex)
  r.mapPartitionsWithIndex { (partitionIndex, rows) =>
    val partitionStartIndex = startIndexes.value(partitionIndex)
    rows.zipWithIndex.map { case (row, rowIndex) =>
      (partitionStartIndex + rowIndex.toLong, row)
    }
  }
}

On Wed, Jan 1, 2014 at 4:05 AM, Guillaume Pitel guillaume.pi...@exensa.com wrote: I'm not very comfortable with the idea of generating an RDD from the range (it might take a lot of memory), dispatching it to the nodes, then zipping. You should try and compare the two approaches and give us the performance comparison. Guillaume Why not use a zipped RDD? http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.ZippedRDD I do not know why no one else suggested this. Of course it has 3 extra passes (one to count the RDD, one to generate the range, one to zip). Apart from this performance problem, any other caveats? I have used something like this in the past:

val index = sc.parallelize(Range.Long(0, rdd.count, 1), rdd.partitions.size)
val rddWithIndex = rdd.zip(index)

If that doesn't work, then you could try zipPartitions as well, since it has slightly more relaxed constraints. -- Guillaume PITEL, Président +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53 eXenSa S.A.S. http://www.exensa.com/ 41, rue Périer - 92120 Montrouge - FRANCE Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
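For what it's worth, Spark 1.0 also ships RDD.zipWithIndex, which performs the same two-pass trick (count each partition, then offset within it) internally. A minimal sketch, assuming lines is an RDD[String]:

// zipWithIndex returns (element, index); swap each pair to get (line number, line).
val numbered: org.apache.spark.rdd.RDD[(Long, String)] =
  lines.zipWithIndex().map { case (line, idx) => (idx, line) }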
RE: error from DecisionTree Training:
That is nice. Thanks Xiangrui. -Original Message- From: Xiangrui Meng [mailto:men...@gmail.com] Sent: Tuesday, 22 July 2014 9:31 AM To: user@spark.apache.org Subject: Re: error from DecisionTree Training: This is a known issue: https://issues.apache.org/jira/browse/SPARK-2197 . Joseph is working on it. -Xiangrui On Mon, Jul 21, 2014 at 4:20 PM, Jack Yang j...@uow.edu.au wrote: So this is a bug, unsolved (for Java) yet? From: Jack Yang [mailto:j...@uow.edu.au] Sent: Friday, 18 July 2014 4:52 PM To: user@spark.apache.org Subject: error from DecisionTree Training: Hi All, I got an error while using DecisionTreeModel (my program is written in Java, Spark 1.0.1, Scala 2.10.1). I read a local file, loaded it as an RDD, and then sent it to DecisionTree for training. See below for details:

JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(), iterations, stepSize); // until here it is working
Strategy strategy = new Strategy(….);
DecisionTree decisionTree = new DecisionTree(strategy);
DecisionTreeModel decisionTreeModel = decisionTree.train(points.rdd());

The error is: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.spark.mllib.regression.LabeledPoint; Any thoughts? Best regards, Jack
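For context, the exception above is likely the Java-API side of SPARK-2197: an RDD built through the Java API carries an Object ClassTag, so DecisionTree's internal cast to Array[LabeledPoint] fails. A hedged sketch of the same call made from Scala, where the proper ClassTag is carried along and which works on Spark 1.0.x:

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.rdd.RDD

// Train a decision tree from a Scala-created RDD[LabeledPoint].
def trainModel(points: RDD[LabeledPoint], strategy: Strategy) =
  new DecisionTree(strategy).train(points)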
RE: LiveListenerBus throws exception and weird web UI bug
Hi all, Here is my fix: https://github.com/apache/spark/pull/1356. It is not pretty, but it works well. Any suggestions? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/LiveListenerBus-throws-exception-and-weird-web-UI-bug-tp8330p10324.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Joining by timestamp.
Hi, I have a peculiar problem: I have two data sets (large ones). Data set 1: ((timestamp),iterable[Any]) = { (2014-07-10T00:02:45.045+,ArrayBuffer((2014-07-10T00:02:45.045+,98.4859,22))) (2014-07-10T00:07:32.618+,ArrayBuffer((2014-07-10T00:07:32.618+,75.4737,22))) } Data set 2: ((timestamp),iterable[Any]) = { (2014-07-10T00:03:16.952+,ArrayBuffer((2014-07-10T00:03:16.952+,99.6148,23))) (2014-07-10T00:08:11.329+,ArrayBuffer((2014-07-10T00:08:11.329+,80.9017,23))) } I need to join them, but the catch is that the timestamps are not identical; they can differ by approximately +/- 4 minutes, and such records need to be joined. Any idea is very much appreciated. What I'm thinking right now: keep a file descriptor into the sorted data set 2, read the sorted records of data set 1, and for each record check for any record matching the criteria; if one matches, emit (record1, record2), otherwise continue reading record2 until it matches. I know this only works for very small files; that's the reason I need help. Thanks, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
I have the same problem. On Sat, Jul 19, 2014 at 12:31 AM, lihu lihu...@gmail.com wrote: Hi, everyone. I have the following piece of code. When I run it, I get the error below: it seems that the SparkContext is not serializable, but I do not use the SparkContext except for the broadcast. [In fact, this code is in MLlib; I just try to broadcast the centerArrays.] It succeeds at the reduceByKey operation but fails at the collect operation, which confuses me.

INFO DAGScheduler: Failed to run collect at KMeans.scala:235
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)

private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
  @transient val sc = data.sparkContext // I tried to add the @transient annotation here, but it doesn't work
  // Initialize each run's center to a random point
  val seed = new XORShiftRandom().nextInt()
  val sample = data.takeSample(true, runs, seed).toSeq
  val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
  // On each step, sample 2 * k points on average for each run with probability proportional
  // to their squared distance from that run's current centers
  for (step <- 0 until initializationSteps) {
    val centerArrays = sc.broadcast(centers.map(_.toArray))
    val sumCosts = data.flatMap { point =>
      for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays.value(r), point))
    }.reduceByKey(_ + _).collectAsMap() // can pass at this point
    val chosen = data.mapPartitionsWithIndex { (index, points) =>
      val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
      for {
        p <- points
        r <- 0 until runs
        if rand.nextDouble() < KMeans.pointCost(centerArrays.value(r), p) * 2 * k / sumCosts(r)
      } yield (r, p)
    }.collect() // failed at this point
    for ((r, p) <- chosen) {
      centers(r) += p
    }
  }
}
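Not the MLlib internals, but a general sketch of the usual cause and fix for this kind of error: a closure that reads a field really reads this.field, so it drags in the whole enclosing object, including any SparkContext stored there; copying the field to a local val breaks that capture (the class and field names below are made up):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class Trainer(val sc: SparkContext) extends Serializable {
  val k = 10

  def badRun(data: RDD[Double]): Double =
    // `k` is really `this.k`: the closure captures `this`, and serializing
    // `this` fails on the SparkContext field.
    data.map(_ * k).reduce(_ + _)

  def goodRun(data: RDD[Double]): Double = {
    val localK = k // local copy: the closure now captures only localK
    data.map(_ * localK).reduce(_ + _)
  }
}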
saveAsSequenceFile for DStream
First of all, I do not know Scala yet, but I am learning. I'm doing a proof of concept that streams content from a socket, counts the words, and writes the counts to a Tachyon disk. A different script will read the file stream and print out the results.

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
ssc.start()
ssc.awaitTermination()

I already did a proof of concept to write and read sequence files, but there doesn't seem to be a saveAsSequenceFiles() method on DStream. What is the best way to write out an RDD to a stream so that the timestamps are in the filenames and there is minimal overhead in reading the data back in as objects? See below. My simple successful proof was the following:

val rdd = sc.parallelize(Array(("a", 2), ("b", 3), ("c", 1)))
rdd.saveAsSequenceFile("tachyon://.../123.sf2")
val rdd2 = sc.sequenceFile[String, Int]("tachyon://.../123.sf2")

How can I do something similar with streaming? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
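There is indeed no saveAsSequenceFiles() on DStream, but foreachRDD hands over each batch RDD together with its batch Time, which can stamp the output path. A minimal sketch, assuming the wordCounts stream above (the Tachyon path is the same placeholder):

import org.apache.spark.SparkContext._ // implicits that enable saveAsSequenceFile

// Write each micro-batch as a sequence file named after its batch time.
wordCounts.foreachRDD { (rdd, time) =>
  rdd.saveAsSequenceFile("tachyon://localhost:19998/files/WordCounts-" + time.milliseconds)
}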
RE: Joining by timestamp.
This is a very interesting problem. Spark SQL supports non-equi joins, but they are very inefficient on large tables. One possible solution is to make both tables partitioned, with partition key (cast(ds as bigint) / 240). Then, for each partition in dataset1, you can write SQL like: select * from dataset1 inner join dataset2 on abs(cast(dataset1.ds as bigint) - cast(dataset2.ds as bigint)) < 240. Here dataset2 is always the 3 adjacent partitions with keys key - 1, key, and key + 1 (i.e. the slice of dataset2 whose maximum ds is 240 seconds greater, and whose minimum ds is 240 seconds less, than the partition of dataset1). After iterating over every partition in dataset1, you should have the full result. BTW, not sure if you really want streaming SQL: https://github.com/thunderain-project/StreamSQL -Original Message- From: durga [mailto:durgak...@gmail.com] Sent: Tuesday, July 22, 2014 8:41 AM To: u...@spark.incubator.apache.org Subject: Joining by timestamp. Hi, I have a peculiar problem: I have two data sets (large ones). Data set 1: ((timestamp),iterable[Any]) = { (2014-07-10T00:02:45.045+,ArrayBuffer((2014-07-10T00:02:45.045+,98.4859,22))) (2014-07-10T00:07:32.618+,ArrayBuffer((2014-07-10T00:07:32.618+,75.4737,22))) } Data set 2: ((timestamp),iterable[Any]) = { (2014-07-10T00:03:16.952+,ArrayBuffer((2014-07-10T00:03:16.952+,99.6148,23))) (2014-07-10T00:08:11.329+,ArrayBuffer((2014-07-10T00:08:11.329+,80.9017,23))) } I need to join them, but the catch is that the timestamps are not identical; they can differ by approximately +/- 4 minutes, and such records need to be joined. Any idea is very much appreciated. What I'm thinking right now: keep a file descriptor into the sorted data set 2, read the sorted records of data set 1, and for each record check for any record matching the criteria; if one matches, emit (record1, record2), otherwise continue reading record2 until it matches. I know this only works for very small files; that's the reason I need help. Thanks, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
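A sketch of the same bucketing idea with the plain Spark API, assuming the timestamps have been parsed to epoch milliseconds and the payload simplified to a String:

import org.apache.spark.SparkContext._ // enables join on pair RDDs
import org.apache.spark.rdd.RDD

val windowMs = 240L * 1000 // +/- 4 minutes

def bucket(ts: Long): Long = ts / windowMs

def approxJoin(ds1: RDD[(Long, String)],
               ds2: RDD[(Long, String)]): RDD[((Long, String), (Long, String))] = {
  val b1 = ds1.map { case (t, v) => (bucket(t), (t, v)) }
  // Replicate each ds2 record into its own bucket and both neighbours, so every
  // candidate pair meets under exactly one common key.
  val b2 = ds2.flatMap { case (t, v) =>
    val b = bucket(t)
    Seq(b - 1, b, b + 1).map(k => (k, (t, v)))
  }
  b1.join(b2).values.filter { case ((t1, _), (t2, _)) => math.abs(t1 - t2) <= windowMs }
}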
Re: spark streaming rate limiting from kafka
Bill, numPartitions means the number of Spark partitions that the data received from Kafka will be split into. It has nothing to do with Kafka partitions, as far as I know. If you create multiple Kafka consumers, it doesn't seem like you can specify which consumer will consume which Kafka partitions. Instead, Kafka (at least with the interface that is exposed by the Spark Streaming API) will do something called rebalance and assign Kafka partitions to consumers evenly; you can see this in the client logs. When using multiple Kafka consumers with auto.offset.reset = true, please expect to run into this one: https://issues.apache.org/jira/browse/SPARK-2383 Tobias On Tue, Jul 22, 2014 at 3:40 AM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I am currently creating multiple DStreams to consume from different topics. How can I let each consumer consume from different partitions? I find the following method in the Spark API:

createStream[K, V, U <: Decoder[_], T <: Decoder[_]](jssc: JavaStreamingContext, keyTypeClass: Class[K], valueTypeClass: Class[V], keyDecoderClass: Class[U], valueDecoderClass: Class[T], kafkaParams: Map[String, String], topics: Map[String, Integer], storageLevel: StorageLevel): JavaPairReceiverInputDStream[K, V]

Create an input stream that pulls messages from a Kafka broker. The topics parameter is: *Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.* Does numPartitions mean the total number of partitions to consume from topic_name, or the index of the partition? How can we specify, for each createStream call, which partition of the Kafka topic to consume? If so, I think I will get a lot of parallelism from the source of the data. Thanks! Bill On Thu, Jul 17, 2014 at 6:21 PM, Tathagata Das tathagata.das1...@gmail.com wrote: You can create multiple Kafka streams to partition your topics across them, which will run multiple receivers or multiple executors. This is covered in the Spark Streaming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving And for the purpose of this thread, to answer the original question: we now have the ability to limit the receiving rate (https://issues.apache.org/jira/browse/SPARK-1854?jql=project%20%3D%20SPARK%20AND%20resolution%20%3D%20Unresolved%20AND%20component%20%3D%20Streaming%20ORDER%20BY%20priority%20DESC). It's in the master branch, and will be available in Spark 1.1. It basically sets the limit at the receiver level (so it applies to all sources) on the max records per second that will be received by the receiver. TD On Thu, Jul 17, 2014 at 6:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Bill, are you saying that after repartition(400) you have 400 partitions on one host and the other hosts receive nothing of the data? Tobias On Fri, Jul 18, 2014 at 8:11 AM, Bill Jay bill.jaypeter...@gmail.com wrote: I also have an issue consuming from Kafka. When I consume from Kafka, there is always a single executor working on this job. Even if I use repartition, it seems that there is still a single executor. Does anyone have an idea how to add parallelism to this job?
On Thu, Jul 17, 2014 at 2:06 PM, Chen Song chen.song...@gmail.com wrote: Thanks Luis and Tobias. On Tue, Jul 1, 2014 at 11:39 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Wed, Jul 2, 2014 at 1:57 AM, Chen Song chen.song...@gmail.com wrote: * Is there a way to control how far the Kafka DStream can read on a topic partition (via an offset, for example)? By setting this to a small number, it would force the DStream to read less data initially. Please see the post at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccaph-c_m2ppurjx-n_tehh0bvqe_6la-rvgtrf1k-lwrmme+...@mail.gmail.com%3E Kafka's auto.offset.reset parameter may be what you are looking for. Tobias -- Chen Song
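For the record, the receiver-parallelism pattern from the guide section TD links to looks roughly like this (a sketch; the ZooKeeper quorum, group, and topic names are placeholders, and ssc is an existing StreamingContext):

import org.apache.spark.streaming.kafka.KafkaUtils

// Four receivers in the same consumer group, unioned into a single DStream.
val numReceivers = 4
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("my-topic" -> 1))
}
val unified = ssc.union(kafkaStreams)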
Understanding Spark
Hi, I'm new to the world of big data and I'm trying to understand the use cases of several projects; of course, one of them is Spark. I want to know the proper way of analysing data that resides on my MySQL server. Say I'm saving every page view of a user, with a timestamp, in a table called views (id, user_id, page, created_at), and assume there are millions of rows in this table. The Spark examples analyse text files, so in the case of data that resides in MySQL, what should my approach be? As for what to do with the data, think of generating page recommendations for similar users. Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-Spark-tp10373.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
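One common starting point for this is JdbcRDD, which partitions a bounded SQL query across the cluster. A minimal sketch against the views table from the question (the connection string, credentials, and the id upper bound are placeholders; the MySQL JDBC driver must be on the classpath):

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// The query must contain two '?' placeholders for the partition bounds on id.
val views = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://host/db", "user", "pass"),
  "SELECT user_id, page FROM views WHERE id >= ? AND id <= ?",
  1L, 10000000L, 10, // id bounds and number of partitions
  (rs: ResultSet) => (rs.getLong(1), rs.getString(2)))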
defaultMinPartitions in textFile
Hi, I started to use Spark on YARN recently and found a problem while tuning my program. When SparkContext is initialized as sc and is ready to read a text file from HDFS, the textFile(path, defaultMinPartitions) method is called. I traced the second parameter down through the Spark source code and finally found this in CoarseGrainedSchedulerBackend.scala: conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) I do not specify the property spark.default.parallelism anywhere, so getInt returns the larger of totalCoreCount and 2. When I submit the application with spark-submit and the parameters --num-executors 2 --executor-cores 6, I suppose totalCoreCount will be 2*6 = 12, so defaultMinPartitions should be 12. But when I print the value of defaultMinPartitions in my program, I still get 2. How does this happen, and where is my mistake?
RE: data locality
Sandy, I just tried the standalone cluster and haven't had a chance to try YARN yet. So if I understand correctly, there is *no* special handling of task assignment according to the HDFS blocks' locations when Spark is running as a *standalone* cluster. Please correct me if I'm wrong. Thank you for your patience! From: Sandy Ryza [mailto:sandy.r...@cloudera.com] Sent: 22 July 2014 9:47 To: user@spark.apache.org Subject: Re: data locality This currently only works for YARN. The standalone default is to place an executor on every node for every job. The total number of executors is specified by the user. -Sandy On Fri, Jul 18, 2014 at 2:00 AM, Haopu Wang hw...@qilinsoft.com wrote: Sandy, Do you mean the "preferred location" is working for standalone clusters also? Because I checked the code of SparkContext and see the comments below: // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It // contains a map from hostname to a list of input format splits on the host. private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map() BTW, even with the preferred hosts, how does Spark decide how many total executors to use for this application? Thanks again! From: Sandy Ryza [mailto:sandy.r...@cloudera.com] Sent: Friday, July 18, 2014 3:44 PM To: user@spark.apache.org Subject: Re: data locality Hi Haopu, Spark will ask HDFS for file block locations and try to assign tasks based on them. There is a snag, though. Spark schedules its tasks inside of executor processes that stick around for the lifetime of a Spark application. Spark requests executors before it runs any jobs, i.e. before it has any information about where the input data for the jobs is located. If the executors occupy significantly fewer nodes than exist in the cluster, it can be difficult for Spark to achieve data locality. The workaround for this is an API that allows passing in a set of preferred locations when instantiating a Spark context. This API is currently broken in Spark 1.0, and will likely be changed to something a little simpler in a future release.

val locData = InputFormatInfo.computePreferredLocations(Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))
val sc = new SparkContext(conf, locData)

-Sandy On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote: I have a standalone Spark cluster and an HDFS cluster which share some nodes. When reading an HDFS file, how does Spark assign tasks to nodes? Will it ask HDFS for the location of each file block in order to pick the right worker node? How about a Spark cluster on YARN? Thank you very much!
Re: Question about initial message in graphx
On Mon, Jul 21, 2014 at 8:05 PM, Bin WU bw...@connect.ust.hk wrote: I am not sure how to specify different initial values for each node in the graph. Moreover, I am wondering why the initial message is necessary. I think we could instead initialize the graph and then pass it into the Pregel interface? Indeed it's not necessary, and a future update to Pregel will probably remove it: https://github.com/apache/spark/pull/1217 But the initial message parameter doesn't prevent you from specifying different initial *values* for each vertex. Pregel respects the initial vertex values of the provided graph; the initial message just ensures that it will run vprog at least once per vertex. If you don't want an initial message, one option is to set aside a special message value for it and check for this in vprog. For example, if the message type is Int, you could change it to Option[Int] and use None as the initial message. Ankur http://www.ankurdave.com/
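A minimal sketch of that Option trick, on a made-up max-propagation example (the graph and the vertex program are purely illustrative):

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

def maxPropagation(sc: SparkContext): Graph[Int, Int] = {
  // Tiny example graph whose vertex values are the Ints we want to propagate.
  val g = Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (2L, 3L))), defaultValue = 0)
    .mapVertices((id, _) => id.toInt) // a different initial value per vertex
  Pregel(g, Option.empty[Int])(
    vprog = (id, attr, msg) => msg match {
      case None    => attr              // synthetic initial message: keep the vertex value
      case Some(m) => math.max(attr, m) // real message: fold it in
    },
    sendMsg = t =>
      if (t.srcAttr > t.dstAttr) Iterator((t.dstId, Some(t.srcAttr))) else Iterator.empty,
    mergeMsg = (a, b) => for (x <- a; y <- b) yield math.max(x, y))
}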
new error for me
Does anyone know what this error means:

14/07/21 23:07:22 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
14/07/21 23:07:22 INFO TaskSetManager: Starting task 3.0:0 as TID 1620 on executor 27: r104u05.oculus.local (PROCESS_LOCAL)
14/07/21 23:07:22 INFO TaskSetManager: Serialized task 3.0:0 as 8620 bytes in 1 ms
14/07/21 23:07:36 INFO BlockManagerInfo: Added taskresult_1620 in memory on r104u05.oculus.local:50795 (size: 64.9 MB, free: 18.3 GB)
14/07/21 23:07:36 INFO SendingConnection: Initiating connection to [r104u05.oculus.local/192.168.0.105:50795]
14/07/21 23:07:57 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@1d86a150
java.nio.channels.CancelledKeyException
at sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73)
at sun.nio.ch.SelectionKeyImpl.interestOps(SelectionKeyImpl.java:77)
at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:265)
at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:115)
14/07/21 23:07:57 WARN SendingConnection: Error finishing connection to r104u05.oculus.local/192.168.0.105:50795
java.net.ConnectException: Connection timed out
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
at org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318)
at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:202)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Notifying org.apache.spark.network.ConnectionManager$MessageStatus@13ad274d
14/07/21 23:07:57 INFO ConnectionManager: Handling connection error on connection to ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(r104u05.oculus.local,50795)
14/07/21 23:07:57 WARN TaskSetManager: Lost TID 1620 (task 3.0:0)
14/07/21 23:07:57 WARN TaskSetManager: Lost result for TID 1620 on host r104u05.oculus.local

I've never seen this one before, and now it's coming up consistently. Thanks, -Nathan
Re: defaultMinPartitions in textFile
Well, I think you missed this line of code in SparkContext.scala, lines 1242-1243 (master): /** Default min number of partitions for Hadoop RDDs when not given by user */ def defaultMinPartitions: Int = math.min(defaultParallelism, 2) So defaultMinPartitions will be 2 unless defaultParallelism is less than 2. -- Ye Xianjin Sent with Sparrow (http://www.sparrowmailapp.com/?sig) On Tuesday, July 22, 2014 at 10:18 AM, Wang, Jensen wrote: Hi, I started to use Spark on YARN recently and found a problem while tuning my program. When SparkContext is initialized as sc and is ready to read a text file from HDFS, the textFile(path, defaultMinPartitions) method is called. I traced the second parameter down through the Spark source code and finally found this in CoarseGrainedSchedulerBackend.scala: conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) I do not specify the property spark.default.parallelism anywhere, so getInt returns the larger of totalCoreCount and 2. When I submit the application with spark-submit and the parameters --num-executors 2 --executor-cores 6, I suppose totalCoreCount will be 2*6 = 12, so defaultMinPartitions should be 12. But when I print the value of defaultMinPartitions in my program, I still get 2. How does this happen, and where is my mistake?
RE: defaultMinPartitions in textFile
Yes, great. I thought it was math.max instead of math.min on that line. Thank you! From: Ye Xianjin [mailto:advance...@gmail.com] Sent: Tuesday, July 22, 2014 11:37 AM To: user@spark.apache.org Subject: Re: defaultMinPartitions in textFile Well, I think you missed this line of code in SparkContext.scala, lines 1242-1243 (master): /** Default min number of partitions for Hadoop RDDs when not given by user */ def defaultMinPartitions: Int = math.min(defaultParallelism, 2) So defaultMinPartitions will be 2 unless defaultParallelism is less than 2. -- Ye Xianjin Sent with Sparrow (http://www.sparrowmailapp.com/?sig) On Tuesday, July 22, 2014 at 10:18 AM, Wang, Jensen wrote: Hi, I started to use Spark on YARN recently and found a problem while tuning my program. When SparkContext is initialized as sc and is ready to read a text file from HDFS, the textFile(path, defaultMinPartitions) method is called. I traced the second parameter down through the Spark source code and finally found this in CoarseGrainedSchedulerBackend.scala: conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) I do not specify the property spark.default.parallelism anywhere, so getInt returns the larger of totalCoreCount and 2. When I submit the application with spark-submit and the parameters --num-executors 2 --executor-cores 6, I suppose totalCoreCount will be 2*6 = 12, so defaultMinPartitions should be 12. But when I print the value of defaultMinPartitions in my program, I still get 2. How does this happen, and where is my mistake?
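And, as a one-line sketch of overriding that default: pass minPartitions to textFile explicitly (the path is a placeholder; 12 matches the 2 executors * 6 cores discussed above).

// Ask for at least 12 input splits instead of min(defaultParallelism, 2).
val lines = sc.textFile("hdfs:///path/to/input", minPartitions = 12)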
RE: Joining by timestamp.
Hi Chen, I am new to Spark as well as Spark SQL. Could you please explain how I would create such a table and run a query on top of it? That would be super helpful. Thanks, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367p10381.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Joining by timestamp.
Actually it's just a pseudo-algorithm I described; you can implement it with the Spark API. Hope the algorithm is helpful. -Original Message- From: durga [mailto:durgak...@gmail.com] Sent: Tuesday, July 22, 2014 11:56 AM To: u...@spark.incubator.apache.org Subject: RE: Joining by timestamp. Hi Chen, I am new to Spark as well as Spark SQL. Could you please explain how I would create such a table and run a query on top of it? That would be super helpful. Thanks, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367p10381.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Joining by timestamp.
Hi Chen, Thank you very much for your reply. I don't think I understand how I can do the join using the Spark API. If you have time, could you please write some code? Thanks again, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367p10383.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Joining by timestamp.
Durga, you can start with the documentation: http://spark.apache.org/docs/latest/quick-start.html http://spark.apache.org/docs/latest/programming-guide.html -Original Message- From: durga [mailto:durgak...@gmail.com] Sent: Tuesday, July 22, 2014 12:45 PM To: u...@spark.incubator.apache.org Subject: RE: Joining by timestamp. Hi Chen, Thank you very much for your reply. I don't think I understand how I can do the join using the Spark API. If you have time, could you please write some code? Thanks again, D. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-timestamp-tp10367p10383.html Sent from the Apache Spark User List mailing list archive at Nabble.com.