Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)
Thanks all... btw, s3n load works without any issues with spark-1.3.1-built-for-hadoop-2.4. I tried this on 1.3.1-hadoop26:

sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
val f = sc.textFile("s3n://bucket/file")
f.count

No, it can't find the implementation class. Looks like some jar is missing?

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

On Wednesday, April 22, 2015, Shuai Zheng szheng.c...@gmail.com wrote: Below is my code to access s3n without problem (only for 1.3.1; there is a bug in 1.3.0).

Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);

Regards, Shuai

From: Sujee Maniyam [mailto:su...@sujee.net] Sent: Wednesday, April 22, 2015 12:45 PM To: Spark User List Subject: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)

Hi all, I am unable to access s3n:// urls using sc.textFile()... getting a 'no file system for scheme s3n' error. A bug, or some conf settings missing? See below for details.

env variables: AWS_SECRET_ACCESS_KEY=set AWS_ACCESS_KEY_ID=set

spark/RELEASE: Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0. Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Pyarn -DzincPort=3034

./bin/spark-shell
val f = sc.textFile("s3n://bucket/file")
f.count

error:
java.io.IOException: No FileSystem for scheme: s3n
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC.<init>(<console>:39)
at $iwC.<init>(<console>:41)
at <init>(<console>:43)
at .<init>(<console>:47)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
Re: Instantiating/starting Spark jobs programmatically
Hi firemonk9, What you're doing looks interesting. Can you share some more details? Are you reusing the same Spark context for each job, or are you running a separate Spark context for each job? Does your system need sharing of RDDs across multiple jobs? If yes, how do you implement that? Also, what prompted you to run on YARN instead of standalone? Does this give some performance benefit? Have you evaluated YARN vs Mesos? Also, have you looked at spark-jobserver by Ooyala? It makes doing some of the stuff I mentioned easier. IIRC it also works with YARN; it definitely works with Mesos. Here's the link: https://github.com/spark-jobserver/spark-jobserver Thanks, Anshul On 23 Apr 2015 20:32, Dean Wampler deanwamp...@gmail.com wrote: I strongly recommend spawning a new process for the Spark jobs. Much cleaner separation. Your driver program won't be clobbered if the Spark job dies, etc. It can even watch for failures and restart. In the Scala standard library, the sys.process package has classes for constructing and interoperating with external processes. Perhaps Java has something similar these days? dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Tue, Apr 21, 2015 at 2:15 PM, Steve Loughran ste...@hortonworks.com wrote: On 21 Apr 2015, at 17:34, Richard Marscher rmarsc...@localytics.com wrote: - There are System.exit calls built into Spark as of now that could kill your running JVM. We have shadowed some of the most offensive bits within our own application to work around this. You'd likely want to do that or to do your own Spark fork. For example, if the SparkContext can't connect to your cluster master node when it is created, it will System.exit. People can block errant System.exit calls by running under a SecurityManager. Less than ideal (and there's a small performance hit), but possible.
RE: Map Question
You need to expose that variable the same way you'd expose any other variable in Python that you wanted to see across modules. As long as you share a Spark context, all will work as expected. http://stackoverflow.com/questions/142545/python-how-to-make-a-cross-module-variable Sent with Good (www.good.com) -Original Message- From: Vadim Bichutskiy [vadim.bichuts...@gmail.com] Sent: Thursday, April 23, 2015 12:00 PM Eastern Standard Time To: Tathagata Das Cc: user@spark.apache.org Subject: Re: Map Question
Spark + Hue
Hi all, Is there any good documentation on how to integrate Spark with Hue 3.7.x? Is installing the Spark Job Server the only way? Thanks in advance for your help.
Re: Trouble working with Spark-CSV package (error: object databricks is not a member of package com)
Hm, no, I don't have that in my path. However, someone on the spark-csv project advised that since I could not get another package/example to work, this might be a Spark/YARN issue: https://github.com/databricks/spark-csv/issues/54 Thoughts? I'll open a ticket later this afternoon if the discussion turns that way. Thank you, by the way, for the work on this project. Mo On Thu, Apr 23, 2015 at 5:17 AM, Krishna Sankar ksanka...@gmail.com wrote: Do you have commons-csv-1.1-bin.jar in your path somewhere? I had to download and add this. Cheers, k/ On Wed, Apr 22, 2015 at 11:01 AM, Mohammed Omer beancinemat...@gmail.com wrote: Afternoon all, I'm working with Scala 2.11.6 and Spark 1.3.1 built from source via: `mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package` The error is encountered when running the spark shell via: `spark-shell --packages com.databricks:spark-csv_2.11:1.0.3` The full trace of the commands can be found at https://gist.github.com/momer/9d1ca583f9978ec9739d Not sure if I've done something wrong, or if the documentation is outdated, or...? Would appreciate any input or push in the right direction! Thank you, Mo
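If the package does resolve, a quick smoke test from PySpark can confirm the data source is usable at all; a minimal sketch using the Spark 1.3-era load() API (the CSV file name is a placeholder):

# run with: spark-submit --packages com.databricks:spark-csv_2.11:1.0.3 csv_check.py
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="csv-check")
sqlContext = SQLContext(sc)

# load() with the external csv source; header option per the spark-csv docs
df = sqlContext.load(source="com.databricks.spark.csv", header="true", path="cars.csv")
df.printSchema()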
Re: A Spark Group by is running forever
I have seen multiple blogs stating to use reduceByKey instead of groupByKey. Could someone please help me convert the code below to use reduceByKey?

Code (some Spark processing happens before this):

val viEventsWithListingsJoinSpsLevelMetric: org.apache.spark.rdd.RDD[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord, com.ebay.ep.poc.spark.reporting.process.detail.viewitem.provider.VISummary, Long)]

val sellerSegments = viEventsWithListingsJoinSpsLevelMetric.groupBy {
  case (viDetail, vi, itemId) =>
    (viDetail.get(0), viDetail.get(1).asInstanceOf[Long], viDetail.get(2), viDetail.get(8).asInstanceOf[Int])
}

We groupBy on the above key so that we get an Iterable (list); with that list we can compute the max values for powerSellerLevel and sellerStdLevel.

val powerSellerLevel = sellerSegments.map {
  case (k, v) =>
    val viGrouped = v.toList
    val viPowerSellers = viGrouped.map { viTuple => Option(viTuple._2.powerSellerLevel).getOrElse("") }
    val viSellerStandardLevels = viGrouped.map { viTuple => Option(viTuple._2.sellerStdLevel).getOrElse("") }
    val powerSellerLevel = viPowerSellers.max
    val sellerStandardLevel = viSellerStandardLevels.max
    val viEventDetail = viGrouped.head._1
    val viSummary = viGrouped.head._2
    viSummary.powerSellerLevel = powerSellerLevel
    viSummary.sellerStdLevel = sellerStandardLevel
    viSummary.itemId = viGrouped.head._3
    (viEventDetail, viSummary)
}

The above groupBy query ran for 6 hours and does not seem to finish, hence I started thinking of reduceByKey. Now reduceByKey() needs pairs, so I modified viEventsWithListingsJoinSpsLevelMetric from (x, y, z) to (A, B): I moved the key generated through the groupBy query into the processing of viEventsWithListingsJoinSpsLevelMetric, so that it is of type (A, B). Hence it is modified as ((viEventDetail.get(0), viEventDetail.get(1).asInstanceOf[Long], viEventDetail.get(2), viEventDetail.get(8).asInstanceOf[Int]), (viEventDetail, viSummary, itemId)). Now I want to compute the max values, and I do the next processing using reduceByKey:

val powerSellerLevel = viEventsWithListingsJoinSpsLevelMetric.reduceByKey {
  case (k, v) =>
    val viGrouped = v.toList
    // Some code to compute max needs to go here.
}

But I get a compiler error that v.toList is not supported:

[ERROR] /Users/dvasthimal/ebay/projects/ep-spark/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/detail/viewitem/provider/VISummaryDataProvider.scala:115: error: value toList is not a member of (com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord, com.ebay.ep.poc.spark.reporting.process.detail.viewitem.provider.VISummary, Long)
[INFO] val viGrouped = v.toList
[INFO] ^
[ERROR] one error found

Now if you think about it, groupBy was generating (k, Iterable), and hence the next map() could take the list and run through it to compute the max. How is that possible with reduceByKey, which never produces an Iterable? Suggestions are appreciated. -Deepak

On Thu, Apr 23, 2015 at 1:23 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a groupBy query after a map-side join (leftOuterJoin), and this query has been running for more than 2 hours. From the task table (Index 0, Attempt 0, Status RUNNING, Locality Level PROCESS_LOCAL, Executor ID / Host 17 / phxaishdc9dn1560.stratus.phx.ebay.com, Launch Time 2015/04/22 23:27:00, Duration 1.4 h, GC Time 29 s, Shuffle Read Size / Records 61.8 MB / 63144909, Shuffle Write Size / Records 0.0 B / 0), the input looks to be only 60 MB.

Command:

./bin/spark-submit -v --master yarn-cluster --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar --num-executors 36 --driver-memory 12g --driver-java-options -XX:MaxPermSize=8G --executor-memory 12g --executor-cores 6 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar startDate=2015-04-6 endDate=2015-04-7 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem buffersize=128 maxbuffersize=1068 maxResultSize=2G

Queries:

1. val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }
2. Broadcast Map - Join:
val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
val broadCastMap = sc.broadcast(lstgItemMap)
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] =
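For what it's worth, the compile error comes from the shape of reduceByKey's function: it is (V, V) => V, so the `case (k, v)` pattern is actually destructuring the two values being merged, and each value is the (DetailInputRecord, VISummary, Long) tuple, which has no toList. The fix is to merge two values pairwise (taking the max of each field) rather than materializing a list. A runnable toy sketch of the pattern, in PySpark for brevity (the Scala version has the same shape; the data here is made up):

from pyspark import SparkContext

sc = SparkContext("local[2]", "reduce-by-key-max")

# Toy stand-ins: the key is the 4-field tuple the groupBy extracted; the value
# carries (detail, powerSellerLevel, sellerStdLevel, itemId).
data = [(("u1", 42), ("d1", "2", "B", 100)),
        (("u1", 42), ("d2", "5", "A", 101)),
        (("u2", 7), ("d3", "1", "C", 102))]

def merge(a, b):
    # reduceByKey's function receives two VALUES that share a key, never the
    # key itself and never an Iterable, so merge pairwise instead of toList.
    return (a[0], max(a[1], b[1]), max(a[2], b[2]), a[3])

print(sc.parallelize(data).reduceByKey(merge).collect())

Because the merge is associative, Spark can apply it map-side before the shuffle, which is exactly why this tends to be much cheaper than groupBy followed by max.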
dynamicAllocation spark-shell
If I enable dynamicAllocation and then use spark-shell or pyspark, things start out working as expected: running simple commands causes new executors to start and complete tasks. If the shell is left idle for a while, executors start getting killed off:

15/04/23 10:52:43 INFO cluster.YarnClientSchedulerBackend: Requesting to kill executor(s) 368
15/04/23 10:52:43 INFO spark.ExecutorAllocationManager: Removing executor 368 because it has been idle for 600 seconds (new desired total will be 665)

That makes sense. But the action also results in error messages:

15/04/23 10:52:47 ERROR cluster.YarnScheduler: Lost executor 368 on hostname: remote Akka client disassociated
15/04/23 10:52:47 INFO scheduler.DAGScheduler: Executor lost: 368 (epoch 0)
15/04/23 10:52:47 INFO spark.ExecutorAllocationManager: Existing executor 368 has been removed (new total is 665)
15/04/23 10:52:47 INFO storage.BlockManagerMasterActor: Trying to remove executor 368 from BlockManagerMaster.
15/04/23 10:52:47 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(368, hostname, 35877)
15/04/23 10:52:47 INFO storage.BlockManagerMaster: Removed 368 successfully in removeExecutor

After that, trying to run a simple command results in:

15/04/23 10:13:30 ERROR util.Utils: Uncaught exception in thread spark-dynamic-executor-allocation-0
java.lang.IllegalArgumentException: Attempted to request a negative number of executor(s) -663 from the cluster manager. Please specify a positive number!

And then only the single remaining executor attempts to complete the new tasks. Am I missing some kind of simple configuration item, are other people seeing the same behavior as a bug, or is this actually expected? Mike Stone
Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)
The NativeS3FileSystem class is in the hadoop-aws jar. Looks like it was not on the classpath. Cheers On Thu, Apr 23, 2015 at 7:30 AM, Sujee Maniyam su...@sujee.net wrote: Thanks all... btw, s3n load works without any issues with spark-1.3.1-built-for-hadoop-2.4. I tried this on 1.3.1-hadoop26 and got: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found
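A hedged sketch of one way to get the jar onto both the driver and executor classpaths without rebuilding Spark (the jar path is an assumption, and spark-shell/spark-submit's --jars flag is another route; for s3n the jar's jets3t dependency must also be resolvable):

from pyspark import SparkConf, SparkContext

# Point this at the hadoop-aws jar matching your Hadoop build.
jars = "/path/to/hadoop-aws-2.6.0.jar"
conf = (SparkConf()
        .set("spark.driver.extraClassPath", jars)
        .set("spark.executor.extraClassPath", jars))
sc = SparkContext(conf=conf)
print(sc.textFile("s3n://bucket/file").count())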
Re: Instantiating/starting Spark jobs programmatically
I strongly recommend spawning a new process for the Spark jobs. Much cleaner separation. Your driver program won't be clobbered if the Spark job dies, etc. It can even watch for failures and restart. In the Scala standard library, the sys.process package has classes for constructing and interoperating with external processes. Perhaps Java has something similar these days? dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
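For example, Scala's sys.process makes this a one-liner (import sys.process._ then "spark-submit ...".!); the same idea sketched in Python with subprocess (master, class name, and jar path are placeholders):

import subprocess

cmd = ["spark-submit", "--master", "yarn-cluster",
       "--class", "com.example.MyJob", "/path/to/job.jar"]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in proc.stdout:
    print(line.rstrip())            # watch the job's output for failures
if proc.wait() != 0:
    print("job failed; restart logic could go here")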
Tasks run only on one machine
Using Spark Streaming to create a large volume of small nano-batch input files, ~4k per file, thousands of 'part-x' files. When reading the nano-batch files and doing a distributed calculation, my tasks run only on the machine where the job was launched. I'm launching in "yarn-client" mode. The RDD is created using sc.textFile("list of thousand files"). What would cause the read to occur only on the machine that launched the driver? Do I need to do something to the RDD after reading? Has some partition factor been applied to all derived RDDs?
[Spark Streaming] Help with updateStateByKey()
Hi everybody, I think I could use some help with the updateStateByKey() Java method in Spark Streaming.

Context: I have a JavaReceiverInputDStream<DataUpdate> du DStream, where the DataUpdate object mainly has 2 fields of interest (in my case), namely du.personId (an Integer) and du.cell.hashCode() (an Integer, again). Obviously, I am processing several DataUpdate objects (coming from a log file read in microbatches), and every personId will be 'associated' with several du.cell.hashCode() values. What I need to do is, for every personId, statefully count how many times it appears with a particular du.cell.hashCode(), possibly partitioning by the personId key. (Long story short: an area is split into cells and I wonder how many times every person appears in every cell.) In a very naive way, I guess everything should look like a HashMap<personId, HashMap<cell.hashCode(), count>>, but I am not quite sure how to partition by personId and increase the count. It looks like the updateStateByKey() method should do the trick (I am new to Spark Streaming), yet I can't figure out in which way. Any suggestions? Feel free to ask anything in case I was unclear or more information is needed. :) Thank you!

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Help-with-updateStateByKey-tp22637.html
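A minimal sketch of the updateStateByKey() approach, written in PySpark for brevity (the Java overload takes a Function2 over the batch's new values and the previous state). Keying the stream by the (personId, cellHash) pair sidesteps the nested HashMap entirely; the socket source and field parsing here are placeholders:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "person-cell-counts")
ssc = StreamingContext(sc, 10)
ssc.checkpoint("/tmp/person-cell-ckpt")  # updateStateByKey requires checkpointing

# Assume each incoming record can be mapped to ((personId, cellHash), 1).
updates = ssc.socketTextStream("localhost", 9999) \
    .map(lambda line: tuple(int(x) for x in line.split(","))) \
    .map(lambda pc: (pc, 1))

def update(new_counts, running):
    # new_counts: the 1s seen this batch for one (personId, cellHash) key;
    # running: the count carried over from earlier batches (None at first).
    return (running or 0) + sum(new_counts)

counts = updates.updateStateByKey(update)
counts.pprint()
ssc.start()
ssc.awaitTermination()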
Re: Tasks run only on one machine
Sure.

var columns = mc.textFile(source).map { line => line.split(delimiter) }

Here "source" is a comma-delimited list of files or directories. Both the textFile and .map tasks happen only on the machine they were launched from. Later other distributed operations happen, but I suspect that if I can figure out why the first line runs only on the client machine, the rest will clear up too. Here are some subsequent lines:

if (filterColumn != -1) {
  columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
}
val interactions = columns.map { tokens => tokens(rowIDColumn) -> tokens(columnIDPosition) }
interactions.cache()

On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele gangele...@gmail.com wrote: Will you be able to paste code here?
Re: Tasks run only on one machine
Argh, I looked and there really isn't that much data yet. There will be thousands of files but we're starting small. I bet this is just a case of the total data size not requiring all workers; sorry, never mind. On Apr 23, 2015, at 10:30 AM, Pat Ferrel p...@occamsmachete.com wrote: They are in HDFS, so available on all workers.
Slower performance when bigger memory?
Hi All, I am running some benchmarks on an r3.8xlarge instance. I have a cluster with one master (no executor on it) and one slave (r3.8xlarge). My job has 1000 tasks in stage 0. An r3.8xlarge has 244G memory and 32 cores. If I create 4 executors, each with 8 cores + 50G memory, each task takes around 320s-380s. If I instead use one big executor with 32 cores and 200G memory, each task takes 760s-900s. And when I check the log, it looks like the minor GC takes much longer when using 200G memory: 285.242: [GC [PSYoungGen: 29027310K->8646087K(31119872K)] 38810417K->19703013K(135977472K), 11.2509770 secs] [Times: user=38.95 sys=120.65, real=11.25 secs] When it uses 50G memory, the minor GC takes less than 1s. I am trying to work out the best way to configure Spark. For some special reasons, I am tempted to use a bigger memory on a single executor if there is no significant performance penalty, but now it looks like there is? Anyone have any idea? Regards, Shuai
Re: Shuffle files not cleaned up (Spark 1.2.1)
Thanks for the response, Conor. I tried with those settings and for a while it seemed like it was cleaning up shuffle files after itself. However, exactly 5 hours later it started throwing exceptions and eventually stopped working again. A sample stack trace is below. What is curious about 5 hours is that I set the cleaner ttl to 5 hours after changing the max window size to 1 hour (down from 6 hours in order to test). It also stopped cleaning the shuffle files after this started happening. Any idea why this could be happening?

2015-04-22 17:39:52,040 ERROR Executor task launch worker-989 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147)
java.lang.Exception: Could not compute split, block input-0-1429706099000 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Thanks NB On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell conor.fenn...@altocloud.com wrote: Hi, We set the spark.cleaner.ttl to some reasonable time and also set spark.streaming.unpersist=true. Those together cleaned up the shuffle files for us. -Conor On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote: We already do have a cron job in place to clean just the shuffle files. However, what I would really like to know is whether there is a proper way of telling Spark to clean up these files once it's done with them? Thanks NB On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com wrote: Write a cron job for this like below 12 * * * * find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+ 32 * * * * find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+ 52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+ On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote: Hi all, I had posed this query as part of a different thread but did not get a response there. So creating a new thread hoping to catch someone's attention. We are experiencing this issue of shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours, otherwise it will eat up all our disk space. Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1.
https://github.com/apache/spark/pull/5074/files https://issues.apache.org/jira/browse/SPARK-5836 Also, for some reason, the following JIRAs that were reported as functional issues were closed as Duplicates of the above Documentation bug. Does this mean that this issue won't be tackled at all? https://issues.apache.org/jira/browse/SPARK-3563 https://issues.apache.org/jira/browse/SPARK-4796 https://issues.apache.org/jira/browse/SPARK-6011 Any further insight into whether this is being looked into and meanwhile how to handle shuffle files will be greatly appreciated. Thanks NB
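For reference, a sketch of how the settings Conor mentions would be applied on the SparkConf (the TTL value is an assumption; it must comfortably exceed the largest window, or blocks that are still needed get dropped, which is one plausible reading of the "Could not compute split, block ... not found" error above and of the follow-up question about scheduling delay later in this thread):

from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.cleaner.ttl", "43200")           # seconds; 12h here, illustrative
        .set("spark.streaming.unpersist", "true"))   # let Spark unpersist old RDDs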
Re: Bug? Can't reference to the column by name after join two DataFrame on a same name key
Hi Shuai, You can use "as" to create a table alias. For example, df1.as("df1"). Then you can use $"df1.col" to refer to it. Thanks, Yin On Thu, Apr 23, 2015 at 11:14 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I use 1.3.1. When I have two DFs and join them on a key with the same name, I can't get the common key by name afterwards. Basically: select * from t1 inner join t2 on t1.col1 = t2.col1. And I am using purely DataFrame and the Spark SQLContext, not HiveContext: DataFrame df3 = df1.join(df2, df1.col("col").equalTo(df2.col("col"))).select("col"); Because df1 and df2 join on the same key "col", I can't reference the key col afterwards. I understand I should use a fully qualified name for that column (like in SQL, use t1.col), but I don't know how I should address this in Spark SQL. Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id#8L, id#0L.; It looks like the joined key can't be referenced by name or by the df1.col(name) pattern. https://issues.apache.org/jira/browse/SPARK-5278 refers to a Hive case, so I am not sure whether it is the same issue, but I still have the issue in the latest code. It looks like the result after a join won't keep the parent DF information anywhere? I checked the ticket https://issues.apache.org/jira/browse/SPARK-6273 but am not sure whether it is the same issue? Should I open a new ticket for this? Regards, Shuai
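A minimal sketch of the alias approach Yin describes, in PySpark (table data and alias names are illustrative; the Scala as("df1") / $"df1.col" form works the same way):

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import col

sc = SparkContext("local[2]", "alias-join")
sqlContext = SQLContext(sc)

df1 = sqlContext.createDataFrame([Row(id=1, a="x"), Row(id=2, a="y")]).alias("t1")
df2 = sqlContext.createDataFrame([Row(id=1, b="z")]).alias("t2")

# After aliasing, the shared key can be referenced unambiguously by alias,
# just like t1.id / t2.id in SQL.
joined = df1.join(df2, col("t1.id") == col("t2.id"))
joined.select(col("t1.id"), col("a"), col("b")).show()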
Re: Tasks run only on one machine
Where are the file splits? Meaning, is it possible they were also (only) available on one node, and that was also your driver?
Re: Slower performance when bigger memory?
Shuai: Please take a look at: http://blog.takipi.com/garbage-collectors-serial-vs-parallel-vs-cms-vs-the-g1-and-whats-new-in-java-8/ On Apr 23, 2015, at 10:18 AM, Dean Wampler deanwamp...@gmail.com wrote: JVMs often have significant GC overhead with heaps bigger than 64GB. You might try your experiments with configurations below this threshold. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition (O'Reilly) Typesafe @deanwampler http://polyglotprogramming.com
Re: Tasks run only on one machine
Physically? Not sure; they were written using the nano-batch RDDs in a streaming job that is in a separate driver. The job is a Kafka consumer. Would that affect all derived RDDs? If so, is there something I can do to mix it up, or does Spark know best about execution speed here? On Apr 23, 2015, at 10:23 AM, Sean Owen so...@cloudera.com wrote: Where are the file splits? Meaning, is it possible they were also (only) available on one node, and that was also your driver?
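For what it's worth, had this turned out to be a partitioning problem, two common knobs are textFile's minPartitions argument and an explicit repartition; a small sketch (paths and counts are illustrative):

from pyspark import SparkContext

sc = SparkContext("local[4]", "partition-hint")
source = "/tmp/input"   # placeholder for the comma-separated file list
# Ask for more splits up front...
columns = sc.textFile(source, minPartitions=64).map(lambda l: l.split("\t"))
# ...or shuffle an existing RDD into more partitions after the fact:
columns = columns.repartition(64)
print(columns.getNumPartitions())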
Re: Map Question
Here it is. How do I access a broadcastVar in a function that's in another module (process_stuff.py below)? Thanks, Vadim

main.py
---
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from process_stuff import myfunc
from metadata import get_metadata

conf = SparkConf().setAppName('My App').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)

distFile = ssc.textFileStream('s3n://...')
distFile.foreachRDD(process)

mylist = get_metadata()
print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'
## mylist and broadcastVar, broadcastVar.value print fine

def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(rdd):
    sqlContext = getSqlContextInstance(rdd.context)
    if rdd.take(1):
        jsondf = sqlContext.jsonRDD(rdd)
        #jsondf.printSchema()
        jsondf.registerTempTable('mytable')
        stuff = sqlContext.sql('SELECT ...')
        stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?
        ...

process_stuff.py
--
def myfunc(x):
    metadata = broadcastVar.value  # NameError: broadcastVar not found -- HOW TO FIX?
    ...

metadata.py
def get_metadata():
    ...
    return mylist

On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das t...@databricks.com wrote: Can you give the full code? Especially the myfunc? On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Here's what I did: print 'BROADCASTING...' broadcastVar = sc.broadcast(mylist) print broadcastVar print broadcastVar.value print 'FINISHED BROADCASTING...' The above works fine, but when I call myrdd.map(myfunc) I get NameError: global name 'broadcastVar' is not defined. The myfunc function is in a different module. How do I make it aware of broadcastVar? On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Great. Will try to modify the code. Always room to optimize! On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com wrote: Absolutely. The same code would work for local as well as distributed mode! On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Can I use broadcast vars in local mode? On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote: Yep. Not efficient. Pretty bad actually. That's why broadcast variables were introduced right at the very beginning of Spark. On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks TD. I was looking into broadcast variables. Right now I am running it locally... and I plan to move it to production on EC2. The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc), but I don't think it's efficient? mylist is filled only once at the start and never changes. Vadim On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote: Is the mylist present on every executor? If not, then you have to pass it on. And broadcasts are the best way to pass them on. But note that once broadcasted it will be immutable at the executors, and if you update the list at the driver, you will have to broadcast it again. TD
Re: Map Question
Thanks Ilya. I am having trouble doing that. Can you give me an example? On Thu, Apr 23, 2015 at 12:06 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: You need to expose that variable the same way you'd expose any other variable in Python that you wanted to see across modules. As long as you share a Spark context, all will work as expected. http://stackoverflow.com/questions/142545/python-how-to-make-a-cross-module-variable
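A minimal sketch of one way to do it: pass the broadcast handle into the function explicitly, so process_stuff.py never needs a module-level global (both files are shown together; everything beyond the names from the thread is illustrative):

# process_stuff.py: take the broadcast variable as a parameter, not a global
def myfunc(x, bvar):
    metadata = bvar.value
    return (x, metadata)

# main.py
from pyspark import SparkContext
from process_stuff import myfunc

sc = SparkContext("local[2]", "broadcast-example")
broadcastVar = sc.broadcast(["some", "metadata"])
rdd = sc.parallelize([1, 2, 3])
# The lambda closes over broadcastVar, so Spark ships it with the task.
print(rdd.map(lambda x: myfunc(x, broadcastVar)).collect())

The cross-module-variable approach from the Stack Overflow link also works (put broadcastVar in a shared module, assign it from main.py, and `import shared` inside process_stuff.py), but passing it explicitly keeps the dependency visible.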
Re: Tasks run only on one machine
Will you be able to paste code here? On 23 April 2015 at 22:21, Pat Ferrel p...@occamsmachete.com wrote: Using Spark Streaming to create a large volume of small nano-batch input files, ~4k per file, thousands of 'part-x' files.
Re: dynamicAllocation spark-shell
Hi, "Attempted to request a negative number of executor(s) -663 from the cluster manager. Please specify a positive number!" This is a bug in dynamic allocation. Here is the JIRA: https://issues.apache.org/jira/browse/SPARK-6954 Thanks! Cheolsoo On Thu, Apr 23, 2015 at 7:57 AM, Michael Stone mst...@mathom.us wrote: If I enable dynamicAllocation and then use spark-shell or pyspark, things start out working as expected: running simple commands causes new executors to start and complete tasks.
Re: Shuffle files not cleaned up (Spark 1.2.1)
What was the state of your streaming application? Was it falling behind with a large increasing scheduling delay? TD On Thu, Apr 23, 2015 at 11:31 AM, N B nb.nos...@gmail.com wrote: Thanks for the response, Conor. I tried with those settings and for a while it seemed like it was cleaning up shuffle files after itself. However, after exactly 5 hours later it started throwing exceptions and eventually stopped working again. A sample stack trace is below. What is curious about 5 hours is that I set the cleaner ttl to 5 hours after changing the max window size to 1 hour (down from 6 hours in order to test). It also stopped cleaning the shuffle files after this started happening. Any idea why this could be happening? 2015-04-22 17:39:52,040 ERROR Executor task launch worker-989 Executor.logError - Exception in task 0.0 in stage 215425.0 (TID 425147) java.lang.Exception: Could not compute split, block input-0-1429706099000 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:198) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Thanks NB On Tue, Apr 21, 2015 at 5:14 AM, Conor Fennell conor.fenn...@altocloud.com wrote: Hi, We set the spark.cleaner.ttl to some reasonable time and also set spark.streaming.unpersist=true. Those together cleaned up the shuffle files for us. -Conor On Tue, Apr 21, 2015 at 8:18 AM, N B nb.nos...@gmail.com wrote: We already do have a cron job in place to clean just the shuffle files. However, what I would really like to know is whether there is a proper way of telling spark to clean up these files once its done with them? Thanks NB On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com wrote: Write a crone job for this like below 12 * * * * find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+ 32 * * * * find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+ 52 * * * * find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin +1440 -name spark-*-*-* -prune -exec rm -rf {} \+ On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote: Hi all, I had posed this query as part of a different thread but did not get a response there. So creating a new thread hoping to catch someone's attention. We are experiencing this issue of shuffle files being left behind and not being cleaned up by Spark. Since this is a Spark streaming application, it is expected to stay up indefinitely, so shuffle files not being cleaned up is a big problem right now. Our max window size is 6 hours, so we have set up a cron job to clean up shuffle files older than 12 hours otherwise it will eat up all our disk space. Please see the following. 
It seems the non-cleaning of shuffle files is being documented in 1.3.1. https://github.com/apache/spark/pull/5074/files https://issues.apache.org/jira/browse/SPARK-5836 Also, for some reason, the following JIRAs that were reported as functional issues were closed as Duplicates of the above Documentation bug. Does this mean that this issue won't be tackled at all? https://issues.apache.org/jira/browse/SPARK-3563 https://issues.apache.org/jira/browse/SPARK-4796 https://issues.apache.org/jira/browse/SPARK-6011 Any further insight into whether this is being looked into and meanwhile how to handle shuffle files will be greatly appreciated. Thanks NB
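For reference, a minimal sketch of the settings Conor describes, in a standard StreamingContext setup (app name, paths and values are illustrative, not from the thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: enable periodic metadata cleanup and unpersisting of old stream RDDs.
// The ttl must comfortably exceed the largest window, or still-needed blocks may
// be cleaned (a plausible cause of the "block input-... not found" errors above).
val conf = new SparkConf()
  .setAppName("streaming-cleanup-sketch")
  .set("spark.cleaner.ttl", "7200")          // seconds; keep it > max window size
  .set("spark.streaming.unpersist", "true")  // drop old stream RDDs once processed

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/checkpoint-sketch") // illustrative path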
Non-Deterministic Graph Building
Hi Everyone, I am running into a really weird problem that only one other person has reported to the best of my knowledge (and the thread never yielded a resolution). I build a GraphX Graph from an input EdgeRDD and VertexRDD via the Graph(VertexRDD,EdgeRDD) constructor. When I execute Graph.triplets on the Graph I get wildly varying results where the triplet source and destination vertex data are inconsistent between runs and rarely, if ever, match what I would expect from the input edge pairs that are used to generate the VertexRDD and EdgeRDDs. Here's what I know for sure: 1. Consistency of Input Edge Data--I read the edges in from HBase and generate a raw edge RDD containing tuples consisting of a source edge name and destination edge name. I've written this RDD out to HDFS over several runs and confirmed that generation of the raw edge RDD is deterministic. 2. Consistency of Edge and Vertex Count--the overall numbers of edges and vertices in the EdgeRDD and VertexRDD, respectively, are consistent between jobs. 3. Inconsistency of Triplet Data--the output from Graph.triplets varies between jobs, where the edge pairings are different. 4. Disconnect Between Input Edge Data and Triplets--the input edge data often does not match the corresponding triplet data for the same job, but in some cases will. Interestingly, while the actual edge pairings as seen in the input edge data RDD and the triplets often don't match, the total number of edges in the input edge RDD and triplets RDD for each edge name is the same. Based upon what I've seen, it seems as if the vertex ids are skewed somehow, especially given point (4) where I noted that the total number of appearances of an edge name is consistent between input edge RDD data and triplet RDD data for the same job but, again, the pairings with edges on the other end of the relationship can vary. I will post my code later tonight/tomorrow AM, but wanted to see if this problem description matches what anyone else has seen. Thanks --John -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Non-Deterministic-Graph-Building-tp22638.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
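For orientation, a hedged sketch of the construction path being described; the hashing-based vertex ids are an assumption, not John's actual code. If vertex ids are instead derived non-deterministically (e.g. via zipWithUniqueId on an RDD that gets re-evaluated), triplets can pair vertex data inconsistently between runs, which would match the symptoms above.

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Sketch: build a Graph from (srcName, dstName) pairs with a deterministic
// name -> id mapping, so the id/name association is stable across runs.
def buildGraph(sc: SparkContext, rawEdges: RDD[(String, String)]): Graph[String, Int] = {
  def id(name: String): VertexId = name.hashCode.toLong // stable; illustrative only
  val vertices: RDD[(VertexId, String)] =
    rawEdges.flatMap { case (s, d) => Seq((id(s), s), (id(d), d)) }.distinct()
  val edges: RDD[Edge[Int]] =
    rawEdges.map { case (s, d) => Edge(id(s), id(d), 1) }
  Graph(vertices, edges)
}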
Question regarding join with multiple columns with pyspark
Hi experts, Sorry if this is a n00b question or has already been answered... I am trying to use the data frames API in python to join 2 dataframes with more than 1 column. The example I've seen in the documentation only shows a single column - so I tried this:

Example code:

import pandas as pd
from pyspark.sql import SQLContext
hc = SQLContext(sc)
A = pd.DataFrame({'year': ['1993', '2005', '1994'], 'month': ['5', '12', '12'], 'value': [100, 200, 300]})
a = hc.createDataFrame(A)
B = pd.DataFrame({'year': ['1993', '1993'], 'month': ['12', '12'], 'value': [101, 102]})
b = hc.createDataFrame(B)
print "Pandas"  # try with Pandas
print A
print B
print pd.merge(A, B, on=['year', 'month'], how='inner')
print "Spark"
print a.toPandas()
print b.toPandas()
print a.join(b, a.year==b.year and a.month==b.month, 'inner').toPandas()

Output:

Pandas
  month  value  year
0     5    100  1993
1    12    200  2005
2    12    300  1994
  month  value  year
0    12    101  1993
1    12    102  1993
Empty DataFrame
Columns: [month, value_x, year, value_y]
Index: []
Spark
  month  value  year
0     5    100  1993
1    12    200  2005
2    12    300  1994
  month  value  year
0    12    101  1993
1    12    102  1993
  month  value  year  month  value  year
0    12    200  2005     12    102  1993
1    12    200  2005     12    101  1993
2    12    300  1994     12    102  1993
3    12    300  1994     12    101  1993

It looks like Spark returns some results where an inner join should return nothing. Am I doing the join with two columns in the wrong way? If yes, what is the right syntax for this? Thanks! Ali - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
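One likely culprit, for reference: Python's `and` does not combine Column expressions (it short-circuits on the first operand), so the condition likely degraded to just the month comparison, which matches the month-only pairings in the output above. In PySpark the usual form is (a.year == b.year) & (a.month == b.month). A sketch of the same two-column join in the Scala DataFrame API, assuming DataFrames a and b as above:

// Sketch: both predicates must be combined into one join Column with &&;
// otherwise only one comparison takes part in the join.
val joined = a.join(b, a("year") === b("year") && a("month") === b("month"), "inner")
joined.show()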
RE: Bug? Can't reference to the column by name after join two DataFrame on a same name key
Got it. Thanks! J

From: Yin Huai [mailto:yh...@databricks.com] Sent: Thursday, April 23, 2015 2:35 PM To: Shuai Zheng Cc: user Subject: Re: Bug? Can't reference to the column by name after join two DataFrame on a same name key

Hi Shuai, You can use "as" to create a table alias. For example, df1.as("df1"). Then you can use $"df1.col" to refer to it. Thanks, Yin

On Thu, Apr 23, 2015 at 11:14 AM, Shuai Zheng szheng.c...@gmail.com wrote: Hi All, I use 1.3.1. When I have two DFs and join them on a key with the same name, afterwards I can't get the common key by name. Basically: select * from t1 inner join t2 on t1.col1 = t2.col1. And I am using purely DataFrame, Spark SQLContext, not HiveContext. DataFrame df3 = df1.join(df2, df1.col("col").equalTo(df2.col("col"))).select("col"); Because df1 and df2 join on the same key "col", I then can't reference the key "col". I understand I should use a fully qualified name for that column (like in SQL, t1.col), but I don't know how to address this in Spark SQL. Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id#8L, id#0L.; It looks like the joined key can't be referenced by name or by the df1.col("name") pattern. https://issues.apache.org/jira/browse/SPARK-5278 refers to a Hive case, so I am not sure whether it is the same issue, but I still see the issue in the latest code. It looks like the result after a join doesn't keep the parent DF information anywhere? I checked the ticket https://issues.apache.org/jira/browse/SPARK-6273 but am not sure whether it is the same issue? Should I open a new ticket for this? Regards, Shuai
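A minimal sketch of Yin's suggestion in Scala (column name `col` as in the thread; `sqlContext` assumed in scope):

import sqlContext.implicits._

// Sketch: alias each side before the join, then qualify the ambiguous key.
val left  = df1.as("d1")
val right = df2.as("d2")
val joined = left.join(right, $"d1.col" === $"d2.col")
val keyOnly = joined.select($"d1.col") // unambiguous reference to the join key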
Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
Should I repost this to dev list ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Getting error running MLlib example with new cluster
I had asked this question before, but wanted to ask again as I think it is related to my pom file or project setup. I have been trying on/off for the past month to try to run this MLlib example: - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Pyspark where do third parties libraries need to be installed under Yarn-client mode
I am trying to figure out python library management. So my question is: Where do third party Python libraries (e.g. numpy, scipy, etc.) need to exist if I am running a spark job via 'spark-submit' against my cluster in 'yarn client' mode? Do the libraries need to only exist on the client (i.e. the server executing the driver code), or do the libraries need to exist on the datanode/worker nodes where the tasks are executed? The documentation seems to indicate that under 'yarn client' the libraries are only needed on the client machine, not the entire cluster. If the libraries are needed across all cluster machines, any suggestions on a deployment strategy or dependency management model that works well? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Pyspark-where-do-third-parties-libraries-need-to-be-installed-under-Yarn-client-mode-tp22639.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Getting error running MLlib example with new cluster
Sorry, accidentally sent the last email before finishing. I had asked this question before, but wanted to ask again as I think it is now related to my pom file or project setup. Really appreciate the help! I have been trying on/off for the past month to try to run this MLlib example: https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala I am able to build the project successfully. When I run it, it returns: features in spam: 8 features in ham: 7 and then freezes. According to the UI, the description of the job is count at DataValidators.scala.38. This corresponds to this line in the code: val model = lrLearner.run(trainingData) I've tried just about everything I can think of...changed numFeatures from 1 - 10,000, set executor memory to 1g, set up a new cluster. At this point I think I might have missed dependencies, as that has usually been the problem in other spark apps I have tried to run. This is my pom file, that I have used for other successful spark apps. Please let me know if you think I need any additional dependencies or there are incompatibility issues, or a pom.xml that is better to use. Thank you!

Cluster information:
Spark version: 1.2.0-SNAPSHOT (in my older cluster it is 1.2.0)
java version 1.7.0_25
Scala version: 2.10.4
hadoop version: hadoop 2.5.0-cdh5.3.3 (older cluster was 5.3.0)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <groupId>edu.berkely</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>
  <build>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
          </execution>
          <execution>
            <phase>process-resources</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.2.0-cdh5.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.5.0-mr1-cdh5.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.4</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>2.10.4</version>
    </dependency>
    <dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.3</version>
    </dependency>
    <dependency>
      <groupId>com.yammer.metrics</groupId>
      <artifactId>metrics-core</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop
gridsearch - python
Can anybody point me to an example, if available, of grid search with Python? Thank you,
Re: problem writing to s3
Hi Akhil I can confirm that the problem goes away when jsonRaw and jsonClean are in different s3 buckets. thanks Daniel

On Thu, Apr 23, 2015 at 1:27 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you try writing to a different S3 bucket and confirm that? Thanks Best Regards

On Thu, Apr 23, 2015 at 12:11 AM, Daniel Mahler dmah...@gmail.com wrote: Hi Akhil, It works fine when outprefix is a hdfs:///localhost/... url. It looks to me as if there is something about Spark writing to the same s3 bucket it is reading from. That is the only real difference between the 2 saveAsTextFile calls: when outprefix is on s3, inpath is also on s3 but in a different bucket, while jsonRaw and jsonClean are distinct directories in the same bucket. I do not know why that should be a problem though. I will rerun using s3 paths and send the log information. thanks Daniel

On Wed, Apr 22, 2015 at 1:45 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you look in your worker logs and see what's happening in there? Are you able to write the same to your HDFS? Thanks Best Regards

On Wed, Apr 22, 2015 at 4:45 AM, Daniel Mahler dmah...@gmail.com wrote: I am having a strange problem writing to s3 that I have distilled to this minimal example:

def jsonRaw = s"${outprefix}-json-raw"
def jsonClean = s"${outprefix}-json-clean"
val txt = sc.textFile(inpath) //.coalesce(shards, false)
txt.count
val res = txt.saveAsTextFile(jsonRaw)
val txt2 = sc.textFile(jsonRaw + "/part-*")
txt2.count
txt2.saveAsTextFile(jsonClean)

This code should simply copy files from inpath to jsonRaw and then from jsonRaw to jsonClean. This code executes all the way down to the last line, where it hangs after creating the output directory containing a _temporary_$folder but no actual files, not even temporary ones. `outprefix` is a bucket url; both jsonRaw and jsonClean are in the same bucket. Both .count calls succeed and return the same number. This means Spark can read from inpath and can both read from and write to jsonRaw. Since jsonClean is in the same bucket as jsonRaw and the final line does create the directory, I cannot think of any reason why the files should not be written. If there were any access or url problems they should already manifest when writing jsonRaw.
This problem is completely reproducible with Spark 1.2.1 and 1.3.1. The console output from the last line is:

scala> txt0.saveAsTextFile(jsonClean)
15/04/21 22:55:48 INFO storage.BlockManager: Removing broadcast 3
15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3_piece0
15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3_piece0 of size 2024 dropped from memory (free 278251716)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-51-181-81.ec2.internal:45199 in memory (size: 2024.0 B, free: 265.4 MB)
15/04/21 22:55:48 INFO storage.BlockManagerMaster: Updated info of block broadcast_3_piece0
15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3
15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3 of size 2728 dropped from memory (free 27825)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-166-129-153.ec2.internal:46671 in memory (size: 2024.0 B, free: 13.8 GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-51-153-34.ec2.internal:51691 in memory (size: 2024.0 B, free: 13.8 GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-158-142-155.ec2.internal:54690 in memory (size: 2024.0 B, free: 13.8 GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-61-144-7.ec2.internal:44849 in memory (size: 2024.0 B, free: 13.8 GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on ip-10-69-77-180.ec2.internal:42417 in memory (size: 2024.0 B, free: 13.8 GB)
15/04/21 22:55:48 INFO spark.ContextCleaner: Cleaned broadcast 3
15/04/21 22:55:49 INFO spark.SparkContext: Starting job: saveAsTextFile at console:38
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Got job 2 (saveAsTextFile at console:38) with 96 output partitions (allowLocal=false)
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Final stage: Stage 2(saveAsTextFile at console:38)
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[5] at saveAsTextFile at console:38), which has no missing parents
15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(22248) called with curMem=48112, maxMem=278302556
15/04/21 22:55:49 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 21.7 KB, free 265.3 MB)
15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(17352)
Re: Tasks run only on one machine
They are in HDFS so available on all workers

On Apr 23, 2015, at 10:29 AM, Pat Ferrel p...@occamsmachete.com wrote: Physically? Not sure, they were written using the nano-batch rdds in a streaming job that is in a separate driver. The job is a Kafka consumer. Would that affect all derived rdds? If so is there something I can do to mix it up, or does Spark know best about execution speed here?

On Apr 23, 2015, at 10:23 AM, Sean Owen so...@cloudera.com wrote: Where are the file splits? meaning is it possible they were also (only) available on one node and that was also your driver?

On Thu, Apr 23, 2015 at 1:21 PM, Pat Ferrel p...@occamsmachete.com wrote: Sure

var columns = mc.textFile(source).map { line => line.split(delimiter) }

Here "source" is a comma delimited list of files or directories. Both the textFile and .map tasks happen only on the machine they were launched from. Later other distributed operations happen, but I suspect that if I can figure out why the first line is run only on the client machine, the rest will clear up too. Here are some subsequent lines:

if (filterColumn != -1) {
  columns = columns.filter { tokens => tokens(filterColumn) == filterBy }
}
val interactions = columns.map { tokens => tokens(rowIDColumn) -> tokens(columnIDPosition) }
interactions.cache()

On Apr 23, 2015, at 10:14 AM, Jeetendra Gangele gangele...@gmail.com wrote: Will you be able to paste code here?

On 23 April 2015 at 22:21, Pat Ferrel p...@occamsmachete.com wrote: Using Spark streaming to create a large volume of small nano-batch input files, ~4k per file, thousands of 'part-x' files. When reading the nano-batch files and doing a distributed calculation my tasks run only on the machine where it was launched. I'm launching in "yarn-client" mode. The rdd is created using sc.textFile("list of thousand files"). What would cause the read to occur only on the machine that launched the driver? Do I need to do something to the RDD after reading? Has some partition factor been applied to all derived rdds? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
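One hedged thing to try from this thread: ask for more input partitions up front, or repartition before caching, so that derived RDDs do not inherit a single-node placement (the numbers are illustrative; names reuse the snippet above):

// Sketch: textFile takes a minimum partition count, and a shuffle via
// repartition spreads the data across executors before it is cached.
var columns = mc.textFile(source, 96) // hint: at least 96 input splits
  .map { line => line.split(delimiter) }

val interactions = columns
  .map { tokens => tokens(rowIDColumn) -> tokens(columnIDPosition) }
  .repartition(96) // illustrative; roughly match total cluster cores
interactions.cache()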
Bug? Can't reference to the column by name after join two DataFrame on a same name key
Hi All, I use 1.3.1. When I have two DFs and join them on a key with the same name, afterwards I can't get the common key by name. Basically: select * from t1 inner join t2 on t1.col1 = t2.col1. And I am using purely DataFrame, Spark SQLContext, not HiveContext. DataFrame df3 = df1.join(df2, df1.col("col").equalTo(df2.col("col"))).select("col"); Because df1 and df2 join on the same key "col", I then can't reference the key "col". I understand I should use a fully qualified name for that column (like in SQL, t1.col), but I don't know how to address this in Spark SQL. Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id#8L, id#0L.; It looks like the joined key can't be referenced by name or by the df1.col("name") pattern. https://issues.apache.org/jira/browse/SPARK-5278 refers to a Hive case, so I am not sure whether it is the same issue, but I still see the issue in the latest code. It looks like the result after a join doesn't keep the parent DF information anywhere? I checked the ticket https://issues.apache.org/jira/browse/SPARK-6273 but am not sure whether it is the same issue? Should I open a new ticket for this? Regards, Shuai
Is the Spark-1.3.1 support build with scala 2.8 ?
Does Spark 1.3.1 support a build with Scala 2.8, so that it can be integrated with kafka_2.8.0-0.8.0? Or does it have to be built with Scala 2.10? Thanks.
Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
If you return an Iterable, you are not tying the API to a CompactBuffer. Someday the data could be fetched lazily and the API would not have to change. On Apr 23, 2015 6:59 PM, Dean Wampler deanwamp...@gmail.com wrote: I wasn't involved in this decision (I just make the fries), but CompactBuffer is designed for relatively small data sets that at least fit in memory. It's more or less an Array. In principle, returning an iterator could hide the actual data structure that might be needed to hold a much bigger data set, if necessary. HOWEVER, it actually returns a CompactBuffer. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444 Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote: Should I repost this to dev list ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
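A small sketch of why the Iterable signature matters to callers, assuming `pairs` is an RDD[(String, Int)]: code written against the interface keeps working no matter what backs it.

// Sketch: consume groupByKey results through the Iterable interface only;
// whether the values are a CompactBuffer, an Array, or something lazy is
// then an implementation detail Spark is free to change later.
val grouped: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = pairs.groupByKey()

val counts = grouped.mapValues(vs => vs.size) // fine: only uses the Iterable API
// Casting the values to CompactBuffer would compile against internals and
// break if the backing structure ever changes - that is the point.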
Re: Understanding Spark/MLlib failures
Hi Andrew, I observed similar behavior under high GC pressure when running ALS. What happened to me was that there would be very long Full GC pauses (over 600 seconds at times). These would prevent the executors from sending heartbeats to the driver. Then the driver would think that the executor died, so it would kill it. The scheduler would look at the outputs and say: `org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1` or `Fetch Failed`, then reschedule the job at a different executor. Then these executors would get even more overloaded, causing them to GC more often, and new jobs would be launched with even smaller tasks. Because these executors were being killed by the driver, new jobs with the same name (and fewer tasks) would be launched. However, it usually led to a spiral of death, where executors were constantly being killed, and the stage wasn't being completed, but restarted with different numbers of tasks. Some configuration parameters that helped me through this process were:

spark.executor.memory // decrease the executor memory so that Full GCs take less time, however are more frequent
spark.executor.heartbeatInterval // This I set at 60 for 600 seconds (10 minute GC!!)
spark.core.connection.ack.wait.timeout // another timeout to set

Hope these parameters help you. I haven't directly answered your questions, but there are bits and pieces in there that are hopefully helpful. Best, Burak

On Thu, Apr 23, 2015 at 4:11 PM, aleverentz andylevere...@fico.com wrote: [My apologies if this is a re-post. I wasn't subscribed the first time I sent this message, and I'm hoping this second message will get through.] I've been using Spark 1.3.0 and MLlib for some machine learning tasks. In a fit of blind optimism, I decided to try running MLlib's Principal Components Analysis (PCA) on a dataset with approximately 10,000 columns and 200,000 rows. The Spark job has been running for about 5 hours on a small cluster, and it has been stuck on a particular job (treeAggregate at RowMatrix.scala:119) for most of that time. The treeAggregate job is now on retry 5, and after each failure it seems that the next retry uses a smaller number of tasks. (Initially, there were around 80 tasks; later it was down to 50, then 42; now it's down to 16.) The web UI shows the following error under failed stages: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1. This raises a few questions: 1. What does missing an output location for shuffle 1 mean? I'm guessing this cryptic error message is indicative of some more fundamental problem (out of memory? out of disk space?), but I'm not sure how to diagnose it. 2. Why do subsequent retries use fewer and fewer tasks? Does this mean that the algorithm is actually making progress? Or is the scheduler just performing some kind of repartitioning and starting over from scratch? (Also, if the algorithm is in fact making progress, should I expect it to finish eventually? Or do repeated failures generally indicate that the cluster is too small to perform the given task?) 3. Is it reasonable to expect that I could get PCA to run on this dataset using the same cluster simply by changing some configuration parameters? Or is a larger cluster with significantly more resources per node the only way around this problem? 4. In general, are there any tips for diagnosing performance issues like the one above? I've spent some time trying to get a few different algorithms to scale to larger and larger datasets, and whenever I run into a failure, I'd like to be able to identify the bottleneck that is preventing further scaling. Any general advice for doing that kind of detective work would be much appreciated. Thanks, ~ Andrew -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-Spark-MLlib-failures-tp22641.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
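For reference, a sketch of the configuration Burak lists, with illustrative values (the right numbers depend on the workload):

import org.apache.spark.SparkConf

// Sketch: smaller executor heaps shorten individual Full GC pauses, and
// longer timeouts keep the driver from declaring a GC-ing executor dead.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")                   // smaller heap, shorter pauses
  .set("spark.executor.heartbeatInterval", "600000")    // ms; survive long GC pauses
  .set("spark.core.connection.ack.wait.timeout", "600") // seconds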
Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
because CompactBuffer is considered an implementation detail. It is also not public for the same reason. On Thu, Apr 23, 2015 at 6:46 PM, Hao Ren inv...@gmail.com wrote: Should I repost this to dev list ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Understanding Spark/MLlib failures
[My apologies if this is a re-post. I wasn't subscribed the first time I sent this message, and I'm hoping this second message will get through.] I've been using Spark 1.3.0 and MLlib for some machine learning tasks. In a fit of blind optimism, I decided to try running MLlib's Principal Components Analysis (PCA) on a dataset with approximately 10,000 columns and 200,000 rows. The Spark job has been running for about 5 hours on a small cluster, and it has been stuck on a particular job (treeAggregate at RowMatrix.scala:119) for most of that time. The treeAggregate job is now on retry 5, and after each failure it seems that the next retry uses a smaller number of tasks. (Initially, there were around 80 tasks; later it was down to 50, then 42; now it's down to 16.) The web UI shows the following error under failed stages: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1. This raises a few questions: 1. What does missing an output location for shuffle 1 mean? I'm guessing this cryptic error message is indicative of some more fundamental problem (out of memory? out of disk space?), but I'm not sure how to diagnose it. 2. Why do subsequent retries use fewer and fewer tasks? Does this mean that the algorithm is actually making progress? Or is the scheduler just performing some kind of repartitioning and starting over from scratch? (Also, if the algorithm is in fact making progress, should I expect it to finish eventually? Or do repeated failures generally indicate that the cluster is too small to perform the given task?) 3. Is it reasonable to expect that I could get PCA to run on this dataset using the same cluster simply by changing some configuration parameters? Or is a larger cluster with significantly more resources per node the only way around this problem? 4. In general, are there any tips for diagnosing performance issues like the one above? I've spent some time trying to get a few different algorithms to scale to larger and larger datasets, and whenever I run into a failure, I'd like to be able to identify the bottleneck that is preventing further scaling. Any general advice for doing that kind of detective work would be much appreciated. Thanks, ~ Andrew -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-Spark-MLlib-failures-tp22641.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Understanding Spark/MLlib failures
Hi Andrew, The .principalComponents feature of RowMatrix is currently constrained to tall and skinny matrices. Your matrix is barely above the skinny requirement (10k columns), though the number of rows is fine. What are you looking to do with the principal components? If unnormalized PCA is OK for your application, you can instead run RowMatrix.computeSVD, and use the 'V' matrix, which can be used the same way as the principal components. The computeSVD method can handle square matrices, so it should be able to handle your matrix. Reza

On Thu, Apr 23, 2015 at 4:11 PM, aleverentz andylevere...@fico.com wrote: [My apologies if this is a re-post. I wasn't subscribed the first time I sent this message, and I'm hoping this second message will get through.] I've been using Spark 1.3.0 and MLlib for some machine learning tasks. In a fit of blind optimism, I decided to try running MLlib's Principal Components Analysis (PCA) on a dataset with approximately 10,000 columns and 200,000 rows. The Spark job has been running for about 5 hours on a small cluster, and it has been stuck on a particular job (treeAggregate at RowMatrix.scala:119) for most of that time. The treeAggregate job is now on retry 5, and after each failure it seems that the next retry uses a smaller number of tasks. (Initially, there were around 80 tasks; later it was down to 50, then 42; now it's down to 16.) The web UI shows the following error under failed stages: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1. This raises a few questions: 1. What does missing an output location for shuffle 1 mean? I'm guessing this cryptic error message is indicative of some more fundamental problem (out of memory? out of disk space?), but I'm not sure how to diagnose it. 2. Why do subsequent retries use fewer and fewer tasks? Does this mean that the algorithm is actually making progress? Or is the scheduler just performing some kind of repartitioning and starting over from scratch? (Also, if the algorithm is in fact making progress, should I expect it to finish eventually? Or do repeated failures generally indicate that the cluster is too small to perform the given task?) 3. Is it reasonable to expect that I could get PCA to run on this dataset using the same cluster simply by changing some configuration parameters? Or is a larger cluster with significantly more resources per node the only way around this problem? 4. In general, are there any tips for diagnosing performance issues like the one above? I've spent some time trying to get a few different algorithms to scale to larger and larger datasets, and whenever I run into a failure, I'd like to be able to identify the bottleneck that is preventing further scaling. Any general advice for doing that kind of detective work would be much appreciated. Thanks, ~ Andrew -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-Spark-MLlib-failures-tp22641.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
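A minimal sketch of Reza's suggestion, assuming `rows` is an RDD[Vector] holding the 200,000 x 10,000 data:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD

// Sketch: unnormalized PCA via SVD. The columns of V play the role of the
// principal directions; U is skipped since only the components are needed.
def topComponents(rows: RDD[Vector], k: Int) = {
  val mat = new RowMatrix(rows)
  val svd = mat.computeSVD(k, computeU = false)
  svd.V // local matrix with k columns of right singular vectors
}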
Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?
I wasn't involved in this decision (I just make the fries), but CompactBuffer is designed for relatively small data sets that at least fit in memory. It's more or less an Array. In principle, returning an iterator could hide the actual data structure that might be needed to hold a much bigger data set, if necessary. HOWEVER, it actually returns a CompactBuffer. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444 Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote: Should I repost this to dev list ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: setting cost in linear SVM [Python]
If by C you mean the parameter C in LIBLINEAR, the corresponding parameter in MLlib is regParam: https://github.com/apache/spark/blob/master/python/pyspark/mllib/classification.py#L273, where regParam = 1/C. -Xiangrui On Wed, Apr 22, 2015 at 3:25 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: Is there a way to set the cost value C when using linear SVM? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
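A short sketch of the mapping in the Scala API (the value of C and the dataset `trainingData` are illustrative assumptions):

import org.apache.spark.mllib.classification.SVMWithSGD

// Sketch: LIBLINEAR's C maps to MLlib's regParam as regParam = 1 / C.
val c = 10.0
val svm = new SVMWithSGD()
svm.optimizer
  .setRegParam(1.0 / c)
  .setNumIterations(100)
val model = svm.run(trainingData) // trainingData: RDD[LabeledPoint], assumed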
Re: Problem with using Spark ML
I got a tip to try reducing the step size, and that finally gave some more decent results. I had hoped the default params would give at least OK results, so I thought the problem must be somewhere else in the code. Problem solved! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-using-Spark-ML-tp22591p22628.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: StackOverflow Error when run ALS with 100 iterations
ALS.setCheckpointInterval was added in Spark 1.3.1. You need to upgrade Spark to use this feature. -Xiangrui On Wed, Apr 22, 2015 at 9:03 PM, amghost zhengweita...@outlook.com wrote: Hi, would you please explain how to checkpoint the training set rdd, since all things are done in the ALS.train method? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StackOverflow-Error-when-run-ALS-with-100-iterations-tp4296p22619.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
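A minimal sketch of the 1.3.1 API mentioned above (the checkpoint path and parameter values are illustrative; `ratings` is an assumed RDD[Rating]):

import org.apache.spark.mllib.recommendation.ALS

// Sketch: a checkpoint directory must be set for checkpointing to take
// effect; checkpointing every N iterations truncates the lineage that
// otherwise overflows the stack on long runs.
sc.setCheckpointDir("hdfs:///tmp/als-checkpoints") // illustrative path
val als = new ALS()
  .setRank(20)
  .setIterations(100)
  .setCheckpointInterval(10) // available from Spark 1.3.1
val model = als.run(ratings)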
Re: Distinct is very slow
Anyone, any thoughts on this? On 22 April 2015 at 22:49, Jeetendra Gangele gangele...@gmail.com wrote: I made 7000 tasks in mapToPair and in distinct I also made the same number of tasks. Still lots of shuffle read and write is happening, so the application runs for a much longer time. Any idea? On 17 April 2015 at 11:55, Akhil Das ak...@sigmoidanalytics.com wrote: How many tasks are you seeing in your mapToPair stage? Is it 7000? Then I suggest giving a number similar/close to 7000 in your .distinct call. What is happening in your case is that you are repartitioning your data to a smaller number (32), which would put a lot of load on processing I believe; you can try increasing it. Thanks Best Regards On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele gangele...@gmail.com wrote: Akhil, any thought on this? On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com wrote: No, I did not try the partitioning. Below is the full code:

public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, JavaSparkContext jsc) throws IOException {
  long start = System.currentTimeMillis();
  JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd.zipWithIndex().mapToPair(
    new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {
      @Override
      public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t) throws Exception {
        MatcherReleventData matcherData = new MatcherReleventData();
        Tuple2<Long, MatcherReleventData> tuple = new Tuple2<Long, MatcherReleventData>(
            t._2, matcherData.convertVendorDataToMatcherData(t._1));
        return tuple;
      }
    }).cache();
  log.info("after index" + RddForMarch.take(1));
  Map<Long, MatcherReleventData> tmp = RddForMarch.collectAsMap();
  Map<Long, MatcherReleventData> matchData = new HashMap<Long, MatcherReleventData>(tmp);
  final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

  JavaPairRDD<Long, String> blockingRdd = RddForMarch.flatMapValues(
    new Function<MatcherReleventData, Iterable<String>>() {
      @Override
      public Iterable<String> call(MatcherReleventData v1) throws Exception {
        List<String> values = new ArrayList<String>();
        HelperUtilities helper1 = new HelperUtilities();
        MatcherKeys matchkeys = helper1.getBlockinkeys(v1);
        if (matchkeys.get_companyName() != null) {
          values.add(matchkeys.get_companyName());
        }
        if (matchkeys.get_phoneNumberr() != null) {
          values.add(matchkeys.get_phoneNumberr());
        }
        if (matchkeys.get_zipCode() != null) {
          values.add(matchkeys.get_zipCode());
        }
        if (matchkeys.getM_domain() != null) {
          values.add(matchkeys.getM_domain());
        }
        return values;
      }
    });
  log.info("blocking RDD is" + blockingRdd.count());
  int count = 0;
  log.info("Starting printing");
  for (Tuple2<Long, String> entry : blockingRdd.collect()) {
    log.info(entry._1() + ":" + entry._2());
    count++;
  }
  log.info("total count" + count);

  JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
    new Function<String, Iterable<Integer>>() {
      @Override
      public Iterable<Integer> call(String v1) throws Exception {
        return ckdao.getSingelkeyresult(v1);
      }
    }).distinct(32);
  log.info("after hbase count is" + completeDataToprocess.count());
  log.info("data for process" + completeDataToprocess.take(1));

  JavaPairRDD<Long, Tuple2<Integer, Double>> withScore = completeDataToprocess.mapToPair(
    new PairFunction<Tuple2<Long, Integer>, Long, Tuple2<Integer, Double>>() {
      @Override
      public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t) throws Exception {
        Scoring scoreObj = new Scoring();
        double score = scoreObj.computeMatchScore(companyDAO.get(t._2()), dataMatchGlobal.getValue().get(t._1()));
        Tuple2<Integer, Double> maptuple = new Tuple2<Integer, Double>(t._2(), score);
        Tuple2<Long, Tuple2<Integer, Double>> tuple = new Tuple2<Long, Tuple2<Integer, Double>>(t._1(), maptuple);
        return tuple;
      }
    });
  log.info("with score tuple is" + withScore.take(1));

  JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD = withScore.reduceByKey(
    new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {
      @Override
      public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1, Tuple2<Integer, Double> v2) throws Exception {
        int res = v1._2().compareTo(v2._2());
        if (res > 0) {
          Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v1._1(), v1._2());
          return result;
        } else if (res < 0) {
          Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(), v2._2());
          return result;
        } else {
          Tuple2<Integer, Double> result = new Tuple2<Integer, Double>(v2._1(), v2._2());
          return result;
        }
      }
    });
  log.info("max score RDD" + maxScoreRDD.take(10));

  maxScoreRDD.foreach(
    new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {
      @Override
      public void call(Tuple2<Long, Tuple2<Integer, Double>> t) throws Exception {
        MatcherReleventData matchedData = dataMatchGlobal.getValue().get(t._1());
        log.info("broadcast is" + dataMatchGlobal.getValue().get(t._1()));
        // Set the score for better understanding of merge
        matchedData.setScore(t._2()._2());
RE: Error in creating spark RDD
Hi, SparkContext.newAPIHadoopRDD() is for working with the new Hadoop mapreduce API. So, you should import

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;

instead of

import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;

-Original Message- From: madhvi [mailto:madhvi.gu...@orkash.com] Sent: Wednesday, April 22, 2015 5:13 PM To: user@spark.apache.org Subject: Error in creating spark RDD

Hi, I am creating a spark RDD through accumulo writing like:

JavaPairRDD<Key, Value> accumuloRDD = sc.newAPIHadoopRDD(accumuloJob.getConfiguration(), AccumuloInputFormat.class, Key.class, Value.class);

But I am getting the following error and it is not getting compiled:

Bound mismatch: The generic method newAPIHadoopRDD(Configuration, Class<F>, Class<K>, Class<V>) of type JavaSparkContext is not applicable for the arguments (Configuration, Class<AccumuloInputFormat>, Class<Key>, Class<Value>). The inferred type AccumuloInputFormat is not a valid substitute for the bounded parameter <F extends InputFormat<K,V>>

I am using the following import statements:

import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;

I am not getting what is the problem in this. Thanks Madhvi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark RDD Lifecycle: whether RDD will be reclaimed out of scope
Hi, Yes, Spark automatically removes old RDDs from the cache when you make new ones. Unpersist forces it to remove them right away. On Thu, Apr 23, 2015 at 9:28 AM, Jeffery [via Apache Spark User List] ml-node+s1001560n22618...@n3.nabble.com wrote: Hi, Dear Spark Users/Devs: In a method, I create a new RDD and cache it; will Spark unpersist the RDD automatically after the rdd is out of scope? I am thinking so, but want to make sure with you, the experts :) Thanks, Jeffery Yuan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-RDD-Lifecycle-whether-RDD-will-be-reclaimed-out-of-scope-tp22618p22625.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
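A small sketch of that lifecycle (names illustrative):

// Sketch: cached RDDs are evicted LRU-style when the cache fills, and the
// ContextCleaner drops them once they are garbage-collected on the driver;
// unpersist(blocking = false) simply releases the memory eagerly.
def process(input: org.apache.spark.rdd.RDD[String]): Long = {
  val parsed = input.map(_.trim).cache() // used twice below, so cache it
  val total = parsed.count()
  val nonEmpty = parsed.filter(_.nonEmpty).count()
  parsed.unpersist(blocking = false)     // eager release before it leaves scope
  total - nonEmpty                        // number of blank lines
}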
Pipeline in pyspark
Hi, I have come across ways of building input/transform and output pipelines with Java (Google Dataflow/Spark etc). I also understand that Spark itself provides ways for creating a pipeline within mllib for ML transforms (primarily fit). Both of the above are available in the Java/Scala environment, the latter being supported on Python as well. However, if my understanding is correct, pipelines within ML transforms do not create a complete dataflow transform for non-ml scenarios (ex. io transforms, dataframe/graph transforms). Correct me if otherwise. I would like to know what is the best way to create a spark dataflow pipeline in a generic way. I have a use case where I have my input files in different formats and would like to convert them to rdds and further build the dataframe transforms and stream/store them finally. I hope not to do disk I/Os between pipeline tasks. I also came across luigi (http://luigi.readthedocs.org/en/latest/) in Python, but I found that it stores the contents onto disk and reloads them for the next phase of the pipeline. Appreciate if you can share your thoughts. -- Regards, Suraj
Re: IOUtils cannot write anything in Spark?
It seems like saveAsTextFile might do what you are looking for. On Wednesday, April 22, 2015, Xi Shen davidshe...@gmail.com wrote: Hi, I have a RDD of some processed data. I want to write these files to HDFS, but not for future M/R processing. I want to write plain old style text files. I tried:

rdd foreach { d =>
  val file = // create the file using a HDFS FileSystem
  val lines = d map {
    // format data into string
  }
  IOUtils.writeLines(lines, System.separator(), file)
}

Note, I was using the IOUtils from commons-io, not from the Hadoop package. The result is that all files are created in my HDFS, but have no data at all... Xi Shen about.me/davidshen http://about.me/davidshen -- Cell : 425-233-8271
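A sketch of the suggestion, assuming each record formats to one line of text (`formatRecord` is a placeholder for the formatting logic, and the output path is illustrative):

// Sketch: let Spark write the files. saveAsTextFile writes one part-file
// per partition through the cluster's Hadoop FileSystem, so no manual
// IOUtils/FileSystem handling is needed on the executors.
rdd
  .map(d => formatRecord(d))
  .saveAsTextFile("hdfs:///user/me/plain-text-out")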
IOUtils cannot write anything in Spark?
Hi, I have a RDD of some processed data. I want to write these files to HDFS, but not for future M/R processing. I want to write plain old style text files. I tried:

rdd foreach { d =>
  val file = // create the file using a HDFS FileSystem
  val lines = d map {
    // format data into string
  }
  IOUtils.writeLines(lines, System.separator(), file)
}

Note, I was using the IOUtils from commons-io, not from the Hadoop package. The result is that all files are created in my HDFS, but have no data at all... Xi Shen about.me/davidshen http://about.me/davidshen
Re: StandardScaler failing with OOM errors in PySpark
the feature dimension is 800k. yes, I believe the driver memory is likely the problem since it doesn't crash until the very last part of the tree aggregation. I'm running it via pyspark through YARN -- I have to run in client mode so I can't set spark.driver.memory -- I've tried setting the spark.yarn.am.memory and overhead parameters but it doesn't seem to have an effect. Thanks, Rok On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote: What is the feature dimension? Did you set the driver memory? -Xiangrui On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote: I'm trying to use the StandardScaler in pyspark on a relatively small (a few hundred Mb) dataset of sparse vectors with 800k features. The fit method of StandardScaler crashes with Java heap space or Direct buffer memory errors. There should be plenty of memory around -- 10 executors with 2 cores each and 8 Gb per core. I'm giving the executors 9g of memory and have also tried lots of overhead (3g), thinking it might be the array creation in the aggregators that's causing issues. The bizarre thing is that this isn't always reproducible -- sometimes it actually works without problems. Should I be setting up executors differently? Thanks, Rok -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark yarn-cluster job failing in batch processing
Hi All, I am trying to execute batch processing in yarn-cluster mode, i.e. I have many sql insert queries; based on the argument provided, the job will fetch the queries, create the context and schema RDD, and insert into hive tables. Please note: in standalone mode it is working, and in cluster mode it works when I configure one query. Also, I have configured yarn.nodemanager.delete.debug-sec = 600. I am using the below command:

spark-submit --jars ./analiticlibs/utils-common-1.0.0.jar,./analiticlibs/mysql-connector-java-5.1.17.jar,./analiticlibs/log4j-1.2.17.jar --files datasource.properties,log4j.properties,hive-site.xml --deploy-mode cluster --master yarn --num-executors 1 --driver-memory 2g --driver-java-options -XX:MaxPermSize=1G --executor-memory 1g --executor-cores 1 --class com.java.analitics.jobs.StandaloneAggregationJob sparkanalitics-1.0.0.jar daily_agg 2015-04-21

Exception from the container log:

Exception in thread Driver java.lang.ArrayIndexOutOfBoundsException: 2 at com.java.analitics.jobs.StandaloneAggregationJob.main(StandaloneAggregationJob.java:62) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427)

Exception in our exception log file:

diagnostics: Application application_1429800386537_0001 failed 2 times due to AM Container for appattempt_1429800386537_0001_02 exited with exitCode: 15 due to: Exception from container-launch. Container id: container_1429800386537_0001_02_01 Exit code: 15 Stack trace: ExitCodeException exitCode=15: at org.apache.hadoop.util.Shell.runCommand(Shell.java:538) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:197) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:299) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Container exited with a non-zero exit code 15 .Failing this attempt.. Failing the application. ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: root.hdfs start time: 1429800525569 final status: FAILED tracking URL: http://tejas.alcatel.com:8088/cluster/app/application_1429800386537_0001 user: hdfs 2015-04-23 20:19:27 DEBUG Client - stopping client from cache: org.apache.hadoop.ipc.Client@12f5f40b 2015-04-23 20:19:27 DEBUG Utils - Shutdown hook called

Need urgent support. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-yarn-cluster-job-failing-in-batch-processing-tp22626.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Is there a way to get the list of all jobs?
Hello, I am currently trying to monitor the progression of jobs. I created a class extending SparkListener, added a jobProgressListener to my sparkContext, and overrode the methods onTaskStart, onTaskEnd, onJobStart and onJobEnd, which leads to good results. Then, I would also like to monitor the progression of one job in comparison to the global progression of all jobs. I guess this is not directly possible, so I would like to retrieve the list of all jobs (or at least, the number of jobs), so that I can approximate the global progression by dividing the progression of one job by the total number of jobs. However, I do not find how to do this. I searched through the JobProgressListener API, but I only found methods to get the list of active jobs, or the list of already completed jobs. Is there a way to get the number or the list of jobs in the current version of Spark? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-get-the-list-of-all-jobs-tp22635.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
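A hedged sketch of the counting approach (the listener API is a developer API in Spark 1.x; Spark does not know the total number of jobs in advance, since jobs are submitted as actions run, so a running count is the practical substitute):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Sketch: count submitted and completed jobs, and expose a rough progress ratio.
class JobCountListener extends SparkListener {
  val started = new AtomicInteger(0)
  val finished = new AtomicInteger(0)
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    started.incrementAndGet()
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    finished.incrementAndGet()
  def progress: Double = // fraction of jobs seen so far that have completed
    if (started.get == 0) 0.0 else finished.get.toDouble / started.get
}

val listener = new JobCountListener
sc.addSparkListener(listener)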
Re: Pipeline in pyspark
I do not think you can share data across spark contexts. So as long as you can pass it around you should be good. On 23 Apr 2015 17:12, Suraj Shetiya surajshet...@gmail.com wrote: Hi, I have come across ways of building pipeline of input/transform and output pipelines with Java (Google Dataflow/Spark etc). I also understand that Spark itelf provides ways for creating a pipeline within mlib for MLtransforms (primarily fit) Both of the above are available in Java/Scala environment and the later being supported on Python as well. However, if my understanding is correct, pipelines within mltransforms donot create a complete dataflow transform for non-ml scenarios (ex. io transforms, dataframe/graph transforms). Correct me if otherwise. I would like to know, what is the best way to create spark dataflow pipeline in a generic way. I have a use case where I have my input files in different formats and would like to convert them to rdd and further build the dataframe transforms and stream/store them finally. I hope not to do Disk I/Os between pipeline tasks. I also came across luigi(http://luigi.readthedocs.org/en/latest/) on Python, but I found that it stores the contents onto disc and reloads it for the next phase of the pipeline. Appreciate if you can share your thoughts. -- Regards, Suraj
Re: Hive table creation - possible bug in Spark 1.3?
Hi, Hive table creation needs an extra step starting from 1.3. You can follow the following template:

df.registerTempTable(tableName)
hc.sql(s"create table $tableName as select * from $tableName")

This will save the table in Hive with the given tableName. Regards, Madhukara Phatak http://datamantra.io/

On Thu, Apr 23, 2015 at 4:00 AM, Michael Armbrust mich...@databricks.com wrote: Sorry for the confusion. We should be more clear about the semantics in the documentation. (PRs welcome :) ) .saveAsTable does not create a hive table, but instead creates a Spark Data Source table. Here the metadata is persisted into Hive, but hive cannot read the tables (as this API supports MLlib vectors, schema discovery, and other things that hive does not). If you want to create a hive table, use HiveQL and run a CREATE TABLE AS SELECT ...

On Wed, Apr 22, 2015 at 12:50 AM, Ophir Cohen oph...@gmail.com wrote: I wrote a few mails here regarding this issue. After further investigation I think there is a bug in Spark 1.3 in saving Hive tables. (hc is HiveContext)

1. Verify the needed configuration exists:

scala> hc.sql("set hive.exec.compress.output").collect
res4: Array[org.apache.spark.sql.Row] = Array([hive.exec.compress.output=true])
scala> hc.sql("set mapreduce.output.fileoutputformat.compress.codec").collect
res5: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec])
scala> hc.sql("set mapreduce.output.fileoutputformat.compress.type").collect
res6: Array[org.apache.spark.sql.Row] = Array([mapreduce.output.fileoutputformat.compress.type=BLOCK])

2. Load a DataFrame and save it as a table (path points to an existing file):

val saDF = hc.parquetFile(path)
saDF.count

(count yields 229764 - i.e. the rdd exists)

saDF.saveAsTable("test_hive_ms")

Now for a few interesting outputs:

1. Trying to query through the Hive CLI, the table exists but with the wrong output format: Failed with exception java.io.IOException:java.io.IOException: hdfs://10.166.157.97:9000/user/hive/warehouse/test_hive_ms/part-r-1.parquet not a SequenceFile
2. Looking at the output files, the files are '.parquet' and not '.snappy'
3. Looking at the saveAsTable output shows that it actually stores the table with both the wrong output format and without compression:

15/04/22 07:16:54 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:test_hive_ms, dbName:default, owner:hadoop, createTime:1429687014, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:col, type:array<string>, comment:from deserializer)], location:null, inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe, parameters:{serialization.format=1, path=hdfs://10.166.157.97:9000/user/hive/warehouse/test_hive_ms}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{})), partitionKeys:[], parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:ADJDATE,type:long,nullable:true,metadata:{}},{name:sid,type:integer,nullable:true,metadata:{}},{name:ADJTYPE,type:integer,nullable:true,metadata:{}},{name:ENDADJDATE,type:long,nullable:true,metadata:{}},{name:ADJFACTOR,type:double,nullable:true,metadata:{}},{name:CUMADJFACTOR,type:double,nullable:true,metadata:{}}]}, EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=org.apache.spark.sql.parquet}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)

So, the question is: do I miss some configuration here or should I open a bug? Thanks, Ophir
Re: Spark SQL performance issue.
Quick questions: why are you caching both the RDD and the table? Which stage of the job is slow? On 23 Apr 2015 17:12, Nikolay Tikhonov tikhonovnico...@gmail.com wrote: Hi, I have a Spark SQL performance issue. My code contains a simple JavaBean:

public class Person implements Externalizable {
    private int id;
    private String name;
    private double salary;
}

I apply a schema to an RDD and register the table:

JavaRDD<Person> rdds = ...
rdds.cache();
DataFrame dataFrame = sqlContext.createDataFrame(rdds, Person.class);
dataFrame.registerTempTable("person");
sqlContext.cacheTable("person");

Then I run a sql query:

sqlContext.sql("SELECT id, name, salary FROM person WHERE salary >= YYY AND salary <= XXX").collectAsList()

I launch a standalone cluster which contains 4 workers. Each node runs on a machine with 8 CPUs and 15 GB of memory. When I run the query on this environment over an RDD which contains 1 million persons, it takes 1 minute. Can somebody tell me how to tune the performance? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-performance-issue-tp22627.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark SQL performance issue.
why are you caching both the RDD and the table? I try to cache all the data to avoid bad performance for the first query. Is that right? Which stage of the job is slow? The query is run many times on one sqlContext and each query execution takes 1 second. 2015-04-23 11:33 GMT+03:00 ayan guha guha.a...@gmail.com: Quick questions: why are you caching both the RDD and the table? Which stage of the job is slow?
A Spark Group by is running forever
I have a groupBy query after a map-side join and a leftOuterJoin, and this query has been running for more than 2 hours. Tasks:

Index: 0 | ID: 36 | Attempt: 0 | Status: RUNNING | Locality Level: PROCESS_LOCAL | Executor ID / Host: 17 / phxaishdc9dn1560.stratus.phx.ebay.com | Launch Time: 2015/04/22 23:27:00 | Duration: 1.4 h | GC Time: 29 s | Shuffle Read Size / Records: 61.8 MB / 63144909 | Shuffle Write Size / Records: 0.0 B / 0

The input looks to be only 60 MB.

Command:

./bin/spark-submit -v --master yarn-cluster --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar --jars /apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/hdfs/hadoop-hdfs-2.4.1-EBAY-2.jar,/home/dvasthimal/spark1.3/spark_reporting_dep_only-1.0-SNAPSHOT.jar --num-executors 36 --driver-memory 12g --driver-java-options -XX:MaxPermSize=8G --executor-memory 12g --executor-cores 6 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp /home/dvasthimal/spark1.3/spark_reporting-1.0-SNAPSHOT.jar startDate=2015-04-6 endDate=2015-04-7 input=/user/dvasthimal/epdatasets_small/exptsession subcommand=viewItem output=/user/dvasthimal/epdatasets/viewItem buffersize=128 maxbuffersize=1068 maxResultSize=2G

Queries:

1. val viEvents = details.map { vi => (vi.get(14).asInstanceOf[Long], vi) }

2. Broadcast map-side join:

val lstgItemMap = listings.map { lstg => (lstg.getItemId().toLong, lstg) }.collectAsMap
val broadCastMap = sc.broadcast(lstgItemMap)
val viEventsWithListings: RDD[(Long, (DetailInputRecord, VISummary, Long))] = viEvents.mapPartitions({
  // business logic
})

3. Left outer join:

val spsLevelMetricSum = DataUtil.getSpsLevelMetricSum(sc, startDate)
val spsLvlMetric = spsLevelMetricSum.map { sps => (sps.getUserId.toLong, sps) }
val viEventsWithListingsJoinSpsLevelMetric = viEventsWithListings.leftOuterJoin(spsLvlMetric).map {
  // business logic
}

4. Group by:

val sellerSegments = viEventsWithListingsJoinSpsLevelMetric.groupBy {
  case (viDetail, vi, itemId) =>
    (viDetail.get(0), viDetail.get(1).asInstanceOf[Long], viDetail.get(2), viDetail.get(8).asInstanceOf[Int])
}

#4 is very slow. Any thoughts? -- Deepak
Re: Custom paritioning of DSTream
Hello Evo, Ranjitiyer, I am also looking for the same thing. Using foreach is not useful for me as processing the RDD as a whole won't be distributed across workers and that would kill performance in my application :-/ Let me know if you find a solution for this. Regards -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574p22630.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Custom paritioning of DSTream
You can use transform, which yields the RDDs of the DStream; on each of those RDDs you can then apply partitionBy. transform also returns another DStream, while foreach doesn't. Btw, what do you mean re foreach killing the performance by not distributing the workload? Every function (provided it is not an action) applied to an RDD within foreach is distributed across the cluster, since it gets applied to an RDD. From: davidkl [via Apache Spark User List] [mailto:ml-node+s1001560n22630...@n3.nabble.com] Sent: Thursday, April 23, 2015 10:13 AM To: Evo Eftimov Subject: Re: Custom paritioning of DSTream -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574p22631.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
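For reference, a minimal sketch of the transform approach Evo describes (assuming pairDStream is a DStream of key/value pairs; the partitioner and partition count are illustrative):

import org.apache.spark.HashPartitioner
// transform exposes the RDD of each micro-batch and returns a new DStream,
// so the repartitioned data keeps flowing through the streaming pipeline
val repartitioned = pairDStream.transform { rdd =>
  rdd.partitionBy(new HashPartitioner(8))
}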
Re: Hive table creation - possible bug in Spark 1.3?
Hi Michael, here is the JIRA issue https://issues.apache.org/jira/browse/SPARK-7084 and the PR https://github.com/apache/spark/pull/5654 for the same. Please have a look. Regards, Madhukara Phatak http://datamantra.io/
Re: Convert DStream to DataFrame
Thank you very much, Tathagata! On Wednesday, April 22, 2015, Tathagata Das t...@databricks.com wrote: Aaah, that. That is probably a limitation of the SQLContext (cc'ing Yin for more information). On Wed, Apr 22, 2015 at 7:07 AM, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Sorry, this is the error: [error] /home/sergio/Escritorio/hello/streaming.scala:77: Implementation restriction: case classes cannot have more than 22 parameters. 2015-04-22 16:06 GMT+02:00 Sergio Jiménez Barrio drarse.a...@gmail.com: I tried the solution from the guide, but I exceeded the size limit of a case class Row: 2015-04-22 15:22 GMT+02:00 Tathagata Das tathagata.das1...@gmail.com: Did you check out the latest streaming programming guide? http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations You also need to be aware that to convert JSON RDDs to a DataFrame, sqlContext has to make a pass on the data to learn the schema. This will fail if a batch has no data, so you have to safeguard against that. On Wed, Apr 22, 2015 at 6:19 AM, ayan guha guha.a...@gmail.com wrote: What about sqlContext.createDataFrame(rdd)? On 22 Apr 2015 23:04, Sergio Jiménez Barrio drarse.a...@gmail.com wrote: Hi, I am using Kafka with Spark Streaming to send JSON to Apache Spark:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

Now I want to parse the DStream into a DataFrame, but I don't know if Spark 1.3 has some easy way to do this. Any suggestion? I can get the messages with:

val lines = messages.map(_._2)

Thank you all. Sergio J. -- Atte. Sergio Jiménez
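A minimal sketch of the approach from the guide for Sergio's case (Spark 1.3; assumes sqlContext exists on the driver, and guards against the empty-batch case TD mentions; the table name is illustrative):

val lines = messages.map(_._2)
lines.foreachRDD { rdd =>
  // schema inference makes a pass over the data, so skip empty batches
  if (!rdd.isEmpty()) {
    val df = sqlContext.jsonRDD(rdd)
    df.registerTempTable("events")
  }
}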
Re: Spark SQL performance issue.
Hi, can you share your Web UI view depicting the task-level breakup? I can see many things that can be improved. 1. JavaRDD<Person> rdds = ... rdds.cache(); - this caching is not needed, as you are not reading the RDD for any action. 2. Instead of collecting the result as a list, it would be better to save it as a text file, as that avoids moving the results to the driver. Thanks Arush On Thu, Apr 23, 2015 at 2:47 PM, Nikolay Tikhonov tikhonovnico...@gmail.com wrote: I try to cache all the data to avoid bad performance for the first query. Is that right? The query is run many times on one sqlContext and each query execution takes 1 second. -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
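A minimal sketch of point 2, in Scala for brevity (the filter bounds and output path are illustrative):

val result = sqlContext.sql(
  "SELECT id, name, salary FROM person WHERE salary >= 1000 AND salary <= 5000")
// writing from the executors avoids collecting all results to the driver
result.rdd.saveAsTextFile("hdfs:///tmp/person_query_output")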
Re: Building Spark : Adding new DataType in Catalyst
I've already tried UDTs in Spark 1.2 and 1.3, but I encountered a Kryo serialization exception on joins, as tracked here: https://datastax-oss.atlassian.net/browse/SPARKC-23 . I talked to Michael Armbrust https://plus.google.com/u/1/109154927192908362223/posts about the exception, and he said "I'll caution you that this is not a stable public API." So I moved to adding a custom DataType into Spark. I got the answer to this question from Iulian Dragoș https://plus.google.com/u/1/114220203404330592389/posts : one way is to use export SPARK_PREPEND_CLASSES=true. This instructs the launcher to prepend the target directories of each project to the Spark assembly; this solved my problem. Thanks... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-Adding-new-DataType-in-Catalyst-tp22604p22632.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: problem with spark thrift server
Hi, what do you mean by the driver being disabled? What are you trying to achieve? Thanks Arush On Thu, Apr 23, 2015 at 12:29 PM, guoqing0...@yahoo.com.hk wrote: Hi, I have a question about the Spark Thrift server. I deployed Spark on YARN and found that if the Spark driver dies, the Spark application crashes on YARN. I'd appreciate any suggestions and ideas. Thank you! -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: Spark Streaming updatyeStateByKey throws OutOfMemory Error
HI TD, Some observations:

1. If I submit the application using the spark-submit tool with client as deploy mode, it works fine with a single master and worker (driver, master and worker are running on the same machine).

2. If I submit the application using the spark-submit tool with client as deploy mode, it crashes after some time with a StackOverflowError with a single master and 2 workers (driver, master and 1 worker running on the same machine, the other worker on a different machine):

15/04/23 05:42:04 Executor: Exception in task 0.0 in stage 23153.0 (TID 5412)
java.lang.StackOverflowError
at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
at java.io.ObjectInputStream.readUTF(ObjectInputStream.java:1072)
at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:671)
at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)

3. If I submit the
Re: Trouble working with Spark-CSV package (error: object databricks is not a member of package com)
Do you have commons-csv-1.1-bin.jar in your path somewhere? I had to download and add this. Cheers k/ On Wed, Apr 22, 2015 at 11:01 AM, Mohammed Omer beancinemat...@gmail.com wrote: Afternoon all, I'm working with Scala 2.11.6 and Spark 1.3.1 built from source via: `mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package` The error is encountered when running the spark shell via: `spark-shell --packages com.databricks:spark-csv_2.11:1.0.3` The full trace of the commands can be found at https://gist.github.com/momer/9d1ca583f9978ec9739d Not sure if I've done something wrong, or if the documentation is outdated, or...? Would appreciate any input or push in the right direction! Thank you, Mo
Re: Error in creating spark RDD
On Thursday 23 April 2015 12:22 PM, Akhil Das wrote: Here's a complete Scala example: https://github.com/bbux-proteus/spark-accumulo-examples/blob/1dace96a115f29c44325903195c8135edf828c86/src/main/scala/org/bbux/spark/AccumuloMetadataCount.scala Thanks Best Regards On Thu, Apr 23, 2015 at 12:19 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Change your import from mapred to mapreduce, like: import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; Thanks Best Regards On Wed, Apr 22, 2015 at 2:42 PM, madhvi madhvi.gu...@orkash.com wrote: Hi, I am creating a Spark RDD from Accumulo like this:

JavaPairRDD<Key, Value> accumuloRDD = sc.newAPIHadoopRDD(accumuloJob.getConfiguration(), AccumuloInputFormat.class, Key.class, Value.class);

But I am getting the following error and it is not getting compiled: Bound mismatch: The generic method newAPIHadoopRDD(Configuration, Class<F>, Class<K>, Class<V>) of type JavaSparkContext is not applicable for the arguments (Configuration, Class<AccumuloInputFormat>, Class<Key>, Class<Value>). The inferred type AccumuloInputFormat is not a valid substitute for the bounded parameter <F extends InputFormat<K,V>> I am using the following import statements:

import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;

I am not getting what the problem is with this. Thanks Madhvi Hi, Thanks. I got that solved :) madhvi
Re: Re: HiveContext setConf seems not stable
Hi all, my understanding of this problem: SQLConf is overwritten by the Hive config during the initialization phase, when setConf(key: String, value: String) is called for the first time (see the code snippets below), so it behaves correctly later on. I'm not sure whether this is right; any pointers are welcome. Thanks.

@transient protected[hive] lazy val hiveconf: HiveConf = {
  setConf(sessionState.getConf.getAllProperties)
  sessionState.getConf
}

protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = synchronized {
  try {
    val cmd_trimmed: String = cmd.trim()
    val tokens: Array[String] = cmd_trimmed.split("\\s+")
    val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
    val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
    ...
}

protected[sql] def runSqlHive(sql: String): Seq[String] = {
  val maxResults = 10
  val results = runHive(sql, maxResults)
  // It is very confusing when you only get back some of the results...
  if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED")
  results
}

override def setConf(key: String, value: String): Unit = {
  super.setConf(key, value)
  runSqlHive(s"SET $key=$value")
}

From: madhu phatak Date: 2015-04-23 02:17 To: Michael Armbrust CC: Ophir Cohen; Hao Ren; user Subject: Re: HiveContext setConf seems not stable Hi, calling getConf doesn't solve the issue. Even many Hive-specific queries are broken. It seems no Hive configurations are getting passed properly. Regards, Madhukara Phatak http://datamantra.io/ On Wed, Apr 22, 2015 at 2:19 AM, Michael Armbrust mich...@databricks.com wrote: As a workaround, can you call getConf first before any setConf? On Tue, Apr 21, 2015 at 1:58 AM, Ophir Cohen oph...@gmail.com wrote: I think I encountered the same problem while trying to turn on Hive compression. I have the following lines:

def initHiveContext(sc: SparkContext): HiveContext = {
  val hc: HiveContext = new HiveContext(sc)
  hc.setConf("hive.exec.compress.output", "true")
  hc.setConf("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
  hc.setConf("mapreduce.output.fileoutputformat.compress.type", "BLOCK")
  logger.info(hc.getConf("hive.exec.compress.output"))
  logger.info(hc.getConf("mapreduce.output.fileoutputformat.compress.codec"))
  logger.info(hc.getConf("mapreduce.output.fileoutputformat.compress.type"))
  hc
}

And the log for calling it twice:

15/04/21 08:37:39 INFO util.SchemaRDDUtils$: false
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: org.apache.hadoop.io.compress.SnappyCodec
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: true
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: org.apache.hadoop.io.compress.SnappyCodec
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK

BTW it worked on 1.2.1... On Thu, Apr 2, 2015 at 11:47 AM, Hao Ren inv...@gmail.com wrote: Hi, Jira created: https://issues.apache.org/jira/browse/SPARK-6675 Thank you. On Wed, Apr 1, 2015 at 7:50 PM, Michael Armbrust mich...@databricks.com wrote: Can you open a JIRA please? On Wed, Apr 1, 2015 at 9:38 AM, Hao Ren inv...@gmail.com wrote: Hi, I find HiveContext.setConf does not work correctly.
Here are some code snippets showing the problem. Snippet 1:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Main extends App {
  val conf = new SparkConf()
    .setAppName("context-test")
    .setMaster("local[8]")
  val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)

  hc.setConf("spark.sql.shuffle.partitions", "10")
  hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
  hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
  hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach println
}

Results:

(hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)
(spark.sql.shuffle.partitions,10)

Snippet 2:

...
hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
hc.setConf("spark.sql.shuffle.partitions", "10")
hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach println
...

Results:

(hive.metastore.warehouse.dir,/user/hive/warehouse)
(spark.sql.shuffle.partitions,10)

You can see that I just permuted the two setConf calls, and that leads to two different Hive configurations. It seems that HiveContext can not set a new value on
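An untested sketch of the workaround Michael suggests upthread, i.e. reading a conf value before the first setConf (the keys and values are taken from Ophir's snippet; whether this triggers the initialization early enough is an assumption):

val hc = new HiveContext(sc)
// per the suggestion: call getConf before any setConf
hc.getConf("spark.sql.shuffle.partitions", "200")
hc.setConf("hive.exec.compress.output", "true")
hc.setConf("mapreduce.output.fileoutputformat.compress.codec",
  "org.apache.hadoop.io.compress.SnappyCodec")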
Is a higher-res or vector version of Spark logo available?
My employer (adform.com) would like to use the Spark logo in a recruitment event (to indicate that we are using Spark in our company). I looked in the Spark repo (https://github.com/apache/spark/tree/master/docs/img) but couldn't find a vector format. Is a higher-res or vector format version available anywhere? Enno
Streaming Kmeans usage in java
Hi everyone, do we have a sample example of how to use streaming k-means clustering with Java? I have seen some example usage in Scala. Can anybody point me to a Java example? regards jeetendra
How to start Thrift JDBC server as part of standalone spark application?
Hello, I would like to export RDDs/DataFrames via a JDBC SQL interface from a standalone application for the currently stable Spark v1.3.1. I found one way of doing it, but it requires the use of the @DeveloperApi method HiveThriftServer2.startWithContext(sqlContext). Is there a better, production-level approach to do that? The full code snippet is below:

// you can run it via:
// ../spark/bin/spark-submit --master local[*] --class SimpleApp target/scala-2.10/simple-project_2.10-1.0.jar src/test/resources/1.json tableFromJson
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SimpleApp {
  def main(args: Array[String]) {
    if (args.length != 2) {
      Console.err.println("Usage: app source_json_file table_name")
      System.exit(1)
    }
    val sourceFile = args(0)
    val tableName = args(1)

    val sparkConf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new HiveContext(sc)

    val df = sqlContext.jsonFile(sourceFile)
    df.registerTempTable(tableName)
    println("Registered temp table %s for data source %s".format(tableName, sourceFile))

    HiveThriftServer2.startWithContext(sqlContext)
  }
}

Best, Vladimir Grigor
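Once started this way, the registered table should be reachable over JDBC like any other Thrift server table, e.g. with beeline (default port assumed, untested here):

./bin/beeline -u jdbc:hive2://localhost:10000 -e "select count(*) from tableFromJson"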
Re: How to debug Spark on Yarn?
For step 2, you can pipe the application log to a file instead of copy-pasting. Cheers On Apr 22, 2015, at 10:48 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I submit a Spark app to YARN and I get these messages: 15/04/22 22:45:04 INFO yarn.Client: Application report for application_1429087638744_101363 (state: RUNNING) 15/04/22 22:45:04 INFO yarn.Client: Application report for application_1429087638744_101363 (state: RUNNING) ... 1) I can go to the Spark UI and see the status of the app, but I cannot see the logs as the job progresses. How can I see the logs of executors as they progress? 2) In case the app fails/completes, the Spark UI vanishes and I get a YARN job page that says the job failed, with no link back to the Spark UI. Then I take the application ID, run yarn logs -applicationId <app id>, and my console fills up (with huge scrolling) with the logs of all executors. Then I copy and paste into a text editor and search for keywords like "Exception" and "Job aborted due to". Is this the right way to view logs? -- Deepak
Re: MLlib - Collaborative Filtering - trainImplicit task size
All these warnings come from the ALS iterations, from flatMap and also from aggregate. For instance, the origin of the stage where the flatMap is showing these warnings (with Spark 1.3.0; they are also shown in Spark 1.3.1):

org.apache.spark.rdd.RDD.flatMap(RDD.scala:296)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1065)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:530)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

And from the aggregate:

org.apache.spark.rdd.RDD.aggregate(RDD.scala:968)
org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1112)
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1064)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:538)
org.apache.spark.ml.recommendation.ALS$$anonfun$train$3.apply(ALS.scala:527)
scala.collection.immutable.Range.foreach(Range.scala:141)
org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:527)
org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:203)

On Thu, Apr 23, 2015 at 2:49 AM, Xiangrui Meng men...@gmail.com wrote: This is the size of the serialized task closure. Is stage 246 part of the ALS iterations, or something before or after them? -Xiangrui On Tue, Apr 21, 2015 at 10:36 AM, Christian S. Perone christian.per...@gmail.com wrote: Hi Sean, thanks for the answer. I tried to call repartition() on the input with many different sizes and it still continues to show that warning message. On Tue, Apr 21, 2015 at 7:05 AM, Sean Owen so...@cloudera.com wrote: I think maybe you need more partitions in your input, which might make for smaller tasks? On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone christian.per...@gmail.com wrote: I keep seeing these warnings when using trainImplicit: WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB). The maximum recommended task size is 100 KB. And then the task size starts to increase. Is this a known issue? Thanks! -- Blog http://blog.christianperone.com | Github https://github.com/perone | Twitter https://twitter.com/tarantulae Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big joke on me.
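For reference, a minimal sketch of Sean's suggestion (assuming ratings is an RDD[Rating]; the partition count, rank, and iterations are illustrative, and per Christian's follow-up this did not remove the warning in his case):

import org.apache.spark.mllib.recommendation.ALS
// more, smaller input partitions can mean smaller tasks
val repartitioned = ratings.repartition(200)
val model = ALS.trainImplicit(repartitioned, rank = 10, iterations = 10)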
ML regression - spark context dies without error
Hi all, I have been testing Spark ML algorithms with a bigger dataset and ran into some problems with linear regression. The executors seem to stop without any apparent reason:

15/04/22 20:15:05 INFO BlockManagerInfo: Added rdd_12492_80 in memory on backend-node:48037 (size: 28.5 MB, free: 2.8 GB)
15/04/22 20:15:05 INFO BlockManagerInfo: Added rdd_12493_80 in memory on backend-node:48037 (size: 37.6 MB, free: 2.7 GB)
15/04/22 20:15:08 INFO BlockManagerInfo: Added rdd_12489_81 in memory on backend-node:48037 (size: 8.4 MB, free: 2.7 GB)
[E 150422 20:15:12 java_gateway:483] Error while sending or receiving.
Traceback (most recent call last):
File /home/azureuser/spark-1.3.0-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 479, in send_command
raise Py4JError(Answer from Java side is empty)
Py4JError: Answer from Java side is empty

Then the SparkContext stops, too:

[E 150422 20:15:12 java_gateway:431] An error occurred while trying to connect to the Java server

The problem is that it does not happen all the time; it only fails maybe once in every five attempts. Any suggestions where I can get more detailed logs? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ML-regression-spark-context-dies-without-error-tp22633.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Contributors, read me! Updated Contributing to Spark wiki
Following several discussions about how to improve the contribution process in Spark, I've overhauled the guide to contributing. Anyone who is going to contribute needs to read it, as it has more formal guidance about the process: https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark We may push back harder now on pull requests and JIRAs that don't follow this guidance. It will help everyone spend less time getting changes in, and less time on duplicated effort or changes that won't be accepted. A summary of key points is found in CONTRIBUTING.md, a prompt presented before opening pull requests (https://github.com/apache/spark/blob/master/CONTRIBUTING.md): - Is the change important and ready enough to ask the community to spend time reviewing? - Have you searched for existing, related JIRAs and pull requests? - Is this a new feature that can stand alone as a package on http://spark-packages.org ? - Is the change being proposed clearly explained and motivated?
Re: StandardScaler failing with OOM errors in PySpark
ok yes, I think I have narrowed it down to being a problem with driver memory settings. It looks like the application master/driver is not being launched with the settings specified: for the driver process on the main node I see -XX:MaxPermSize=128m -Xms512m -Xmx512m as the options used to start the JVM, even though I specified 'spark.yarn.am.memory', '5g' and 'spark.yarn.am.memoryOverhead', '2000'. The info shows that these options were read: 15/04/23 13:47:47 INFO yarn.Client: Will allocate AM container, with 7120 MB memory including 2000 MB overhead Is there some reason why these options are being ignored, starting the driver with just 512 MB of heap instead? On Thu, Apr 23, 2015 at 8:06 AM, Rok Roskar rokros...@gmail.com wrote: the feature dimension is 800k. yes, I believe the driver memory is likely the problem since it doesn't crash until the very last part of the tree aggregation. I'm running it via pyspark through YARN -- I have to run in client mode so I can't set spark.driver.memory -- I've tried setting the spark.yarn.am.memory and overhead parameters but it doesn't seem to have an effect. Thanks, Rok On Apr 23, 2015, at 7:47 AM, Xiangrui Meng men...@gmail.com wrote: What is the feature dimension? Did you set the driver memory? -Xiangrui On Tue, Apr 21, 2015 at 6:59 AM, rok rokros...@gmail.com wrote: I'm trying to use the StandardScaler in pyspark on a relatively small (a few hundred Mb) dataset of sparse vectors with 800k features. The fit method of StandardScaler crashes with Java heap space or Direct buffer memory errors. There should be plenty of memory around -- 10 executors with 2 cores each and 8 Gb per core. I'm giving the executors 9g of memory and have also tried lots of overhead (3g), thinking it might be the array creation in the aggregators that's causing issues. The bizarre thing is that this isn't always reproducible -- sometimes it actually works without problems. Should I be setting up executors differently? Thanks, Rok -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/StandardScaler-failing-with-OOM-errors-in-PySpark-tp22593.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
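One possible explanation, offered here as an assumption rather than a confirmed diagnosis: in yarn-client mode the driver is the local client JVM, and spark.yarn.am.* sizes only the separate application master container, which would leave the driver at its 512m default. A sketch of launching with an explicit driver heap (the memory value is illustrative):

./bin/pyspark --master yarn-client --driver-memory 5g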
Re: gridsearch - python
https://issues.apache.org/jira/browse/SPARK-7022. Punya On Thu, Apr 23, 2015 at 5:47 PM Pagliari, Roberto rpagli...@appcomsci.com wrote: Can anybody point me to an example, if available, about gridsearch with python? Thank you,
RE: Re: problem with spark thrift server
Hi, can you describe a little how the Thrift server crashed, or the steps to reproduce it? It's probably a bug in the Thrift server. Thanks, From: guoqing0...@yahoo.com.hk Sent: Friday, April 24, 2015 9:55 AM To: Arush Kharbanda Cc: user Subject: Re: Re: problem with spark thrift server Thanks for your reply. I would like to use the Spark Thrift server as a JDBC SQL interface, with the Spark application running on YARN, but the application was FINISHED when the Thrift server crashed, and all the cached tables were lost. Thriftserver start command: start-thriftserver.sh --master yarn --executor-memory 20480m --executor-cores 2 --num-executors 20 --queue spark My question is whether the Thrift server has any other, more stable mode on YARN, like active/standby for the Thrift server. I'd really appreciate any suggestions and ideas. Thanks.
Re: Spark Streaming updatyeStateByKey throws OutOfMemory Error
*bump* On Thu, Apr 23, 2015 at 3:46 PM, Sourav Chandra sourav.chan...@livestream.com wrote:
Re: Is the Spark-1.3.1 support build with scala 2.8 ?
Hi, AFAIK it's only built with 2.10 and 2.11. You should integrate kafka_2.10-0.8.0 to make it work. Regards, Madhukara Phatak http://datamantra.io/ On Fri, Apr 24, 2015 at 9:22 AM, guoqing0...@yahoo.com.hk wrote: Does Spark 1.3.1 support building with Scala 2.8? Can it be integrated with kafka_2.8.0-0.8.0 if built with Scala 2.10? Thanks.
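For example, with sbt the matching Kafka integration can be pulled in through the Spark artifact (a sketch; the version should match your Spark build):

// %% appends the Scala binary version, so on Scala 2.10 this resolves to
// spark-streaming-kafka_2.10, which brings in a Scala 2.10 Kafka build
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.3.1"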
RE: gridsearch - python
I know grid search with cross-validation is not supported. However, I was wondering if there is something available for the time being. Thanks, From: Punyashloka Biswal [mailto:punya.bis...@gmail.com] Sent: Thursday, April 23, 2015 9:06 PM To: Pagliari, Roberto; user@spark.apache.org Subject: Re: gridsearch - python https://issues.apache.org/jira/browse/SPARK-7022. Punya On Thu, Apr 23, 2015 at 5:47 PM Pagliari, Roberto rpagli...@appcomsci.com wrote: Can anybody point me to an example, if available, about gridsearch with python? Thank you,
Re: Re: Is the Spark-1.3.1 support build with scala 2.8 ?
Thank you very much for your suggestion. Regards, From: madhu phatak Date: 2015-04-24 13:06 To: guoqing0...@yahoo.com.hk CC: user Subject: Re: Is the Spark-1.3.1 support build with scala 2.8 ? Hi, AFAIK it's only built with 2.10 and 2.11.
Spark SQL - Setting YARN Classpath for primordial class loader
Hi guys, I'm having a problem building a DataFrame in Spark SQL from a JDBC data source when running with --master yarn-client and adding the JDBC driver JAR with --jars. If I run with a local[*] master, all works fine.

./bin/spark-shell --jars /tmp/libs/mysql-jdbc.jar --master yarn-client

sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://mysqlsvr:3306/MyDB;user=usr;password=pwd",
  "driver" -> "com.mysql.jdbc.Driver",
  "dbtable" -> "MY_TBL"))

This throws a class-not-found exception when running with Spark SQL. But when trying to get the driver class on the workers or driver, the class is found without problems. So I'm guessing this is some problem with the primordial class loader/Java security in the DriverManager versus the class loader used by Spark SQL when running on YARN. Any ideas? The only thing I have found that works is merging my MySQL JDBC driver into the Spark assembly JAR that's shipped to YARN, because adding it with --jars doesn't work. Cheers!
Re: Spark SQL - Setting YARN Classpath for primordial class loader
You'd have to use spark.{driver,executor}.extraClassPath to modify the system class loader. But that also means you have to manually distribute the jar to the nodes in your cluster, into a common location. -- Marcelo
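A sketch of the spark-shell invocation with those settings (the local path is illustrative and must exist on every node):

./bin/spark-shell --master yarn-client \
  --conf spark.driver.extraClassPath=/opt/jars/mysql-jdbc.jar \
  --conf spark.executor.extraClassPath=/opt/jars/mysql-jdbc.jar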
Re: Spark SQL - Setting YARN Classpath for primordial class loader
Thanks Marcelo, can this be a path on HDFS?
Re: Spark SQL - Setting YARN Classpath for primordial class loader
No, those have to be local paths. -- Marcelo
Re: Re: problem with spark thrift server
Thanks for your reply. I would like to use the Spark Thrift server as a JDBC SQL interface, with the Spark application running on YARN, but the application was FINISHED when the Thrift server crashed, and all the cached tables were lost. Thriftserver start command: start-thriftserver.sh --master yarn --executor-memory 20480m --executor-cores 2 --num-executors 20 --queue spark My question is whether the Thrift server has any other, more stable mode on YARN, like active/standby for the Thrift server. I'd really appreciate any suggestions and ideas. Thanks. From: Arush Kharbanda Date: 2015-04-23 18:40 To: guoqing0...@yahoo.com.hk CC: user Subject: Re: problem with spark thrift server Hi, what do you mean by the driver being disabled? What are you trying to achieve? Thanks Arush -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
spark 1.3.0 strange log message
Dear All, when using spark 1.3.0 spark-submit with stdout and stderr directed to a log file, I see some strange lines inside that look like this:

[Stage 0:(0 + 2) / 120]
[Stage 0:(2 + 2) / 120]
[Stage 0:== (6 + 2) / 120]
[Stage 0:= (12 + 2) / 120]
[Stage 0:= (20 + 2) / 120]
[Stage 0:===(24 + 2) / 120]
[Stage 0:== (32 + 2) / 120]
[Stage 0:===(42 + 2) / 120]
[Stage 0: (52 + 2) / 120]
[Stage 0:===(59 + 2) / 120]
[Stage 0:===(68 + 2) / 120]
[Stage 0: (78 + 3) / 120]
[Stage 0:= (88 + 4) / 120]
[Stage 0:= (100 + 2) / 120]
[Stage 0:==(110 + 2) / 120]

Here is my log4j property file:

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

I want to know how to disable this kind of stage progress message. Best regards, Henry

The privileged confidential information contained in this email is intended for use only by the addressees as indicated by the original sender of this email. If you are not the addressee indicated in this email or are not responsible for delivery of the email to such a person, please kindly reply to the sender indicating this fact and delete all copies of it from your computer and network server immediately. Your cooperation is highly appreciated. It is advised that any unauthorized use of confidential information of Winbond is strictly prohibited; and any information in this email irrelevant to the official business of Winbond shall be deemed as neither given nor endorsed by Winbond.
Re: spark 1.3.0 strange log message
Use this in your Spark conf: spark.ui.showConsoleProgress=false Best Regards, On Fri, Apr 24, 2015 at 11:23 AM, Henry Hung ythu...@winbond.com wrote: Dear All, when using spark 1.3.0 spark-submit with stdout and stderr directed to a log file, I see some strange stage progress lines inside. I want to know how to disable this kind of stage progress message. Best regards, Henry
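For reference, the equivalent line in conf/spark-defaults.conf (assuming the defaults file is used rather than a --conf flag):

spark.ui.showConsoleProgress false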