Python - SQL (geonames dataset)
I'm using Python to set up a DataFrame, but for some reason it is not being made available to SQL. Code (from Zeppelin) below. I don't get any error when loading/prepping the data or the DataFrame. Any tips? (Originally I was not hardcoding the Row() structure, as my other tutorial added it by default; not sure why it didn't work here, but that might be beside the point.) Any guesses greatly appreciated as I dig my teeth in here for the first time. Thanks!

---
%pyspark
from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType, DecimalType
from os import getcwd

sqlContext = SQLContext(sc)
datafile = sc.textFile("/Users/tyler/data/geonames/CA.txt")
geonames = datafile.map(lambda s: s.split("\t")).map(lambda s: Row(
    geonameid=int(s[0]), asciiname=str(s[2]), latitude=float(s[4]), longitude=float(s[5]),
    elevation=str(s[16]), featureclass=str(s[6]), featurecode=str(s[7]), countrycode=str(s[8])))
gndf = sqlContext.inferSchema(geonames)
gndf.registerAsTable("geonames")
#print gndf.count()
print "---"
print gndf.columns
print "---"
print gndf.first()
print "---"
gndf.schema

OUTPUT

[u'asciiname', u'countrycode', u'elevation', u'featureclass', u'featurecode', u'geonameid', u'latitude', u'longitude']
---
Row(asciiname=u'100 Mile House', countrycode=u'CA', elevation=u'928', featureclass=u'P', featurecode=u'PPL', geonameid=5881639, latitude=51.64982, longitude=-121.28594)
---
StructType(List(StructField(asciiname,StringType,true),StructField(countrycode,StringType,true),StructField(elevation,StringType,true),StructField(featureclass,StringType,true),StructField(featurecode,StringType,true),StructField(geonameid,LongType,true),StructField(latitude,DoubleType,true),StructField(longitude,DoubleType,true)))

==
%sql
SELECT geonameid, count(1) value FROM geonames LIMIT 1

no such table List(geonames); line 2 pos 5
Re: Is it possible to set the akka specify properties (akka.extensions) in spark
Try SparkConf.set("spark.akka.extensions", "Whatever"); underneath, I think Spark won't ship properties which don't start with spark.* to the executors. Thanks Best Regards On Mon, May 11, 2015 at 8:33 AM, Terry Hole hujie.ea...@gmail.com wrote: Hi all, I'd like to monitor Akka using Kamon, which needs the akka.extensions setting to be a list like this in Typesafe config format: akka { extensions = ["kamon.system.SystemMetrics", "kamon.statsd.StatsD"] } But I can not find a way to do this; I have tried these: 1. SparkConf.set("akka.extensions", "[kamon.system.SystemMetrics, kamon.statsd.StatsD]") 2. use application.conf and set it via the java option -Dconfig.resource=/path/to/conf 3. Set akka.extensions [kamon.system.SystemMetrics, kamon.statsd.StatsD] in the spark conf file None of these work. Do we have other ways to set this? Thanks!
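For reference, a minimal PySpark sketch of what the suggested workaround looks like in code (prefixing the property with spark. so Spark will ship it); whether the Akka layer actually picks the value up this way is exactly what is in question in this thread, and the value shown is illustrative:

    from pyspark import SparkConf, SparkContext

    # Sketch of the suggestion above: properties must start with "spark."
    # for Spark to ship them to the executors.
    conf = (SparkConf()
            .setAppName("kamon-metrics-test")
            .set("spark.akka.extensions",
                 "kamon.system.SystemMetrics,kamon.statsd.StatsD"))
    sc = SparkContext(conf=conf)

(As the follow-up later in this thread notes, this particular setting did not resolve the problem for the original poster.)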
Re: Cassandra number of Tasks
Did you try repartitioning? You might end up with a lot of time spent on GC though. Thanks Best Regards On Fri, May 8, 2015 at 11:59 PM, Vijay Pawnarkar vijaypawnar...@gmail.com wrote: I am using the Spark Cassandra connector to work with a table with 3 million records, using the .where() API to work with only certain rows in this table. The where clause filters the data down to 10,000 rows. CassandraJavaUtil.javaFunctions(sparkContext) .cassandraTable(KEY_SPACE, MY_TABLE, CassandraJavaUtil.mapRowTo(MyClass.class)).where(cqlDataFilter, cqlFilterParams) I am also using the parameter spark.cassandra.input.split.size=1000. As this job is processed by the Spark cluster, it creates 3000 partitions instead of 10, and 3000 tasks are executed on the cluster. As the data in our table grows to 30 million rows, this will create 30,000 tasks instead of 10. Is there a better way to process these 10,000 records with 10 tasks? Thanks!
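As an illustration of the repartitioning suggestion, a minimal PySpark sketch on a generic RDD (the connector call in the original post is Java/Scala, so the RDD contents and target partition count here are placeholders):

    # Hypothetical example: collapse a heavily over-partitioned RDD down to
    # 10 partitions. coalesce() avoids a full shuffle when only reducing the
    # partition count; repartition() forces a shuffle but rebalances the data.
    filtered = sc.parallelize(range(10000), 3000)   # stand-in for the filtered table
    small = filtered.coalesce(10)
    balanced = filtered.repartition(10)
    print small.getNumPartitions()
    print balanced.getNumPartitions()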
Re: Python - SQL (geonames dataset)
Try this Res = ssc.sql(your SQL without limit) Print red.first() Note: your SQL looks wrong as count will need a group by clause. Best Ayan On 11 May 2015 16:22, Tyler Mitchell tyler.mitch...@actian.com wrote: I'm using Python to setup a dataframe, but for some reason it is not being made available to SQL. Code (from Zeppelin) below. I don't get any error when loading/prepping the data or dataframe. Any tips? (Originally I was not hardcoding the Row() structure, as my other tutorial added it by default, not sure why it didn't work here, but that might be besides the point.) Any guesses greatly appreciated as I dig my teeth in here for the first time. Thanks! --- %pyspark from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType, DecimalType from os import getcwd sqlContext = SQLContext(sc) datafile = sc.textFile(/Users/tyler/data/geonames/CA.txt) geonames = datafile.map(lambda s: s.split(\t)).map(lambda s: Row( geonameid=int(s[0]), asciiname=str(s[2]), latitude=float(s[4]), longitude=float(s[5]), elevation=str(s[16]), featureclass=str(s[6]), featurecode=str(s[7]), countrycode=str(s[8]) )) gndf = sqlContext.inferSchema(geonames) gndf.registerAsTable(geonames) #print gndf.count() print --- print gndf.columns print --- print gndf.first() print --- gndf.schema OUTPUT [u'asciiname', u'countrycode', u'elevation', u'featureclass', u'featurecode', u'geonameid', u'latitude', u'longitude'] --- Row(asciiname=u'100 Mile House', countrycode=u'CA', elevation=u'928', featureclass=u'P', featurecode=u'PPL', geonameid=5881639, latitude=51.64982, longitude=-121.28594) --- StructType(List(StructField(asciiname,StringType,true),StructField(countrycode,StringType,true),StructField(elevation,StringType,true),StructField(featureclass,StringType,true),StructField(featurecode,StringType,true),StructField(geonameid,LongType,true),StructField(latitude,DoubleType,true),StructField(longitude,DoubleType,true))) = %sql SELECT geonameid, count(1) value FROM geonames LIMIT 1 no such table List(geonames); line 2 pos 5
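To illustrate the note about the aggregate needing a GROUP BY, a hedged sketch of the suggestion in PySpark (the grouping column is just an example from the dataset):

    # Run the aggregate through the Python API first to confirm the table is
    # visible; count(1) needs a GROUP BY when another column is selected.
    res = sqlContext.sql(
        "SELECT featureclass, count(1) AS value "
        "FROM geonames GROUP BY featureclass")
    print res.first()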
Re: EVent generation
Have a look over here https://storm.apache.org/community.html Thanks Best Regards On Sun, May 10, 2015 at 3:21 PM, anshu shukla anshushuk...@gmail.com wrote: http://stackoverflow.com/questions/30149868/generate-events-tuples-using-csv-file-with-timestamps -- Thanks Regards, Anshu Shukla
Re: Spark can not access jar from HDFS !!
Hi All, Thanks for suggestions. What I tried is - hiveContext.sql (add jar ) and that helps to complete the create temporary function but while using this function I get ClassNotFound for the class handling this function. The same class is present in the jar added . Please note that the same works fine from the Hive Shell. Is there an issue with Spark while distributing jars across workers? May be that is causing the problem. Also can you please suggest the manual way of copying the jars to the workers, I just want to ascertain my assumption. Thanks, Ravi On Sun, May 10, 2015 at 1:40 AM Michael Armbrust mich...@databricks.com wrote: That code path is entirely delegated to hive. Does hive support this? You might try instead using sparkContext.addJar. On Sat, May 9, 2015 at 12:32 PM, Ravindra ravindra.baj...@gmail.com wrote: Hi All, I am trying to create custom udfs with hiveContext as given below - scala hiveContext.sql (CREATE TEMPORARY FUNCTION sample_to_upper AS 'com.abc.api.udf.MyUpper' USING JAR 'hdfs:///users/ravindra/customUDF2.jar') I have put the udf jar in the hdfs at the path given above. The same command works well in the hive shell but failing here in the spark shell. And it fails as given below. - 15/05/10 00:41:51 ERROR Task: FAILED: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR hdfs:///users/ravindra/customUDF2.jar 15/05/10 00:41:51 INFO FunctionTask: create function: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR hdfs:///users/ravindra/customUDF2.jar at org.apache.hadoop.hive.ql.exec.FunctionTask.addFunctionResources(FunctionTask.java:305) at org.apache.hadoop.hive.ql.exec.FunctionTask.createTemporaryFunction(FunctionTask.java:179) at org.apache.hadoop.hive.ql.exec.FunctionTask.execute(FunctionTask.java:81) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94) at $line17.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:18) at $line17.$read$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:23) at $line17.$read$$iwC$$iwC$$iwC$$iwC.init(console:25) at $line17.$read$$iwC$$iwC$$iwC.init(console:27) at $line17.$read$$iwC$$iwC.init(console:29) at $line17.$read$$iwC.init(console:31) at $line17.$read.init(console:33) at $line17.$read$.init(console:37) at $line17.$read$.clinit(console) at 
$line17.$eval$.init(console:7) at $line17.$eval$.clinit(console) at $line17.$eval.$print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at
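For reference, a PySpark sketch of the sequence being described (adding the jar and creating the temporary function through HiveContext); the jar path and class name are the ones quoted above, while the table and column used with the UDF are hypothetical, and this only illustrates the approach rather than a confirmed fix:

    from pyspark.sql import HiveContext

    hc = HiveContext(sc)
    # Same steps as in the Scala shell above: register the jar, then the UDF.
    hc.sql("ADD JAR hdfs:///users/ravindra/customUDF2.jar")
    hc.sql("CREATE TEMPORARY FUNCTION sample_to_upper AS 'com.abc.api.udf.MyUpper'")
    # Using the function is where the ClassNotFoundException reportedly appears,
    # which suggests the jar is not reaching the executors.
    hc.sql("SELECT sample_to_upper(name) FROM some_table").show()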
[SparkSQL 1.4.0] groupBy columns are always nullable?
I am trying to get the result schema of aggregate functions using the DataFrame API. However, I find that the result fields for the groupBy columns are always nullable, even when the source field is not nullable. I want to know if this is by design, thank you! Below is simple code to show the issue.

==
import sqlContext.implicits._
import org.apache.spark.sql.functions._

case class Test(key: String, value: Long)
val df = sc.makeRDD(Seq(Test("k1", 2), Test("k1", 1))).toDF
val result = df.groupBy("key").agg($"key", sum("value"))
// From the output, you can see the key column is nullable, why??
result.printSchema
//root
// |-- key: string (nullable = true)
// |-- SUM(value): long (nullable = true)
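A PySpark sketch of the same check, in case it helps reproduce the observation (whether the nullable flag survives createDataFrame exactly as declared can vary by version, so treat this as illustrative):

    from pyspark.sql.types import StructType, StructField, StringType, LongType
    from pyspark.sql import functions as F

    schema = StructType([
        StructField("key", StringType(), nullable=False),
        StructField("value", LongType(), nullable=False),
    ])
    df = sqlContext.createDataFrame([("k1", 2), ("k1", 1)], schema)
    df.printSchema()          # key declared as nullable = false here
    result = df.groupBy("key").agg(F.sum("value"))
    result.printSchema()      # key reportedly comes back as nullable = true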
Re: Is it possible to set the akka specify properties (akka.extensions) in spark
Hi Akhil, I tried this; it did not work. I also tried SparkConf.set("akka.extensions", "[\"kamon.system.SystemMetrics\", \"kamon.statsd.StatsD\"]"), and it also did not work. Thanks On Mon, May 11, 2015 at 2:56 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Try SparkConf.set("spark.akka.extensions", "Whatever"); underneath, I think Spark won't ship properties which don't start with spark.* to the executors. Thanks Best Regards On Mon, May 11, 2015 at 8:33 AM, Terry Hole hujie.ea...@gmail.com wrote: Hi all, I'd like to monitor Akka using Kamon, which needs the akka.extensions setting to be a list like this in Typesafe config format: akka { extensions = ["kamon.system.SystemMetrics", "kamon.statsd.StatsD"] } But I can not find a way to do this; I have tried these: 1. SparkConf.set("akka.extensions", "[kamon.system.SystemMetrics, kamon.statsd.StatsD]") 2. use application.conf and set it via the java option -Dconfig.resource=/path/to/conf 3. Set akka.extensions [kamon.system.SystemMetrics, kamon.statsd.StatsD] in the spark conf file None of these work. Do we have other ways to set this? Thanks!
Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue
This is the stack trace of the worker thread: org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150) org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60) org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46) org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.rdd.RDD.iterator(RDD.scala:244) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.rdd.RDD.iterator(RDD.scala:244) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.rdd.RDD.iterator(RDD.scala:244) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) org.apache.spark.rdd.RDD.iterator(RDD.scala:242) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) org.apache.spark.scheduler.Task.run(Task.scala:64) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) On 8 May 2015 at 22:12, Josh Rosen rosenvi...@gmail.com wrote: Do you have any more specific profiling data that you can share? I'm curious to know where AppendOnlyMap.changeValue is being called from. On Fri, May 8, 2015 at 1:26 PM, Michal Haris michal.ha...@visualdna.com wrote: +dev On 6 May 2015 10:45, Michal Haris michal.ha...@visualdna.com wrote: Just wanted to check if somebody has seen similar behaviour or knows what we might be doing wrong. We have a relatively complex spark application which processes half a terabyte of data at various stages. We have profiled it in several ways and everything seems to point to one place where 90% of the time is spent: AppendOnlyMap.changeValue. The job scales and is relatively faster than its map-reduce alternative but it still feels slower than it should be. I am suspecting too much spill but I haven't seen any improvement by increasing number of partitions to 10k. Any idea would be appreciated. -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033, -- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033,
how to load some of the files in a dir and monitor new file in that dir in spark streaming without missing?
I have one HDFS dir which contains many files: /user/root/1.txt /user/root/2.txt /user/root/3.txt /user/root/4.txt and there is a daemon process which adds one file per minute to this dir (e.g., 5.txt, 6.txt, 7.txt...). I want to start a Spark Streaming job which loads 3.txt and 4.txt and then detects all the new files after 4.txt. Please note that because these files are large, processing them will take a long time. So if I process 3.txt and 4.txt before launching the streaming task, 5.txt and 6.txt may be produced into this dir while 3.txt and 4.txt are still being processed. And when the streaming task starts, 5.txt and 6.txt will be missed, because it will only process new files (from 7.txt onward). I'm not sure if I have described the problem clearly; if you have any questions, please ask me. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-load-some-of-the-files-in-a-dir-and-monitor-new-file-in-that-dir-in-spark-streaming-without-m-tp22841.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
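For illustration only, one possible arrangement in PySpark: start the textFileStream watching the directory first, then load the existing backlog as a plain RDD on the same SparkContext. This is a sketch of the setup described, not a complete answer to the ordering problem raised above; process() is a hypothetical routine.

    from pyspark.streaming import StreamingContext

    ssc = StreamingContext(sc, batchDuration=60)

    # Watch the directory for files created after the streaming job starts.
    new_files = ssc.textFileStream("hdfs:///user/root/")
    new_files.foreachRDD(lambda rdd: process(rdd))   # process() is hypothetical

    ssc.start()

    # Handle the pre-existing backlog separately, on the same SparkContext,
    # once the stream is already watching for new files.
    backlog = sc.textFile("/user/root/3.txt,/user/root/4.txt")
    process(backlog)

    ssc.awaitTermination()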
spark mllib kmeans
Hi, is it possible to use a custom distance measure and another data type as the vector? I want to cluster temporal geo data. Best regards, Paul
RE: Spark SQL and java.lang.RuntimeException
Hi, Are you creating the table from hive? Which version of hive are you using? Thanks, Daoyuan -Original Message- From: Nick Travers [mailto:n.e.trav...@gmail.com] Sent: Sunday, May 10, 2015 10:34 AM To: user@spark.apache.org Subject: Spark SQL and java.lang.RuntimeException I'm getting the following error when reading a table from Hive. Note the spelling of the 'Primitve' in the stack trace. I can't seem to find it anywhere else online. It seems to only occur with this one particular table I am reading from. Occasionally the task will completely fail, other times it will not. I run into different variants of the exception, presumably for each of the different types of the columns (LONG, INT, BOOLEAN). Has anyone else run into this issue? I'm running Spark 1.3.0 with the standalone cluster manager. java.lang.RuntimeException: Primitve type LONG should not take parameters at org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyPrimitiveObjectInspectorFactory.getLazyObjectInspector(LazyPrimitiveObjectInspectorFactory.java:136) at org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyPrimitiveObjectInspectorFactory.getLazyObjectInspector(LazyPrimitiveObjectInspectorFactory.java:113) at org.apache.hadoop.hive.serde2.lazy.LazyFactory.createLazyObjectInspector(LazyFactory.java:224) at org.apache.hadoop.hive.serde2.lazy.LazyFactory.createColumnarStructInspector(LazyFactory.java:314) at org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.initialize(ColumnarSerDe.java:88) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:118) at org.apache.spark.sql.hive.HadoopTableReader$$anonfun$2.apply(TableReader.scala:115) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-and-java-lang-RuntimeException-tp22831.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark mllib kmeans
Hi Paul, I would say that it should be possible, but you'll need a different distance measure which conforms to your coordinate system. 2015-05-11 14:59 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com: hi, it is possible to use a custom distance measure and a other data typ as vector? i want cluster temporal geo datas. best regards paul
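To make the "other data type as vector" part concrete, a minimal PySpark sketch that packs temporal geo records into MLlib vectors. Note that MLlib's KMeans itself uses Euclidean distance, so a genuinely custom distance measure would need feature scaling tricks or a custom implementation; the records and scaling below are purely illustrative.

    from pyspark.mllib.clustering import KMeans
    from pyspark.mllib.linalg import Vectors

    # Hypothetical records: (latitude, longitude, unix_timestamp)
    records = sc.parallelize([
        (48.55, -125.02, 1431302400),
        (51.64, -121.28, 1431306000),
    ])
    # Features must share a comparable scale for Euclidean distance to be
    # meaningful, hence the crude rescaling of the timestamp below.
    vectors = records.map(lambda r: Vectors.dense(r[0], r[1], r[2] / 1e6))
    model = KMeans.train(vectors, k=2, maxIterations=10)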
Reading Nested Fields in DataFrames
Hi , I am trying to read Nested Avro data in Spark 1.3 using DataFrames. I need help to retrieve the Inner element data in the Structure below. Below is the schema when I enter df.printSchema : |-- THROTTLING_PERCENTAGE: double (nullable = false) |-- IMPRESSION_TYPE: string (nullable = false) |-- campaignArray: array (nullable = false) ||-- element: struct (containsNull = false) |||-- COOKIE: string (nullable = false) |||-- CAMPAIGN_ID: long (nullable = false) How can I access CAMPAIGN_ID field in this schema ? Thanks, Ashish Kr. Singh
It takes too long (30 seconds) to create Spark Context with SPARK/YARN
I am running Spark jobs on a YARN cluster. It takes ~30 seconds to create a Spark context, while it takes only 1-2 seconds running Spark in local mode. The master is set as yarn-client, and both the machine that submits the Spark job and the YARN cluster are in the same domain. Originally I suspected that the following config might play a role, especially spark.scheduler.maxRegisteredResourcesWaitingTime, whose default is 30 seconds. However, lowering it to 10 makes no difference. Modifications to the other two parameters also have no effect. spark.scheduler.maxRegisteredResourcesWaitingTime=1000 (lowered from 30000 default) spark.yarn.applicationMaster.waitTries=1 (lowered from 10 default) spark.yarn.scheduler.heartbeat.interval-ms=1000 (lowered from 5000 default) What could be the possible reason? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/It-takes-too-long-30-seconds-to-create-Spark-Context-with-SPARK-YARN-tp22847.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Python - SQL can't find table (Zeppelin)
Thanks for the suggestion Ayan, it has not solved my problem but I did get sqlContext to execute the SQL and return dataframe object. SQL is running fine in the pyspark interpreter but not passing to SQL note (though it works fine for a different dataset) - guess I'll take this question to the Zeppelin list. Thanks again, more tips welcome if anyone sees anything funny... %pyspark from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType, DecimalType from os import getcwd sqlContext = SQLContext(sc) datafile = sc.textFile(/Users/mitty01/data/geonames/CA.txt) geonames = datafile.map(lambda s: s.split(\t)).map(lambda s: Row( geonameid=int(s[0]), asciiname=str(s[2]), latitude=float(s[4]), longitude=float(s[5]), elevation=str(s[16]), featureclass=str(s[6]), featurecode=str(s[7]), countrycode=str(s[8]) )) gndf = sqlContext.inferSchema(geonames) gndf.registerTempTable('geonames') #print gndf.count() print --- print gndf.columns print --- print gndf.first() print --- gndf.schema print --- sqlContext.sql(SELECT * FROM geonames LIMIT 10) OUTPUT --- [u'asciiname', u'countrycode', u'elevation', u'featureclass', u'featurecode', u'geonameid', u'latitude', u'longitude'] --- Row(asciiname=u'Swiftsure Bank', countrycode=u'CA', elevation=u'-', featureclass=u'U', featurecode=u'BNKU', geonameid=4030308, latitude=48.55321, longitude=-125.02235) --- StructType(List(StructField(asciiname,StringType,true),StructField(countrycode,StringType,true),StructField(elevation,StringType,true),StructField(featureclass,StringType,true),StructField(featurecode,StringType,true),StructField(geonameid,LongType,true),StructField(latitude,DoubleType,true),StructField(longitude,DoubleType,true))) --- DataFrame[asciiname: string, countrycode: string, elevation: string, featureclass: string, featurecode: string, geonameid: bigint, latitude: double, longitude: double] == %sql SELECT * FROM geonames LIMIT 1 no such table List(geonames); line 2 pos 5 From: ayan guha guha.a...@gmail.com Sent: May 11, 2015 12:27 AM To: Tyler Mitchell Cc: user Subject: Re: Python - SQL (geonames dataset) Try this Res = ssc.sql(your SQL without limit) Print red.first() Note: your SQL looks wrong as count will need a group by clause. Best Ayan On 11 May 2015 16:22, Tyler Mitchell tyler.mitch...@actian.commailto:tyler.mitch...@actian.com wrote: I'm using Python to setup a dataframe, but for some reason it is not being made available to SQL. Code (from Zeppelin) below. I don't get any error when loading/prepping the data or dataframe. Any tips? (Originally I was not hardcoding the Row() structure, as my other tutorial added it by default, not sure why it didn't work here, but that might be besides the point.) Any guesses greatly appreciated as I dig my teeth in here for the first time. Thanks! 
--- %pyspark from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType, DecimalType from os import getcwd sqlContext = SQLContext(sc) datafile = sc.textFile(/Users/tyler/data/geonames/CA.txt) geonames = datafile.map(lambda s: s.split(\t)).map(lambda s: Row( geonameid=int(s[0]), asciiname=str(s[2]), latitude=float(s[4]), longitude=float(s[5]), elevation=str(s[16]), featureclass=str(s[6]), featurecode=str(s[7]), countrycode=str(s[8]) )) gndf = sqlContext.inferSchema(geonames) gndf.registerAsTable(geonames) #print gndf.count() print --- print gndf.columns print --- print gndf.first() print --- gndf.schema OUTPUT [u'asciiname', u'countrycode', u'elevation', u'featureclass', u'featurecode', u'geonameid', u'latitude', u'longitude'] --- Row(asciiname=u'100 Mile House', countrycode=u'CA', elevation=u'928', featureclass=u'P', featurecode=u'PPL', geonameid=5881639, latitude=51.64982, longitude=-121.28594) --- StructType(List(StructField(asciiname,StringType,true),StructField(countrycode,StringType,true),StructField(elevation,StringType,true),StructField(featureclass,StringType,true),StructField(featurecode,StringType,true),StructField(geonameid,LongType,true),StructField(latitude,DoubleType,true),StructField(longitude,DoubleType,true))) = %sql SELECT geonameid, count(1) value FROM geonames LIMIT 1 no such table List(geonames); line 2 pos 5
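For what it's worth, a minimal sketch of the pattern that usually makes a table visible to Zeppelin's %sql: register the DataFrame as a temp table against the sqlContext that Zeppelin itself provides, rather than constructing a new SQLContext(sc) in the paragraph. That mismatch is a likely cause of the "no such table" error, though it is not confirmed in this thread; the trimmed Row fields below are just for brevity.

    %pyspark
    from pyspark.sql.types import Row

    # Use the sqlContext injected by Zeppelin instead of SQLContext(sc),
    # so that %sql paragraphs see the same catalog of temp tables.
    datafile = sc.textFile("/Users/tyler/data/geonames/CA.txt")
    rows = datafile.map(lambda s: s.split("\t")).map(
        lambda s: Row(geonameid=int(s[0]), asciiname=str(s[2])))
    gndf = sqlContext.inferSchema(rows)
    gndf.registerTempTable("geonames")
    print sqlContext.sql("SELECT asciiname FROM geonames LIMIT 10").collect()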
Re: It takes too long (30 seconds) to create Spark Context with SPARK/YARN
Could you upload the spark assembly to HDFS and then set spark.yarn.jar to the path where you uploaded it? That can help minimize start-up time. How long if you start just a spark shell? On 5/11/15, 11:15 AM, stanley wangshua...@yahoo.com wrote: I am running Spark jobs on YARN cluster. It took ~30 seconds to create a spark context, while it takes only 1-2 seconds running Spark in local mode. The master is set as yarn-client, and both the machine that submits the Spark job and the YARN cluster are in the same domain. Originally I suspected that the following config might play a role, especially spark.scheduler.maxRegisteredResourcesWaitingTime was set to 30 seconds. However, lowering it to 10 makes no difference. Modifications to other two parameters also has no effect. spark.scheduler.maxRegisteredResourcesWaitingTime=1000 (lowered from 3 default) spark.yarn.applicationMaster.waitTries=1 (lowered from 10 default) spark.yarn.scheduler.heartbeat.interval-ms = 1000 (lowered from 5000 default) What could be the possible reason? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/It-takes-too-long-30-seconds-to-create-Spark-Context-with-SPARK-YARN-tp22847.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
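A sketch of what that suggestion looks like in configuration terms, with a placeholder HDFS path: upload the assembly once, then point spark.yarn.jar at it so each job can skip re-uploading it.

    from pyspark import SparkConf, SparkContext

    conf = (SparkConf()
            .setMaster("yarn-client")
            # Hypothetical path: wherever the Spark assembly jar was uploaded.
            .set("spark.yarn.jar",
                 "hdfs:///user/spark/share/lib/spark-assembly-1.3.1-hadoop2.5.0.jar"))
    sc = SparkContext(conf=conf)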
Does long-lived SparkContext hold on to executor resources?
I am building an analytics app with Spark. I plan to use long-lived SparkContexts to minimize the overhead of creating Spark contexts, which in turn reduces the analytics query response time. The number of queries run in the system each day is relatively small. Would long-lived contexts hold on to the executor resources when there are no queries running? Is there a way to free executor resources in this type of use case? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-long-lived-SparkContext-hold-on-to-executor-resources-tp22848.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
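One mechanism that exists for this on YARN is dynamic executor allocation, which releases idle executors and requests them again when work arrives. A hedged configuration sketch follows; the property names are standard Spark settings, the values are illustrative, and this is offered as background rather than as an answer given in this thread.

    from pyspark import SparkConf

    conf = (SparkConf()
            .set("spark.dynamicAllocation.enabled", "true")
            .set("spark.dynamicAllocation.minExecutors", "1")
            .set("spark.dynamicAllocation.maxExecutors", "20")
            # Idle executors are released after this many seconds.
            .set("spark.dynamicAllocation.executorIdleTimeout", "60")
            # Required so shuffle files survive executor removal.
            .set("spark.shuffle.service.enabled", "true"))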
can we start a new thread in foreachRDD in spark streaming?
I want to start a child thread in foreachRDD. My situation is: the job is reading from an HDFS dir continuously, and every 100 batches I want to launch a model training task (I will make a snapshot of the RDDs at that time and start the training task). The training task takes a very long time (2 hours), and I don't want it to interfere with reading new batches of data. Is starting a new child thread a good solution? Could the child thread use the SparkContext in the main thread and use the RDDs from the main thread? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/can-we-start-a-new-thread-in-foreachRDD-in-spark-streaming-tp22845.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Reading Nested Fields in DataFrames
Typically you would use the '.' notation to access it, the same way you would access a map. On 12 May 2015 00:06, Ashish Kumar Singh ashish23...@gmail.com wrote: Hi, I am trying to read nested Avro data in Spark 1.3 using DataFrames. I need help to retrieve the inner element data in the structure below. Below is the schema when I enter df.printSchema: |-- THROTTLING_PERCENTAGE: double (nullable = false) |-- IMPRESSION_TYPE: string (nullable = false) |-- campaignArray: array (nullable = false) ||-- element: struct (containsNull = false) |||-- COOKIE: string (nullable = false) |||-- CAMPAIGN_ID: long (nullable = false) How can I access the CAMPAIGN_ID field in this schema? Thanks, Ashish Kr. Singh
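A small PySpark sketch of the dot notation on the schema shown above; since campaignArray is an array of structs, selecting a nested field this way should return an array of values per row (on newer releases, pyspark.sql.functions.explode can flatten it to one row per element). The temp table name here is hypothetical.

    # Dot notation over the array of structs from the printed schema.
    ids = df.select("campaignArray.CAMPAIGN_ID")
    ids.show()

    # Equivalent via SQL, assuming the DataFrame is registered as a temp table.
    df.registerTempTable("impressions")
    sqlContext.sql("SELECT campaignArray.CAMPAIGN_ID FROM impressions").show()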
Re: SQL UserDefinedType can't be saved in parquet file when using assembly jar
In this example, every thing work expect save to parquet file. On Mon, May 11, 2015 at 4:39 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: MyDenseVectorUDT do exist in the assembly jar and in this example all the code is in a single file to make sure every thing is included. On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng men...@gmail.com wrote: You should check where MyDenseVectorUDT is defined and whether it was on the classpath (or in the assembly jar) at runtime. Make sure the full class name (with package name) is used. Btw, UDTs are not public yet, so please use it with caution. -Xiangrui On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an example of code to reproduce the issue I mentioned in a previous mail about saving an UserDefinedType into a parquet file. The problem here is that the code works when I run it inside intellij idea but fails when I create the assembly jar and run it with spark-submit. I use the master version of Spark. @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT]) class MyDenseVector(val data: Array[Double]) extends Serializable { override def equals(other: Any): Boolean = other match { case v: MyDenseVector = java.util.Arrays.equals(this.data, v.data) case _ = false } } class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] { override def sqlType: DataType = ArrayType(DoubleType, containsNull = false) override def serialize(obj: Any): Seq[Double] = { obj match { case features: MyDenseVector = features.data.toSeq } } override def deserialize(datum: Any): MyDenseVector = { datum match { case data: Seq[_] = new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray) } } override def userClass: Class[MyDenseVector] = classOf[MyDenseVector] } case class Toto(imageAnnotation: MyDenseVector) object TestUserDefinedType { case class Params(input: String = null, partitions: Int = 12, outputDir: String = images.parquet) def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(ImportImageFolder).setMaster(local[4]) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val rawImages = sc.parallelize((1 to 5).map(x = Toto(new MyDenseVector(Array[Double](x.toDouble).toDF rawImages.printSchema() rawImages.show() rawImages.save(toto.parquet) // This fails with assembly jar sc.stop() } } My build.sbt is as follow : libraryDependencies ++= Seq( org.apache.spark %% spark-core % sparkVersion % provided, org.apache.spark %% spark-sql % sparkVersion, org.apache.spark %% spark-mllib % sparkVersion ) assemblyMergeStrategy in assembly := { case PathList(javax, servlet, xs @ _*) = MergeStrategy.first case PathList(org, apache, xs @ _*) = MergeStrategy.first case PathList(org, jboss, xs @ _*) = MergeStrategy.first // case PathList(ps @ _*) if ps.last endsWith .html = MergeStrategy.first // case application.conf= MergeStrategy.concat case m if m.startsWith(META-INF) = MergeStrategy.discard //case x = // val oldStrategy = (assemblyMergeStrategy in assembly).value // oldStrategy(x) case _ = MergeStrategy.first } As I said, this code works without problem when I execute it inside intellij idea. 
But when generate the assembly jar with sbt-assembly and use spark-submit I got the following error : 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7) java.lang.IllegalArgumentException: Unsupported dataType: {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]}, [1.1] failure: `TimestampType' expected but `{' found {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]} ^ at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163) at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402) at scala.util.Try.getOrElse(Try.scala:77) at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402) at
Re: SQL UserDefinedType can't be saved in parquet file when using assembly jar
MyDenseVectorUDT do exist in the assembly jar and in this example all the code is in a single file to make sure every thing is included. On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng men...@gmail.com wrote: You should check where MyDenseVectorUDT is defined and whether it was on the classpath (or in the assembly jar) at runtime. Make sure the full class name (with package name) is used. Btw, UDTs are not public yet, so please use it with caution. -Xiangrui On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an example of code to reproduce the issue I mentioned in a previous mail about saving an UserDefinedType into a parquet file. The problem here is that the code works when I run it inside intellij idea but fails when I create the assembly jar and run it with spark-submit. I use the master version of Spark. @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT]) class MyDenseVector(val data: Array[Double]) extends Serializable { override def equals(other: Any): Boolean = other match { case v: MyDenseVector = java.util.Arrays.equals(this.data, v.data) case _ = false } } class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] { override def sqlType: DataType = ArrayType(DoubleType, containsNull = false) override def serialize(obj: Any): Seq[Double] = { obj match { case features: MyDenseVector = features.data.toSeq } } override def deserialize(datum: Any): MyDenseVector = { datum match { case data: Seq[_] = new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray) } } override def userClass: Class[MyDenseVector] = classOf[MyDenseVector] } case class Toto(imageAnnotation: MyDenseVector) object TestUserDefinedType { case class Params(input: String = null, partitions: Int = 12, outputDir: String = images.parquet) def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(ImportImageFolder).setMaster(local[4]) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val rawImages = sc.parallelize((1 to 5).map(x = Toto(new MyDenseVector(Array[Double](x.toDouble).toDF rawImages.printSchema() rawImages.show() rawImages.save(toto.parquet) // This fails with assembly jar sc.stop() } } My build.sbt is as follow : libraryDependencies ++= Seq( org.apache.spark %% spark-core % sparkVersion % provided, org.apache.spark %% spark-sql % sparkVersion, org.apache.spark %% spark-mllib % sparkVersion ) assemblyMergeStrategy in assembly := { case PathList(javax, servlet, xs @ _*) = MergeStrategy.first case PathList(org, apache, xs @ _*) = MergeStrategy.first case PathList(org, jboss, xs @ _*) = MergeStrategy.first // case PathList(ps @ _*) if ps.last endsWith .html = MergeStrategy.first // case application.conf= MergeStrategy.concat case m if m.startsWith(META-INF) = MergeStrategy.discard //case x = // val oldStrategy = (assemblyMergeStrategy in assembly).value // oldStrategy(x) case _ = MergeStrategy.first } As I said, this code works without problem when I execute it inside intellij idea. 
But when generate the assembly jar with sbt-assembly and use spark-submit I got the following error : 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0 (TID 7) java.lang.IllegalArgumentException: Unsupported dataType: {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]}, [1.1] failure: `TimestampType' expected but `{' found {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]} ^ at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163) at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402) at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402) at scala.util.Try.getOrElse(Try.scala:77) at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402) at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145) at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278) at
Re: can we start a new thread in foreachRDD in spark streaming?
It depends on how you want to run your application. You can always save 100 batch as a data file and run another app to read those files. In that case you have separated contexts and you will find both application running simultaneously in the cluster but on different JVMs. But if you do not want to use separate process you can use the same context and then training tasks will run on same JVM as the streaming. Basically in first option you are using batch and real time pipeline of lambda architecture whereas in second option you are doing everything in real time pipeline. Best Ayan On 12 May 2015 00:08, hotdog lisend...@163.com wrote: I want to start a child-thread in foreachRDD. My situation is: the job is reading from a hdfs dir continuously, and every 100 batches, I want to launch a model training task (I will make a snapshot of the rdds at that time and start the training task. the training task takes a very long time(2 hours), and I don't want the training task influence reading new batch of data. Is starting a new child thread a good solution? Could the child thread use SparkContext in the main thread and use the rdd in main thread? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/can-we-start-a-new-thread-in-foreachRDD-in-spark-streaming-tp22845.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
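A rough PySpark sketch of the "same context" variant being discussed: count batches in the driver inside foreachRDD, snapshot every 100th batch to HDFS, and hand the long training off to a background thread so the streaming loop keeps consuming new files. The function and path names here are hypothetical.

    import threading
    from pyspark.streaming import StreamingContext

    ssc = StreamingContext(sc, 60)
    lines = ssc.textFileStream("hdfs:///user/root/")

    batch_count = [0]   # mutable holder so the closure can update it

    def train_in_background(path):
        # Long-running model training (placeholder); runs on its own thread
        # so it does not block the driver's streaming loop.
        data = sc.textFile(path)
        run_training(data)          # hypothetical training routine

    def handle(rdd):
        batch_count[0] += 1
        if batch_count[0] % 100 == 0 and not rdd.isEmpty():
            snapshot = "hdfs:///tmp/snapshots/batch-%d" % batch_count[0]
            rdd.saveAsTextFile(snapshot)
            threading.Thread(target=train_in_background, args=(snapshot,)).start()

    lines.foreachRDD(handle)
    ssc.start()
    ssc.awaitTermination()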
Re: EVent generation
I've had good success with splunk generator. https://github.com/coccyx/eventgen/blob/master/README.md On May 11, 2015, at 00:05, Akhil Das ak...@sigmoidanalytics.commailto:ak...@sigmoidanalytics.com wrote: Have a look over here https://storm.apache.org/community.html Thanks Best Regards On Sun, May 10, 2015 at 3:21 PM, anshu shukla anshushuk...@gmail.commailto:anshushuk...@gmail.com wrote: http://stackoverflow.com/questions/30149868/generate-events-tuples-using-csv-file-with-timestamps -- Thanks Regards, Anshu Shukla
Stratified sampling with DataFrames
Hi, I'm on Spark 1.3.0 and my data is in DataFrames. I need operations like sampleByKey() and sampleByKeyExact(). I saw the JIRA "Add approximate stratified sampling to DataFrame" (https://issues.apache.org/jira/browse/SPARK-7157). That's targeted for Spark 1.5; until that comes through, what's the easiest way to accomplish the equivalent of sampleByKey() and sampleByKeyExact() on DataFrames? Thanks Regards MK
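Until SPARK-7157 lands, one workaround is to drop to the underlying RDD, use RDD.sampleByKey, and rebuild the DataFrame. A hedged sketch follows: the column name and fractions are placeholders, and sampleByKeyExact may not be available from Python, so this only approximates the exact variant.

    # Approximate stratified sample keyed by a column, then back to a DataFrame.
    fractions = {"a": 0.1, "b": 0.5}          # per-key sampling fractions (example)
    pair_rdd = df.rdd.map(lambda row: (row.category, row))   # "category" is a placeholder
    sampled = pair_rdd.sampleByKey(False, fractions, seed=42).values()
    sampled_df = sqlContext.createDataFrame(sampled)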
Met a problem when using spark to load parquet files with different version schemas
Hi, devs, I met a problem when using spark to read to parquet files with two different versions of schemas. For example, the first file has one field with int type, while the same field in the second file is a long. I thought spark would automatically generate a merged schema long, and use that schema to process both files. However, the following code cannot work: DataFrame df = sqlContext.parquetFile(inputPath); df.registerTempTable(data); sqlContext.sql(select count(msg.actual_eta) from data).collect(); Exception: parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file f1.parquet at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228) at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201) BTW, I use spark 1.3.1, and already set spark.sql.parquet.useDataSourceApi to false. Any help would be appreciated. -Wei
Looking inside the 'mapPartitions' transformation, some confusing observations
As we all know, a partition in Spark is actually an Iterator[T]. For some purposes, I want to treat each partition not as an Iterator but as one whole object. For example, turn an Iterator[Int] into a breeze.linalg.DenseVector[Int]. So I use the 'mapPartitions' API to achieve this; however, during the implementation I made some confusing observations. I use Spark 1.3.0 on a cluster with 10 executor nodes. Below are the different attempts:

import breeze.linalg.DenseVector

val a = sc.parallelize(1 to 100, 10)

val b = a.mapPartitions(iter => {
  val v = Array.ofDim[Int](iter.size)
  var ind = 0
  while (iter.hasNext) {
    v(ind) = iter.next
    ind += 1
  }
  println(v.mkString(","))
  Iterator.single[DenseVector[Int]](DenseVector(v))
})
b.count()

val c = a.mapPartitions(iter => {
  val v = Array.ofDim[Int](iter.size)
  iter.copyToArray(v, 0, 10)
  println(v.mkString(","))
  Iterator.single[DenseVector[Int]](DenseVector(v))
})
c.count()

val d = a.mapPartitions(iter => {
  val v = iter.toArray
  println(v.mkString(","))
  Iterator.single[DenseVector[Int]](DenseVector(v))
})
d.count()

I can see the printed output in the executors' stdout. Only attempt 'd' satisfies my needs; the other attempts only produce a zero DenseVector, which means the assignment from iterator to vector did not happen. I would appreciate explanations for these observations. Thanks.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Looking-inside-the-mapPartitions-transformation-some-confused-observations-tp22850.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
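A likely explanation for the observations above: Scala's Iterator.size traverses (and therefore exhausts) the iterator, so in attempts 'b' and 'c' there is nothing left to read by the time the while loop or copyToArray runs, while 'd' consumes the iterator exactly once via toArray and works. The same single-pass constraint applies to PySpark partitions; a small sketch:

    # In PySpark the partition handed to mapPartitions is also a one-shot
    # iterator: measuring its length consumes it, so materialize it once.
    def to_list_once(it):
        values = list(it)        # single traversal; safe
        yield values

    def to_list_broken(it):
        n = sum(1 for _ in it)   # this exhausts the iterator...
        yield [x for x in it]    # ...so this is always empty

    a = sc.parallelize(range(1, 101), 10)
    print a.mapPartitions(to_list_once).first()
    print a.mapPartitions(to_list_broken).first()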
Re: Getting error running MLlib example with new cluster
Got it to work on the cluster by changing the master to yarn-cluster instead of local! I do have a couple follow up questions... This is the example I was trying to run:https://github.com/holdenk/learning-spark-examples/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala 1) The example still takes about 1 min 15 seconds to run (my cluster has 3 m3.large nodes). This seems really long for building a model based off data that is about 10 lines long. Is this normal? 2) Any guesses as to why it was able to run in the cluster, but not locally? Thanks for the help! On Mon, Apr 27, 2015 at 11:48 AM, Su She suhsheka...@gmail.com wrote: Hello Xiangrui, I am using this spark-submit command (as I do for all other jobs): /opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/spark/bin/spark-submit --class MLlib --master local[2] --jars $(echo /home/ec2-user/sparkApps/learning-spark/lib/*.jar | tr ' ' ',') /home/ec2-user/sparkApps/learning-spark/target/simple-project-1.1.jar Thank you for the help! Best, Su On Mon, Apr 27, 2015 at 9:58 AM, Xiangrui Meng men...@gmail.com wrote: How did you run the example app? Did you use spark-submit? -Xiangrui On Thu, Apr 23, 2015 at 2:27 PM, Su She suhsheka...@gmail.com wrote: Sorry, accidentally sent the last email before finishing. I had asked this question before, but wanted to ask again as I think it is now related to my pom file or project setup. Really appreciate the help! I have been trying on/off for the past month to try to run this MLlib example: https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala I am able to build the project successfully. When I run it, it returns: features in spam: 8 features in ham: 7 and then freezes. According to the UI, the description of the job is count at DataValidators.scala.38. This corresponds to this line in the code: val model = lrLearner.run(trainingData) I've tried just about everything I can think of...changed numFeatures from 1 - 10,000, set executor memory to 1g, set up a new cluster, at this point I think I might have missed dependencies as that has usually been the problem in other spark apps I have tried to run. This is my pom file, that I have used for other successful spark apps. Please let me know if you think I need any additional dependencies or there are incompatibility issues, or a pom.xml that is better to use. Thank you! 
Cluster information: Spark version: 1.2.0-SNAPSHOT (in my older cluster it is 1.2.0) java version 1.7.0_25 Scala version: 2.10.4 hadoop version: hadoop 2.5.0-cdh5.3.3 (older cluster was 5.3.0) project xmlns = http://maven.apache.org/POM/4.0.0; xmlns:xsi=http://w3.org/2001/XMLSchema-instance; xsi:schemaLocation =http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd; groupId edu.berkely/groupId artifactId simple-project /artifactId modelVersion 4.0.0/modelVersion name Simple Project /name packaging jar /packaging version 1.0 /version repositories repository idcloudera/id url http://repository.cloudera.com/artifactory/cloudera-repos//url /repository repository idscala-tools.org/id nameScala-tools Maven2 Repository/name urlhttp://scala-tools.org/repo-releases/url /repository /repositories pluginRepositories pluginRepository idscala-tools.org/id nameScala-tools Maven2 Repository/name urlhttp://scala-tools.org/repo-releases/url /pluginRepository /pluginRepositories build plugins plugin groupIdorg.scala-tools/groupId artifactIdmaven-scala-plugin/artifactId executions execution idcompile/id goals goalcompile/goal /goals phasecompile/phase /execution execution idtest-compile/id goals goaltestCompile/goal /goals phasetest-compile/phase /execution execution phaseprocess-resources/phase goals goalcompile/goal /goals /execution /executions /plugin plugin
Re: JAVA for SPARK certification
Note that O'Reilly Media has test prep materials in development. The exam does include questions in Scala, Python, Java, and SQL -- and frankly a number of the questions are about comparing or identifying equivalent Spark techniques between two of those different languages. The questions do not go into much detail for any of the languages Scala, Python, Java -- emphasis is on using Spark, not language nuances. Also, there is no coding required: the questions typically to have several code blocks and you select among them to identify the best answer. Overall the exam is targeted at an intermediate Spark user, with background as an application developer in either Python, Scala, or Java, and some familiarity with Big Data and distributed systems. Those were the requirements. Five different practice areas assess the following: 1. Understanding breadth of Spark API usage across Scala, Java, Python 2. Applying best practices to avoid runtime issues and performance bottlenecks 3. Distinguishing Spark features and practices from MapReduce usage 4. Integrating SQL, Streaming, ML, Graph atop the Spark unified engine 5. Solving typical use cases with Spark in Scala, Java, Python Understanding how Spark operates in a production environment is a major emphasis of the exam. Understanding about typical kinds of serialization exceptions is also important. If you look through earlier talks about Spark best practices from Matei Zaharia, Patrick Wendell, Reynold Xin, et al., those are great for prep. BTW, the Kryterion platform allows a person taking the test to mark questions for revisiting later. So if a problem seems difficult or non-intuitive, just mark it and move on the to the next one. Then come back to the marked ones later. Some questions are easier, some are harder, but the order is always randomized. There's a 90 minute limit; however, most people who pass finish within 60 minutes. So if you use mark for later review you'll likely have time at the end to revisit the harder questions. On Tue, May 5, 2015 at 7:05 AM, Gourav Sengupta gourav.sengu...@gmail.com wrote: Hi, I think all the required materials for reference are mentioned here: http://www.oreilly.com/data/sparkcert.html?cmp=ex-strata-na-lp-na_apache_spark_certification My question was regarding the proficiency level required for Java. There are detailed examples and code mentioned for JAVA, Python and Scala in most of the SCALA tutorials mentioned in the above link for reference. Regards, Gourav On Tue, May 5, 2015 at 3:03 PM, ayan guha guha.a...@gmail.com wrote: Very interested @Kartik/Zoltan. Please let me know how to connect on LI On Tue, May 5, 2015 at 11:47 PM, Zoltán Zvara zoltan.zv...@gmail.com wrote: I might join in to this conversation with an ask. Would someone point me to a decent exercise that would approximate the level of this exam (from above)? Thanks! On Tue, May 5, 2015 at 3:37 PM Kartik Mehta kartik.meht...@gmail.com wrote: Production - not whole lot of companies have implemented Spark in production and so though it is good to have, not must. If you are on LinkedIn, a group of folks including myself are preparing for Spark certification, learning in group makes learning easy and fun. Kartik On May 5, 2015 7:31 AM, ayan guha guha.a...@gmail.com wrote: And how important is to have production environment? On 5 May 2015 20:51, Stephen Boesch java...@gmail.com wrote: There are questions in all three languages. 2015-05-05 3:49 GMT-07:00 Kartik Mehta kartik.meht...@gmail.com: I too have similar question. 
My understanding is since Spark written in scala, having done in Scala will be ok for certification. If someone who has done certification can confirm. Thanks, Kartik On May 5, 2015 5:57 AM, Gourav Sengupta gourav.sengu...@gmail.com wrote: Hi, how important is JAVA for Spark certification? Will learning only Python and Scala not work? Regards, Gourav -- Best Regards, Ayan Guha
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
You can use '--jars ' option of spark-submit to ship metrics-core jar. Cheers On Mon, May 11, 2015 at 2:04 PM, Lee McFadden splee...@gmail.com wrote: Thanks Ted, The issue is that I'm using packages (see spark-submit definition) and I do not know how to add com.yammer.metrics:metrics-core to my classpath so Spark can see it. Should metrics-core not be part of the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work correctly? If not, any clues as to how I can add metrics-core to my project (bearing in mind that I'm using Python, not a JVM language) would be much appreciated. Thanks, and apologies for my newbness with Java/Scala. On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote: com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch: [INFO] | \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile Please make sure metrics-core jar is on the classpath. On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote: Hi, We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated. Submit command line: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py When we run the streaming job everything starts just fine, then we see the following in the logs: 15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:115) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:128) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge at java.net.URLClassLoader$1.run(URLClassLoader.java:372) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at 
java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:360) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 more
Re: JAVA for SPARK certification
Awesome!! Thank you Mr. Nathan, Great to have a guide like you and helping us all, Regards, Kartik On May 11, 2015 5:07 PM, Paco Nathan cet...@gmail.com wrote: Note that O'Reilly Media has test prep materials in development. The exam does include questions in Scala, Python, Java, and SQL -- and frankly a number of the questions are about comparing or identifying equivalent Spark techniques between two of those different languages. The questions do not go into much detail for any of the languages Scala, Python, Java -- emphasis is on using Spark, not language nuances. Also, there is no coding required: the questions typically to have several code blocks and you select among them to identify the best answer. Overall the exam is targeted at an intermediate Spark user, with background as an application developer in either Python, Scala, or Java, and some familiarity with Big Data and distributed systems. Those were the requirements. Five different practice areas assess the following: 1. Understanding breadth of Spark API usage across Scala, Java, Python 2. Applying best practices to avoid runtime issues and performance bottlenecks 3. Distinguishing Spark features and practices from MapReduce usage 4. Integrating SQL, Streaming, ML, Graph atop the Spark unified engine 5. Solving typical use cases with Spark in Scala, Java, Python Understanding how Spark operates in a production environment is a major emphasis of the exam. Understanding about typical kinds of serialization exceptions is also important. If you look through earlier talks about Spark best practices from Matei Zaharia, Patrick Wendell, Reynold Xin, et al., those are great for prep. BTW, the Kryterion platform allows a person taking the test to mark questions for revisiting later. So if a problem seems difficult or non-intuitive, just mark it and move on the to the next one. Then come back to the marked ones later. Some questions are easier, some are harder, but the order is always randomized. There's a 90 minute limit; however, most people who pass finish within 60 minutes. So if you use mark for later review you'll likely have time at the end to revisit the harder questions. On Tue, May 5, 2015 at 7:05 AM, Gourav Sengupta gourav.sengu...@gmail.com wrote: Hi, I think all the required materials for reference are mentioned here: http://www.oreilly.com/data/sparkcert.html?cmp=ex-strata-na-lp-na_apache_spark_certification My question was regarding the proficiency level required for Java. There are detailed examples and code mentioned for JAVA, Python and Scala in most of the SCALA tutorials mentioned in the above link for reference. Regards, Gourav On Tue, May 5, 2015 at 3:03 PM, ayan guha guha.a...@gmail.com wrote: Very interested @Kartik/Zoltan. Please let me know how to connect on LI On Tue, May 5, 2015 at 11:47 PM, Zoltán Zvara zoltan.zv...@gmail.com wrote: I might join in to this conversation with an ask. Would someone point me to a decent exercise that would approximate the level of this exam (from above)? Thanks! On Tue, May 5, 2015 at 3:37 PM Kartik Mehta kartik.meht...@gmail.com wrote: Production - not whole lot of companies have implemented Spark in production and so though it is good to have, not must. If you are on LinkedIn, a group of folks including myself are preparing for Spark certification, learning in group makes learning easy and fun. Kartik On May 5, 2015 7:31 AM, ayan guha guha.a...@gmail.com wrote: And how important is to have production environment? 
On 5 May 2015 20:51, Stephen Boesch java...@gmail.com wrote: There are questions in all three languages. 2015-05-05 3:49 GMT-07:00 Kartik Mehta kartik.meht...@gmail.com: I too have similar question. My understanding is since Spark written in scala, having done in Scala will be ok for certification. If someone who has done certification can confirm. Thanks, Kartik On May 5, 2015 5:57 AM, Gourav Sengupta gourav.sengu...@gmail.com wrote: Hi, how important is JAVA for Spark certification? Will learning only Python and Scala not work? Regards, Gourav -- Best Regards, Ayan Guha
Re: Running Spark in local mode seems to ignore local[N]
Sean, How does this model actually work? Let's say we want to run one job as N threads executing one particular task, e.g. streaming data out of Kafka into a search engine. How do we configure our Spark job execution? Right now, I'm seeing this job running as a single thread. And it's quite a bit slower than just running a simple utility with a thread executor with a thread pool of N threads doing the same task. The performance I'm seeing of running the Kafka-Spark Streaming job is 7 times slower than that of the utility. What's pulling Spark back? Thanks. On Mon, May 11, 2015 at 4:55 PM, Sean Owen so...@cloudera.com wrote: You have one worker with one executor with 32 execution slots. On Mon, May 11, 2015 at 9:52 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, Is there anything special one must do, running locally and submitting a job like so: spark-submit \ --class com.myco.Driver \ --master local[*] \ ./lib/myco.jar In my logs, I'm only seeing log messages with the thread identifier of Executor task launch worker-0. There are 4 cores on the machine so I expected 4 threads to be at play. Running with local[32] did not yield 32 worker threads. Any recommendations? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-local-mode-seems-to-ignore-local-N-tp22851.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Running Spark in local mode seems to ignore local[N]
BTW I think my comment was wrong as marcelo demonstrated. In standalone mode you'd have one worker, and you do have one executor, but his explanation is right. But, you certainly have execution slots for each core. Are you talking about your own user code? you can make threads, but that's nothing do with Spark then. If you run code on your driver, it's not distributed. If you run Spark over an RDD with 1 partition, only one task works on it. On Mon, May 11, 2015 at 10:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Sean, How does this model actually work? Let's say we want to run one job as N threads executing one particular task, e.g. streaming data out of Kafka into a search engine. How do we configure our Spark job execution? Right now, I'm seeing this job running as a single thread. And it's quite a bit slower than just running a simple utility with a thread executor with a thread pool of N threads doing the same task. The performance I'm seeing of running the Kafka-Spark Streaming job is 7 times slower than that of the utility. What's pulling Spark back? Thanks. On Mon, May 11, 2015 at 4:55 PM, Sean Owen so...@cloudera.com wrote: You have one worker with one executor with 32 execution slots. On Mon, May 11, 2015 at 9:52 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, Is there anything special one must do, running locally and submitting a job like so: spark-submit \ --class com.myco.Driver \ --master local[*] \ ./lib/myco.jar In my logs, I'm only seeing log messages with the thread identifier of Executor task launch worker-0. There are 4 cores on the machine so I expected 4 threads to be at play. Running with local[32] did not yield 32 worker threads. Any recommendations? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-local-mode-seems-to-ignore-local-N-tp22851.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is the AMP lab done next February?
Relaying an answer from AMP director Mike Franklin: One year into the lab we got a 5 yr Expeditions in Computing Award as part of the White House Big Data initiative in 2012, so we extend the lab for a year. We intend to start winding it down at the end of 2016, while supporting existing projects and students who will be finishing up. The AMPLab faculty are starting discussions this summer about what research challenges we'd like to tackle next, and how best to organize to do so. An interesting thing to note is that the Spark project started at about this point in the AMPLab predecessor project (RADLab) so we have a track record of being able to make these transitions. On Sat, May 9, 2015 at 8:43 PM, Justin Pihony justin.pih...@gmail.com wrote: From my StackOverflow question https://stackoverflow.com/questions/29593139/is-the-amp-lab-done-next-february : Is there a way to track whether Berkeley's AMP lab will indeed shutdown next year? From their about site: The AMPLab is a five-year collaborative effort at UC Berkeley and it was started in February 2011. So, I was curious if this was a hard date, or if it will be extended (or has already been extended?) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-the-AMP-lab-done-next-February-tp22832.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Running Spark in local mode seems to ignore local[N]
Thanks, Sean. This was not yet digested data for me :) The number of partitions in a streaming RDD is determined by the block interval and the batch interval. I have seen the bit on spark.streaming.blockInterval in the doc but I didn't connect it with the batch interval and the number of partitions. On Mon, May 11, 2015 at 5:34 PM, Sean Owen so...@cloudera.com wrote: You might have a look at the Spark docs to start. 1 batch = 1 RDD, but 1 RDD can have many partitions. And should, for scale. You do not submit multiple jobs to get parallelism. The number of partitions in a streaming RDD is determined by the block interval and the batch interval. If you have a batch interval of 10s and block interval of 1s you'll get 10 partitions of data in the RDD. On Mon, May 11, 2015 at 10:29 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Understood. We'll use the multi-threaded code we already have.. How are these execution slots filled up? I assume each slot is dedicated to one submitted task. If that's the case, how is each task distributed then, i.e. how is that task run in a multi-node fashion? Say 1000 batches/RDD's are extracted out of Kafka, how does that relate to the number of executors vs. task slots? Presumably we can fill up the slots with multiple instances of the same task... How do we know how many to launch? On Mon, May 11, 2015 at 5:20 PM, Sean Owen so...@cloudera.com wrote: BTW I think my comment was wrong as marcelo demonstrated. In standalone mode you'd have one worker, and you do have one executor, but his explanation is right. But, you certainly have execution slots for each core. Are you talking about your own user code? you can make threads, but that's nothing do with Spark then. If you run code on your driver, it's not distributed. If you run Spark over an RDD with 1 partition, only one task works on it. On Mon, May 11, 2015 at 10:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Sean, How does this model actually work? Let's say we want to run one job as N threads executing one particular task, e.g. streaming data out of Kafka into a search engine. How do we configure our Spark job execution? Right now, I'm seeing this job running as a single thread. And it's quite a bit slower than just running a simple utility with a thread executor with a thread pool of N threads doing the same task. The performance I'm seeing of running the Kafka-Spark Streaming job is 7 times slower than that of the utility. What's pulling Spark back? Thanks. On Mon, May 11, 2015 at 4:55 PM, Sean Owen so...@cloudera.com wrote: You have one worker with one executor with 32 execution slots. On Mon, May 11, 2015 at 9:52 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, Is there anything special one must do, running locally and submitting a job like so: spark-submit \ --class com.myco.Driver \ --master local[*] \ ./lib/myco.jar In my logs, I'm only seeing log messages with the thread identifier of Executor task launch worker-0. There are 4 cores on the machine so I expected 4 threads to be at play. Running with local[32] did not yield 32 worker threads. Any recommendations? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-local-mode-seems-to-ignore-local-N-tp22851.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
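[Editor's note] For reference, a minimal sketch of the relationship described above on Spark 1.3, where a 10s batch interval with a 1s block interval yields roughly 10 partitions per batch RDD. The application name and interval values are illustrative, not from the thread.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("KafkaIngest")
  .set("spark.streaming.blockInterval", "1000") // block interval in milliseconds on this version: one block per second
val ssc = new StreamingContext(conf, Seconds(10)) // batch interval: 10 seconds
// With a receiver-based stream (e.g. KafkaUtils.createStream), each 10s batch RDD then
// contains roughly 10 blocks, i.e. ~10 partitions; stream.repartition(n) can spread the
// work further across executors if needed.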
Re: Reading Nested Fields in DataFrames
Since there is an array here you are probably looking for HiveQL's LATERAL VIEW explode https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView . On Mon, May 11, 2015 at 7:12 AM, ayan guha guha.a...@gmail.com wrote: Typically you would use . notation to access, same way you would access a map. On 12 May 2015 00:06, Ashish Kumar Singh ashish23...@gmail.com wrote: Hi , I am trying to read Nested Avro data in Spark 1.3 using DataFrames. I need help to retrieve the Inner element data in the Structure below. Below is the schema when I enter df.printSchema : |-- THROTTLING_PERCENTAGE: double (nullable = false) |-- IMPRESSION_TYPE: string (nullable = false) |-- campaignArray: array (nullable = false) ||-- element: struct (containsNull = false) |||-- COOKIE: string (nullable = false) |||-- CAMPAIGN_ID: long (nullable = false) How can I access CAMPAIGN_ID field in this schema ? Thanks, Ashish Kr. Singh
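[Editor's note] For reference, a minimal sketch of the LATERAL VIEW approach suggested above, assuming the DataFrame has been registered as a temp table named "events" and a HiveContext is in use; the table alias names are illustrative.

df.registerTempTable("events")
val campaigns = sqlContext.sql(
  """SELECT c.COOKIE, c.CAMPAIGN_ID
    |FROM events
    |LATERAL VIEW explode(campaignArray) t AS c""".stripMargin)
campaigns.show()
// Dot notation, as suggested above, should also work for a single nested field:
// df.select("campaignArray.CAMPAIGN_ID") returns the array of IDs per row rather than
// one row per array element.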
Re: Met a problem when using spark to load parquet files with different version schemas
BTW, I use spark 1.3.1, and already set spark.sql.parquet.useDataSourceApi to false. Schema merging is only supported when this flag is set to true (setting it to false uses old code that will be removed once the new code is proven).
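[Editor's note] For reference, a minimal sketch of turning the flag back on before loading, which is what enables schema merging; the path is hypothetical.

sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "true")
val df = sqlContext.parquetFile("/data/mixed_version_parquet")
df.printSchema() // merged schema across the different file versions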
Re: Get a list of temporary RDD tables via Thrift
Temporary tables are not displayed by SHOW TABLES until Spark 1.3. On Mon, May 11, 2015 at 12:54 PM, Judy Nash judyn...@exchange.microsoft.com wrote: Hi, How can I get a list of temporary tables via Thrift? Have used thrift’s startWithContext and registered a temp table, but not seeing the temp table/rdd when running “show tables”. Thanks, Judy
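[Editor's note] For reference, a minimal sketch of exposing a temp table through the Thrift server from the same context on Spark 1.3+, assuming a HiveContext; the path and table name are hypothetical.

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hiveContext = new HiveContext(sc)
val df = hiveContext.parquetFile("/data/some_table")
df.registerTempTable("my_temp_table")
HiveThriftServer2.startWithContext(hiveContext)
// A JDBC client connected to this server should now see my_temp_table via SHOW TABLES.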
Re: Looking inside the 'mapPartitions' transformation, some confused observations
I believe the issue in attempts b and c is that you call iter.size, which consumes (flushes) the iterator, so the subsequent attempt to copy it into a vector yields 0 items. You could use an ArrayBuilder, for example, and not need to rely on knowing the size of the iterator. On Mon, May 11, 2015 at 2:26 PM, myasuka myas...@live.com wrote: As we all know, a partition in Spark is actually an Iterator[T]. For some purposes, I want to treat each partition not as an Iterator but as one whole object, for example converting an Iterator[Int] into a breeze.linalg.DenseVector[Int]. I use the 'mapPartitions' API to achieve this; however, during the implementation I made some confusing observations. I use Spark 1.3.0 on a 10-executor-node cluster. Below are my different attempts:

import breeze.linalg.DenseVector
val a = sc.parallelize(1 to 100, 10)

val b = a.mapPartitions(iter => {
  val v = Array.ofDim[Int](iter.size)
  var ind = 0
  while (iter.hasNext) {
    v(ind) = iter.next
    ind += 1
  }
  println(v.mkString(","))
  Iterator.single[DenseVector[Int]](DenseVector(v))
})
b.count()

val c = a.mapPartitions(iter => {
  val v = Array.ofDim[Int](iter.size)
  iter.copyToArray(v, 0, 10)
  println(v.mkString(","))
  Iterator.single[DenseVector[Int]](DenseVector(v))
})
c.count()

val d = a.mapPartitions(iter => {
  val v = iter.toArray
  println(v.mkString(","))
  Iterator.single[DenseVector[Int]](DenseVector(v))
})
d.count()

I can see the printed output in the executor's stdout. Only attempt 'd' satisfies my needs; the other attempts only get a zero DenseVector, which means the data was never copied from the iterator into the vector. Hope for explanations for these observations. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Looking-inside-the-mapPartitions-transformation-some-confused-observations-tp22850.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
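[Editor's note] For reference, a minimal sketch of the ArrayBuilder suggestion, reusing the RDD `a` from the code above: the array is built while traversing the iterator once, so its size never needs to be known up front.

import breeze.linalg.DenseVector
import scala.collection.mutable.ArrayBuilder

val e = a.mapPartitions { iter =>
  val builder = ArrayBuilder.make[Int]()
  while (iter.hasNext) builder += iter.next()
  Iterator.single(DenseVector(builder.result()))
}
e.count()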
Re: Getting error running MLlib example with new cluster
That is mostly the YARN overhead. You're starting up a container for the AM and executors, at least. That still sounds pretty slow, but the defaults aren't tuned for fast startup. On May 11, 2015 7:00 PM, Su She suhsheka...@gmail.com wrote: Got it to work on the cluster by changing the master to yarn-cluster instead of local! I do have a couple follow up questions... This is the example I was trying to run: https://github.com/holdenk/learning-spark-examples/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala 1) The example still takes about 1 min 15 seconds to run (my cluster has 3 m3.large nodes). This seems really long for building a model based off data that is about 10 lines long. Is this normal? 2) Any guesses as to why it was able to run in the cluster, but not locally? Thanks for the help! On Mon, Apr 27, 2015 at 11:48 AM, Su She suhsheka...@gmail.com wrote: Hello Xiangrui, I am using this spark-submit command (as I do for all other jobs): /opt/cloudera/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/spark/bin/spark-submit --class MLlib --master local[2] --jars $(echo /home/ec2-user/sparkApps/learning-spark/lib/*.jar | tr ' ' ',') /home/ec2-user/sparkApps/learning-spark/target/simple-project-1.1.jar Thank you for the help! Best, Su On Mon, Apr 27, 2015 at 9:58 AM, Xiangrui Meng men...@gmail.com wrote: How did you run the example app? Did you use spark-submit? -Xiangrui On Thu, Apr 23, 2015 at 2:27 PM, Su She suhsheka...@gmail.com wrote: Sorry, accidentally sent the last email before finishing. I had asked this question before, but wanted to ask again as I think it is now related to my pom file or project setup. Really appreciate the help! I have been trying on/off for the past month to try to run this MLlib example: https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala I am able to build the project successfully. When I run it, it returns: features in spam: 8 features in ham: 7 and then freezes. According to the UI, the description of the job is count at DataValidators.scala.38. This corresponds to this line in the code: val model = lrLearner.run(trainingData) I've tried just about everything I can think of...changed numFeatures from 1 - 10,000, set executor memory to 1g, set up a new cluster, at this point I think I might have missed dependencies as that has usually been the problem in other spark apps I have tried to run. This is my pom file, that I have used for other successful spark apps. Please let me know if you think I need any additional dependencies or there are incompatibility issues, or a pom.xml that is better to use. Thank you! 
Cluster information: Spark version 1.2.0-SNAPSHOT (on my older cluster it is 1.2.0), Java version 1.7.0_25, Scala version 2.10.4, Hadoop version 2.5.0-cdh5.3.3 (older cluster was 5.3.0).

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <groupId>edu.berkely</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>
  <build>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <id>compile</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <phase>compile</phase>
          </execution>
          <execution>
            <id>test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <phase>test-compile</phase>
Re: large volume spark job spends most of the time in AppendOnlyMap.changeValue
Looks like it is spending a lot of time doing hash probing. It could be a number of the following: 1. hash probing itself is inherently expensive compared with rest of your workload 2. murmur3 doesn't work well with this key distribution 3. quadratic probing (triangular sequence) with a power-of-2 hash table works really badly for this workload. One way to test this is to instrument changeValue function to store the number of probes in total, and then log it. We added this probing capability to the new Bytes2Bytes hash map we built. We should consider just having it being reported as some built-in metrics to facilitate debugging. https://github.com/apache/spark/blob/b83091ae4589feea78b056827bc3b7659d271e41/unsafe/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java#L214 On Mon, May 11, 2015 at 4:21 AM, Michal Haris michal.ha...@visualdna.com wrote: This is the stack trace of the worker thread: org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:150) org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:60) org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46) org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.rdd.RDD.iterator(RDD.scala:244) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.rdd.RDD.iterator(RDD.scala:244) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.rdd.RDD.iterator(RDD.scala:244) org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) org.apache.spark.rdd.RDD.iterator(RDD.scala:242) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) org.apache.spark.scheduler.Task.run(Task.scala:64) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) On 8 May 2015 at 22:12, Josh Rosen rosenvi...@gmail.com wrote: Do you have any more specific profiling data that you can share? I'm curious to know where AppendOnlyMap.changeValue is being called from. On Fri, May 8, 2015 at 1:26 PM, Michal Haris michal.ha...@visualdna.com wrote: +dev On 6 May 2015 10:45, Michal Haris michal.ha...@visualdna.com wrote: Just wanted to check if somebody has seen similar behaviour or knows what we might be doing wrong. We have a relatively complex spark application which processes half a terabyte of data at various stages. We have profiled it in several ways and everything seems to point to one place where 90% of the time is spent: AppendOnlyMap.changeValue. The job scales and is relatively faster than its map-reduce alternative but it still feels slower than it should be. I am suspecting too much spill but I haven't seen any improvement by increasing number of partitions to 10k. Any idea would be appreciated. 
-- Michal Haris Technical Architect direct line: +44 (0) 207 749 0229 www.visualdna.com | t: +44 (0) 207 734 7033
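[Editor's note] For illustration only (this is not Spark's actual code), a self-contained sketch of what "number of probes" means for a power-of-2 table with quadratic (triangular-increment) probing, i.e. the quantity the instrumentation suggested above would log.

// Counts the slots visited before an insert/lookup reaches a free slot.
def probeCount(hash: Int, capacity: Int, occupied: Int => Boolean): Int = {
  val mask = capacity - 1       // capacity is assumed to be a power of 2
  var pos = hash & mask
  var delta = 1
  var probes = 1
  while (occupied(pos)) {       // stop at the first free slot
    pos = (pos + delta) & mask  // triangular sequence: +1, +2, +3, ...
    delta += 1
    probes += 1
  }
  probes
}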
Re: [SparkSQL 1.4.0] groupBy columns are always nullable?
Hi Haopu, actually here `key` is nullable because this is your input's schema : scala result.printSchema root |-- key: string (nullable = true) |-- SUM(value): long (nullable = true) scala df.printSchema root |-- key: string (nullable = true) |-- value: long (nullable = false) I tried it with a schema where the key is not flagged as nullable, and the schema is actually respected. What you can argue however is that SUM(value) should also be not nullable since value is not nullable. @rxin do you think it would be reasonable to flag the Sum aggregation function as nullable (or not) depending on the input expression's schema ? Regards, Olivier. Le lun. 11 mai 2015 à 22:07, Reynold Xin r...@databricks.com a écrit : Not by design. Would you be interested in submitting a pull request? On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote: I try to get the result schema of aggregate functions using DataFrame API. However, I find the result field of groupBy columns are always nullable even the source field is not nullable. I want to know if this is by design, thank you! Below is the simple code to show the issue. == import sqlContext.implicits._ import org.apache.spark.sql.functions._ case class Test(key: String, value: Long) val df = sc.makeRDD(Seq(Test(k1,2),Test(k1,1))).toDF val result = df.groupBy(key).agg($key, sum(value)) // From the output, you can see the key column is nullable, why?? result.printSchema //root // |-- key: string (nullable = true) // |-- SUM(value): long (nullable = true) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
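[Editor's note] For reference, a minimal sketch of specifying the schema explicitly so the grouping key stays non-nullable, as described above; the names mirror the example in the thread.

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("key", StringType, nullable = false),
  StructField("value", LongType, nullable = false)))
val rows = sc.parallelize(Seq(Row("k1", 2L), Row("k1", 1L)))
val df2 = sqlContext.createDataFrame(rows, schema)
df2.groupBy("key").agg(sum("value")).printSchema()
// key keeps nullable = false; SUM(value) is still reported as nullable = true,
// which is the behaviour being questioned above.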
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch: [INFO] | \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile Please make sure metrics-core jar is on the classpath. On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote: Hi, We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated. Submit command line: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py When we run the streaming job everything starts just fine, then we see the following in the logs: 15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:115) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:128) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge at java.net.URLClassLoader$1.run(URLClassLoader.java:372) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:360) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 more
Spark and RabbitMQ
Are there any existing or under-development modules for streaming messages out of RabbitMQ with Spark Streaming, or perhaps a RabbitMQ RDD? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-RabbitMQ-tp22852.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
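[Editor's note] Nothing ships in Spark itself for RabbitMQ; one common route is a custom receiver. A minimal sketch, assuming the RabbitMQ Java client (amqp-client) is on the classpath; the host and queue names are purely illustrative, and production code would add reconnect/restart handling.

import com.rabbitmq.client.{ConnectionFactory, QueueingConsumer}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class RabbitMQReceiver(host: String, queue: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // Consume on a background thread so onStart() returns promptly.
    new Thread("RabbitMQ Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = { /* connection is closed when receive() exits */ }

  private def receive(): Unit = {
    val factory = new ConnectionFactory()
    factory.setHost(host)
    val connection = factory.newConnection()
    val channel = connection.createChannel()
    channel.queueDeclare(queue, false, false, false, null)
    val consumer = new QueueingConsumer(channel)
    channel.basicConsume(queue, true, consumer)
    try {
      while (!isStopped()) {
        val delivery = consumer.nextDelivery()
        store(new String(delivery.getBody, "UTF-8")) // hand each message to Spark
      }
    } finally {
      connection.close()
    }
  }
}

// Usage: val messages = ssc.receiverStream(new RabbitMQReceiver("localhost", "events"))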
Re: Running Spark in local mode seems to ignore local[N]
You have one worker with one executor with 32 execution slots. On Mon, May 11, 2015 at 9:52 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, Is there anything special one must do, running locally and submitting a job like so: spark-submit \ --class com.myco.Driver \ --master local[*] \ ./lib/myco.jar In my logs, I'm only seeing log messages with the thread identifier of Executor task launch worker-0. There are 4 cores on the machine so I expected 4 threads to be at play. Running with local[32] did not yield 32 worker threads. Any recommendations? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-local-mode-seems-to-ignore-local-N-tp22851.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
Thanks Ted, The issue is that I'm using packages (see spark-submit definition) and I do not know how to add com.yammer.metrics:metrics-core to my classpath so Spark can see it. Should metrics-core not be part of the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work correctly? If not, any clues as to how I can add metrics-core to my project (bearing in mind that I'm using Python, not a JVM language) would be much appreciated. Thanks, and apologies for my newbness with Java/Scala. On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote: com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch: [INFO] | \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile Please make sure metrics-core jar is on the classpath. On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote: Hi, We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated. Submit command line: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py When we run the streaming job everything starts just fine, then we see the following in the logs: 15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:115) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:128) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge at java.net.URLClassLoader$1.run(URLClassLoader.java:372) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:360) at 
java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 more
Get a list of temporary RDD tables via Thrift
Hi, How can I get a list of temporary tables via Thrift? Have used thrift's startWithContext and registered a temp table, but not seeing the temp table/rdd when running show tables. Thanks, Judy
Re: [SparkSQL 1.4.0] groupBy columns are always nullable?
Thanks for catching this. I didn't read carefully enough. It'd make sense to have the udaf result be non-nullable, if the exprs are indeed non-nullable. On Mon, May 11, 2015 at 1:32 PM, Olivier Girardot ssab...@gmail.com wrote: Hi Haopu, actually here `key` is nullable because this is your input's schema : scala result.printSchema root |-- key: string (nullable = true) |-- SUM(value): long (nullable = true) scala df.printSchema root |-- key: string (nullable = true) |-- value: long (nullable = false) I tried it with a schema where the key is not flagged as nullable, and the schema is actually respected. What you can argue however is that SUM(value) should also be not nullable since value is not nullable. @rxin do you think it would be reasonable to flag the Sum aggregation function as nullable (or not) depending on the input expression's schema ? Regards, Olivier. Le lun. 11 mai 2015 à 22:07, Reynold Xin r...@databricks.com a écrit : Not by design. Would you be interested in submitting a pull request? On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote: I try to get the result schema of aggregate functions using DataFrame API. However, I find the result field of groupBy columns are always nullable even the source field is not nullable. I want to know if this is by design, thank you! Below is the simple code to show the issue. == import sqlContext.implicits._ import org.apache.spark.sql.functions._ case class Test(key: String, value: Long) val df = sc.makeRDD(Seq(Test(k1,2),Test(k1,1))).toDF val result = df.groupBy(key).agg($key, sum(value)) // From the output, you can see the key column is nullable, why?? result.printSchema //root // |-- key: string (nullable = true) // |-- SUM(value): long (nullable = true) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [SparkSQL 1.4.0] groupBy columns are always nullable?
Not by design. Would you be interested in submitting a pull request? On Mon, May 11, 2015 at 1:48 AM, Haopu Wang hw...@qilinsoft.com wrote: I try to get the result schema of aggregate functions using DataFrame API. However, I find the result field of groupBy columns are always nullable even the source field is not nullable. I want to know if this is by design, thank you! Below is the simple code to show the issue. == import sqlContext.implicits._ import org.apache.spark.sql.functions._ case class Test(key: String, value: Long) val df = sc.makeRDD(Seq(Test(k1,2),Test(k1,1))).toDF val result = df.groupBy(key).agg($key, sum(value)) // From the output, you can see the key column is nullable, why?? result.printSchema //root // |-- key: string (nullable = true) // |-- SUM(value): long (nullable = true) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Specify Python interpreter
Hey there, I have installed a Python interpreter in a certain location, say /opt/local/anaconda. Is there any way to specify the Python interpreter while developing in an IPython notebook? Maybe a property set while creating the SparkContext? I know that I can put #!/opt/local/anaconda at the top of my Python code and use spark-submit to distribute it to the cluster. However, since I am using an IPython notebook, this is not available as an option. Best, Bin
Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
Hi, We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated. Submit command line: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py When we run the streaming job everything starts just fine, then we see the following in the logs: 15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:115) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:128) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge at java.net.URLClassLoader$1.run(URLClassLoader.java:372) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:360) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 more
Re: Running Spark in local mode seems to ignore local[N]
Are you actually running anything that requires all those slots? e.g., locally, I get this with local[16], but only after I run something that actually uses those 16 slots: Executor task launch worker-15 daemon prio=10 tid=0x7f4c80029800 nid=0x8ce waiting on condition [0x7f4c62493000] Executor task launch worker-14 daemon prio=10 tid=0x7f4c80027800 nid=0x8cd waiting on condition [0x7f4c62594000] Executor task launch worker-13 daemon prio=10 tid=0x7f4c80025800 nid=0x8cc waiting on condition [0x7f4c62695000] Executor task launch worker-12 daemon prio=10 tid=0x7f4c80023800 nid=0x8cb waiting on condition [0x7f4c62796000] Executor task launch worker-11 daemon prio=10 tid=0x7f4c80021800 nid=0x8ca waiting on condition [0x7f4c62897000] Executor task launch worker-10 daemon prio=10 tid=0x7f4c8001f800 nid=0x8c9 waiting on condition [0x7f4c62998000] Executor task launch worker-9 daemon prio=10 tid=0x7f4c8001d800 nid=0x8c8 waiting on condition [0x7f4c62a99000] Executor task launch worker-8 daemon prio=10 tid=0x7f4c8001b800 nid=0x8c7 waiting on condition [0x7f4c62b9a000] Executor task launch worker-7 daemon prio=10 tid=0x7f4c80019800 nid=0x8c6 waiting on condition [0x7f4c62c9b000] Executor task launch worker-6 daemon prio=10 tid=0x7f4c80018000 nid=0x8c5 waiting on condition [0x7f4c62d9c000] Executor task launch worker-5 daemon prio=10 tid=0x7f4c80011000 nid=0x8c4 waiting on condition [0x7f4c62e9d000] Executor task launch worker-4 daemon prio=10 tid=0x7f4c8000f800 nid=0x8c3 waiting on condition [0x7f4c62f9e000] Executor task launch worker-3 daemon prio=10 tid=0x7f4c8000e000 nid=0x8c2 waiting on condition [0x7f4c6309f000] Executor task launch worker-2 daemon prio=10 tid=0x7f4c8000c800 nid=0x8c1 waiting on condition [0x7f4c631a] Executor task launch worker-1 daemon prio=10 tid=0x7f4c80007800 nid=0x8c0 waiting on condition [0x7f4c632a1000] Executor task launch worker-0 daemon prio=10 tid=0x7f4c80015800 nid=0x8bf waiting on condition [0x7f4c635f4000] On Mon, May 11, 2015 at 1:52 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, Is there anything special one must do, running locally and submitting a job like so: spark-submit \ --class com.myco.Driver \ --master local[*] \ ./lib/myco.jar In my logs, I'm only seeing log messages with the thread identifier of Executor task launch worker-0. There are 4 cores on the machine so I expected 4 threads to be at play. Running with local[32] did not yield 32 worker threads. Any recommendations? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-local-mode-seems-to-ignore-local-N-tp22851.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
Re: Running Spark in local mode seems to ignore local[N]
Understood. We'll use the multi-threaded code we already have.. How are these execution slots filled up? I assume each slot is dedicated to one submitted task. If that's the case, how is each task distributed then, i.e. how is that task run in a multi-node fashion? Say 1000 batches/RDD's are extracted out of Kafka, how does that relate to the number of executors vs. task slots? Presumably we can fill up the slots with multiple instances of the same task... How do we know how many to launch? On Mon, May 11, 2015 at 5:20 PM, Sean Owen so...@cloudera.com wrote: BTW I think my comment was wrong as marcelo demonstrated. In standalone mode you'd have one worker, and you do have one executor, but his explanation is right. But, you certainly have execution slots for each core. Are you talking about your own user code? you can make threads, but that's nothing do with Spark then. If you run code on your driver, it's not distributed. If you run Spark over an RDD with 1 partition, only one task works on it. On Mon, May 11, 2015 at 10:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Sean, How does this model actually work? Let's say we want to run one job as N threads executing one particular task, e.g. streaming data out of Kafka into a search engine. How do we configure our Spark job execution? Right now, I'm seeing this job running as a single thread. And it's quite a bit slower than just running a simple utility with a thread executor with a thread pool of N threads doing the same task. The performance I'm seeing of running the Kafka-Spark Streaming job is 7 times slower than that of the utility. What's pulling Spark back? Thanks. On Mon, May 11, 2015 at 4:55 PM, Sean Owen so...@cloudera.com wrote: You have one worker with one executor with 32 execution slots. On Mon, May 11, 2015 at 9:52 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, Is there anything special one must do, running locally and submitting a job like so: spark-submit \ --class com.myco.Driver \ --master local[*] \ ./lib/myco.jar In my logs, I'm only seeing log messages with the thread identifier of Executor task launch worker-0. There are 4 cores on the machine so I expected 4 threads to be at play. Running with local[32] did not yield 32 worker threads. Any recommendations? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-local-mode-seems-to-ignore-local-N-tp22851.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
Ah yes, the Kafka + streaming code isn't in the assembly, is it? you'd have to provide it and all its dependencies with your app. You could also build this into your own app jar. Tools like Maven will add in the transitive dependencies. On Mon, May 11, 2015 at 10:04 PM, Lee McFadden splee...@gmail.com wrote: Thanks Ted, The issue is that I'm using packages (see spark-submit definition) and I do not know how to add com.yammer.metrics:metrics-core to my classpath so Spark can see it. Should metrics-core not be part of the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work correctly? If not, any clues as to how I can add metrics-core to my project (bearing in mind that I'm using Python, not a JVM language) would be much appreciated. Thanks, and apologies for my newbness with Java/Scala. On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote: com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch: [INFO] | \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile Please make sure metrics-core jar is on the classpath. On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote: Hi, We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated. Submit command line: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py When we run the streaming job everything starts just fine, then we see the following in the logs: 15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:115) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:128) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: 
com.yammer.metrics.core.Gauge at java.net.URLClassLoader$1.run(URLClassLoader.java:372) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:360) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 17 more - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Met a problem when using spark to load parquet files with different version schemas
Creating dataframes and union them looks reasonable. thanks, Wei On Mon, May 11, 2015 at 6:39 PM, Michael Armbrust mich...@databricks.com wrote: Ah, yeah sorry. I should have read closer and realized that what you are asking for is not supported. It might be possible to add simple coercions such as this one, but today, compatible schemas must only add/remove columns and cannot change types. You could try creating different dataframes and unionAll them. Coercions should be inserted automatically in that case. On Mon, May 11, 2015 at 3:37 PM, Wei Yan ywsk...@gmail.com wrote: Thanks for the reply, Michael. The problem is, if I set spark.sql.parquet.useDataSourceApi to true, spark cannot create a DataFrame. The exception shows it failed to merge incompatible schemas. I think here it means that, the int schema cannot be merged with the long one. Does it mean that the schema merging doesn't support the same field with different types? -Wei On Mon, May 11, 2015 at 3:10 PM, Michael Armbrust mich...@databricks.com wrote: BTW, I use spark 1.3.1, and already set spark.sql.parquet.useDataSourceApi to false. Schema merging is only supported when this flag is set to true (setting it to false uses old code that will be removed once the new code is proven).
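[Editor's note] For reference, a minimal sketch of the unionAll route, with hypothetical paths and columns where the old files stored `id` as int and the newer ones as long.

import org.apache.spark.sql.types.LongType

val oldDf = sqlContext.parquetFile("/data/events_v1")
val newDf = sqlContext.parquetFile("/data/events_v2")
// Casting the old column explicitly keeps the two schemas aligned even if the
// automatic coercion on unionAll does not cover this case.
val oldAligned = oldDf.select(oldDf("id").cast(LongType).as("id"), oldDf("payload"))
val all = oldAligned.unionAll(newDf.select("id", "payload"))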
Re: Spark SQL: STDDEV working in Spark Shell but not in a standalone app
I doubt that will make it as we are pretty slammed with other things and the author needs to address the comments / merge conflict still. I'll add that in general I recommend users use the HiveContext, even if they aren't using Hive at all. Its a strict super set of the functionality provided by SQLContext and the parser is much more powerful. On Mon, May 11, 2015 at 4:00 PM, Oleg Shirokikh o...@solver.com wrote: Michael – Thanks for the response – that’s right, I haven’t noticed that Spark Shell instantiates sqlContext as a HiveContext, not actual Spark SQL Context… I’ve seen the PR to add STDDEV to data frames.. Can I expect this to be added to Spark SQL in Spark 1.4 or it’s still uncertain? It would be really helpful to know in order to understand if I have to change existing code to use HiveContext instead of SQLContext (which would be undesired)… Thanks! *From:* Michael Armbrust [mailto:mich...@databricks.com] *Sent:* Saturday, May 09, 2015 11:32 AM *To:* Oleg Shirokikh *Cc:* user *Subject:* Re: Spark SQL: STDDEV working in Spark Shell but not in a standalone app Are you perhaps using a HiveContext in the shell but a SQLContext in your app? I don't think we natively implement stddev until 1.4.0 On Fri, May 8, 2015 at 4:44 PM, barmaley o...@solver.com wrote: Given a registered table from data frame, I'm able to execute queries like sqlContext.sql(SELECT STDDEV(col1) FROM table) from Spark Shell just fine. However, when I run exactly the same code in a standalone app on a cluster, it throws an exception: java.util.NoSuchElementException: key not found: STDDEV... Is STDDEV ia among default functions in Spark SQL? I'd appreciate if you could comment what's going on with the above. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-STDDEV-working-in-Spark-Shell-but-not-in-a-standalone-app-tp22825.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
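[Editor's note] For reference, a minimal sketch of the HiveContext route in a standalone app, which makes Hive's STDDEV UDAF available; the path, table, and column names are hypothetical.

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
val df = sqlContext.parquetFile("/data/some_table")
df.registerTempTable("table1")
sqlContext.sql("SELECT STDDEV(col1) FROM table1").show()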
Re: Running Spark in local mode seems to ignore local[N]
Seems to be running OK with 4 threads, 16 threads... While running with 32 threads I started getting the below. 15/05/11 19:48:46 WARN executor.Executor: Issue communicating with driver in heartbeater org.apache.spark.SparkException: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@7668b255,BlockManagerId(driver, localhost, 43318))] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427) Caused by: akka.pattern.AskTimeoutException: Recipient[Actor[akka://sparkDriver/user/HeartbeatReceiver#-677986522]] had already been terminated. at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:194) ... 1 more On Mon, May 11, 2015 at 5:34 PM, Sean Owen so...@cloudera.com wrote: You might have a look at the Spark docs to start. 1 batch = 1 RDD, but 1 RDD can have many partitions. And should, for scale. You do not submit multiple jobs to get parallelism. The number of partitions in a streaming RDD is determined by the block interval and the batch interval. If you have a batch interval of 10s and block interval of 1s you'll get 10 partitions of data in the RDD. On Mon, May 11, 2015 at 10:29 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Understood. We'll use the multi-threaded code we already have.. How are these execution slots filled up? I assume each slot is dedicated to one submitted task. If that's the case, how is each task distributed then, i.e. how is that task run in a multi-node fashion? Say 1000 batches/RDD's are extracted out of Kafka, how does that relate to the number of executors vs. task slots? Presumably we can fill up the slots with multiple instances of the same task... How do we know how many to launch? On Mon, May 11, 2015 at 5:20 PM, Sean Owen so...@cloudera.com wrote: BTW I think my comment was wrong as marcelo demonstrated. In standalone mode you'd have one worker, and you do have one executor, but his explanation is right. But, you certainly have execution slots for each core. Are you talking about your own user code? you can make threads, but that's nothing do with Spark then. If you run code on your driver, it's not distributed. If you run Spark over an RDD with 1 partition, only one task works on it. On Mon, May 11, 2015 at 10:16 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Sean, How does this model actually work? Let's say we want to run one job as N threads executing one particular task, e.g. streaming data out of Kafka into a search engine. How do we configure our Spark job execution? Right now, I'm seeing this job running as a single thread. And it's quite a bit slower than just running a simple utility with a thread executor with a thread pool of N threads doing the same task. The performance I'm seeing of running the Kafka-Spark Streaming job is 7 times slower than that of the utility. What's pulling Spark back? Thanks. On Mon, May 11, 2015 at 4:55 PM, Sean Owen so...@cloudera.com wrote: You have one worker with one executor with 32 execution slots. On Mon, May 11, 2015 at 9:52 PM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, Is there anything special one must do, running locally and submitting a job like so: spark-submit \ --class com.myco.Driver \ --master local[*] \ ./lib/myco.jar In my logs, I'm only seeing log messages with the thread identifier of Executor task launch worker-0. There are 4 cores on the machine so I expected 4 threads to be at play. 
Running with local[32] did not yield 32 worker threads. Any recommendations? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-Spark-in-local-mode-seems-to-ignore-local-N-tp22851.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
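To make Sean's arithmetic concrete, here is a minimal Scala streaming sketch (the stream source, host, and port are illustrative): with a 10-second batch interval and a 1-second block interval each receiver yields roughly 10 blocks, hence roughly 10 partitions per batch RDD, and an explicit repartition can spread that work across more of the local[N] threads.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 10s batches with 1s blocks => ~10 partitions per batch per receiver
val conf = new SparkConf()
  .setAppName("local-parallelism-sketch")
  .setMaster("local[4]")                        // worker threads in local mode
  .set("spark.streaming.blockInterval", "1000") // block interval in milliseconds
val ssc = new StreamingContext(conf, Seconds(10))

val lines = ssc.socketTextStream("localhost", 9999) // any receiver-based source
val spread = lines.repartition(8)                   // widen parallelism if blocks are too coarse
spread.count().print()

ssc.start()
ssc.awaitTermination()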
Re: spark : use the global config variables in executors
Note that `object` is equivalent to a class full of static fields / methods (in Java), so the data it holds will not be serialized, ever. What you want is a config class instead, so you can instantiate it, and that instance can be serialized. Then you can easily do (1) or (3).

On Mon, May 11, 2015 at 7:55 AM, hotdog lisend...@163.com wrote: I have a global config object in my Spark app. object Config { var lambda = 0.01 } and I set the value of lambda according to the user's input. object MyApp { def main(args: Array[String]) { Config.lambda = args(0).toDouble ... rdd.map(_ * Config.lambda) } } I found that the modification does not take effect in the executors. The value of lambda is always 0.01. I guess a modification in the driver's JVM will not affect the executors'. Do you have another solution? I found a similar question on Stack Overflow: http://stackoverflow.com/questions/29685330/how-to-set-and-get-static-variables-from-spark In @DanielL.'s answer, he gives three solutions:
1. Put the value inside a closure to be serialized to the executors to perform a task. **But I wonder how to write the closure and how to serialize it to the executors, could anyone give me a code example?**
2. If the values are fixed or the configuration is available on the executor nodes (lives inside the jar, etc.), then you can have a lazy val, guaranteeing initialization only once. **What if I declare lambda as a lazy val? Will a modification in the driver take effect in the executors? Could you give me a code example?**
3. Create a broadcast variable with the data. I know this way, but it also needs a local Broadcast[] variable which wraps the Config object, right? For example: val config = sc.broadcast(Config) and use `config.value.lambda` in the executors, right? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-use-the-global-config-variables-in-executors-tp22846.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
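A minimal sketch of what Marcelo describes, combined with options (1) and (3) from the Stack Overflow answer; the class, app name and values are illustrative. Replace the global object with a serializable instance, then either capture it in the closure or broadcast it once.

import org.apache.spark.{SparkConf, SparkContext}

case class Config(lambda: Double)   // a plain, serializable value instead of a global object

object MyApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("config-sketch"))
    val config = Config(args(0).toDouble)

    val rdd = sc.parallelize(1 to 10).map(_.toDouble)

    // (1) capture the instance in the closure; it is serialized with each task
    val scaled = rdd.map(_ * config.lambda)

    // (3) or broadcast it once and read it on the executors
    val configBc = sc.broadcast(config)
    val scaled2 = rdd.map(_ * configBc.value.lambda)

    println(s"${scaled.sum()} ${scaled2.sum()}")
    sc.stop()
  }
}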
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
Ted, many thanks. I'm not used to Java dependencies so this was a real head-scratcher for me. Downloading the two metrics packages from the maven repository (metrics-core, metrics-annotation) and supplying it on the spark-submit command line worked. My final spark-submit for a python project using Kafka as an input source: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --jars /home/ubuntu/jars/metrics-core-2.2.0.jar,/home/ubuntu/jars/metrics-annotation-2.2.0.jar \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py Now we're seeing data from the stream. Thanks again! On Mon, May 11, 2015 at 2:43 PM Sean Owen so...@cloudera.com wrote: Ah yes, the Kafka + streaming code isn't in the assembly, is it? you'd have to provide it and all its dependencies with your app. You could also build this into your own app jar. Tools like Maven will add in the transitive dependencies. On Mon, May 11, 2015 at 10:04 PM, Lee McFadden splee...@gmail.com wrote: Thanks Ted, The issue is that I'm using packages (see spark-submit definition) and I do not know how to add com.yammer.metrics:metrics-core to my classpath so Spark can see it. Should metrics-core not be part of the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work correctly? If not, any clues as to how I can add metrics-core to my project (bearing in mind that I'm using Python, not a JVM language) would be much appreciated. Thanks, and apologies for my newbness with Java/Scala. On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote: com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch: [INFO] | \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile Please make sure metrics-core jar is on the classpath. On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote: Hi, We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated. 
Submit command line: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py When we run the streaming job everything starts just fine, then we see the following in the logs: 15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:115) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:128) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge at java.net.URLClassLoader$1.run(URLClassLoader.java:372) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.security.AccessController.doPrivileged(Native Method)
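On Sean's point about building the dependency into an application jar: for a JVM application, the missing metrics classes could be declared directly in the build so that an assembly jar carries them, instead of passing extra jars to spark-submit. A hedged sbt sketch follows, with coordinates and versions taken from the thread (they may differ in other setups); for the PySpark case above, the --jars workaround remains the practical fix.

// build.sbt -- bundle the Kafka integration plus its transitive metrics dependency
name := "kafka-streaming-app"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-streaming-kafka" % "1.3.1",
  "com.yammer.metrics" %  "metrics-core"          % "2.2.0"
)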
Can standalone cluster manager provide I/O information on worker nodes?
Hi, Can the standalone cluster manager provide I/O information on worker nodes? If not, could someone point out the proper file to modify to add that functionality? Also, does Mesos support that? Regards. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
DStream Union vs. StreamingContext Union
Can someone explain to me the difference between DStream union and StreamingContext union? When do you use one vs the other? Thanks, Vadim
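For reference, a short sketch of the two calls (stream names, host and ports are illustrative): DStream.union merges exactly two streams, so it has to be chained, while StreamingContext.union takes a whole collection of streams in one call, which is convenient when the number of input streams is dynamic.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(
  new SparkConf().setAppName("union-sketch").setMaster("local[4]"), Seconds(5))

val s1 = ssc.socketTextStream("localhost", 9991)
val s2 = ssc.socketTextStream("localhost", 9992)
val s3 = ssc.socketTextStream("localhost", 9993)

// DStream.union: pairwise, so chaining is needed for more than two streams
val pairwise = s1.union(s2).union(s3)

// StreamingContext.union: takes a whole collection in one call
val all = ssc.union(Seq(s1, s2, s3))

all.count().print()
ssc.start()
ssc.awaitTermination()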
RE: Spark SQL: STDDEV working in Spark Shell but not in a standalone app
Michael – Thanks for the response – that's right, I hadn't noticed that the Spark Shell instantiates sqlContext as a HiveContext, not an actual Spark SQLContext… I've seen the PR to add STDDEV to data frames. Can I expect this to be added to Spark SQL in Spark 1.4, or is it still uncertain? It would be really helpful to know in order to understand whether I have to change existing code to use HiveContext instead of SQLContext (which would be undesired)… Thanks!

From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Saturday, May 09, 2015 11:32 AM To: Oleg Shirokikh Cc: user Subject: Re: Spark SQL: STDDEV working in Spark Shell but not in a standalone app Are you perhaps using a HiveContext in the shell but a SQLContext in your app? I don't think we natively implement stddev until 1.4.0

On Fri, May 8, 2015 at 4:44 PM, barmaley o...@solver.com wrote: Given a registered table from a data frame, I'm able to execute queries like sqlContext.sql(SELECT STDDEV(col1) FROM table) from the Spark Shell just fine. However, when I run exactly the same code in a standalone app on a cluster, it throws an exception: java.util.NoSuchElementException: key not found: STDDEV... Is STDDEV among the default functions in Spark SQL? I'd appreciate it if you could comment on what's going on with the above. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-STDDEV-working-in-Spark-Shell-but-not-in-a-standalone-app-tp22825.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
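A minimal sketch of the HiveContext switch Michael describes, assuming a standalone app (app, table and column names are illustrative); with a HiveContext the Hive stddev UDAF resolves even before native support lands in 1.4.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("stddev-sketch"))
val sqlContext = new HiveContext(sc)   // instead of new SQLContext(sc)
import sqlContext.implicits._

// Build a tiny DataFrame and register it so SQL can see it.
val df = sc.parallelize(Seq(1.0, 2.0, 3.0)).map(Tuple1(_)).toDF("col1")
df.registerTempTable("measurements")

sqlContext.sql("SELECT STDDEV(col1) FROM measurements").show()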
Re: Reading Nested Fields in DataFrames
Had the same question on Stack Overflow recently: http://stackoverflow.com/questions/30008127/how-to-read-a-nested-collection-in-spark Lomig Mégard had a detailed answer on how to do this without using LATERAL VIEW.

On Mon, May 11, 2015 at 8:05 AM, Ashish Kumar Singh ashish23...@gmail.com wrote: Hi, I am trying to read nested Avro data in Spark 1.3 using DataFrames. I need help retrieving the inner element data in the structure below. Below is the schema when I enter df.printSchema:
 |-- THROTTLING_PERCENTAGE: double (nullable = false)
 |-- IMPRESSION_TYPE: string (nullable = false)
 |-- campaignArray: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- COOKIE: string (nullable = false)
 |    |    |-- CAMPAIGN_ID: long (nullable = false)
How can I access the CAMPAIGN_ID field in this schema? Thanks, Ashish Kr. Singh
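For comparison, the classic LATERAL VIEW route (the Stack Overflow answer above shows how to avoid it) is a short sketch like the following, assuming sqlContext is a HiveContext and using an illustrative table name.

// df is the DataFrame with the schema shown above
df.registerTempTable("events")

val campaignIds = sqlContext.sql(
  """SELECT c.CAMPAIGN_ID, c.COOKIE
    |FROM events
    |LATERAL VIEW explode(campaignArray) t AS c""".stripMargin)

campaignIds.show()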
How to get Master UI with ZooKeeper HA setup?
Hi, We have a 3-node master setup with ZooKeeper HA. Driver can find the master with spark://xxx:xxx,xxx:xxx,xxx:xxx But how can I find out the valid Master UI without looping through all 3 nodes? Thanks
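One workaround, sketched below, is to probe each master's web UI and keep the one that reports itself as ALIVE; note that the /json endpoint and the exact shape of its status field are assumptions about the standalone Master UI, and the hosts and ports are placeholders.

import scala.io.Source
import scala.util.Try

val masterUiUrls = Seq("http://master1:8080", "http://master2:8080", "http://master3:8080")

// Keep the first UI whose JSON status looks ALIVE rather than STANDBY.
val activeUi: Option[String] = masterUiUrls.find { url =>
  Try(Source.fromURL(s"$url/json").mkString).toOption
    .exists(json => json.contains("ALIVE") && !json.contains("STANDBY"))
}

println(activeUi.getOrElse("no ALIVE master found"))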
Re: Met a problem when using spark to load parquet files with different version schemas
Ah, yeah sorry. I should have read closer and realized that what you are asking for is not supported. It might be possible to add simple coercions such as this one, but today, compatible schemas must only add/remove columns and cannot change types. You could try creating different dataframes and unionAll them. Coercions should be inserted automatically in that case. On Mon, May 11, 2015 at 3:37 PM, Wei Yan ywsk...@gmail.com wrote: Thanks for the reply, Michael. The problem is, if I set spark.sql.parquet.useDataSourceApi to true, spark cannot create a DataFrame. The exception shows it failed to merge incompatible schemas. I think here it means that, the int schema cannot be merged with the long one. Does it mean that the schema merging doesn't support the same field with different types? -Wei On Mon, May 11, 2015 at 3:10 PM, Michael Armbrust mich...@databricks.com wrote: BTW, I use spark 1.3.1, and already set spark.sql.parquet.useDataSourceApi to false. Schema merging is only supported when this flag is set to true (setting it to false uses old code that will be removed once the new code is proven).
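A hedged sketch of the unionAll workaround (paths and column names are illustrative): read each schema version as its own DataFrame and union them; per Michael the coercion should be inserted automatically, but an explicit cast of the divergent column makes the intent clear.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.LongType

val sqlContext = new SQLContext(sc)

val oldData = sqlContext.parquetFile("/data/events_v1")   // e.g. id stored as int
val newData = sqlContext.parquetFile("/data/events_v2")   // e.g. id stored as long

// Widen the old column explicitly so both sides have identical schemas.
val oldWidened = oldData.select(oldData("id").cast(LongType).as("id"), oldData("payload"))

val all = oldWidened.unionAll(newData.select("id", "payload"))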
Re: Kafka stream fails: java.lang.NoClassDefFound com/yammer/metrics/core/Gauge
I opened a ticket on this (without posting here first - bad etiquette, apologies) which was closed as 'fixed'. https://issues.apache.org/jira/browse/SPARK-7538 I don't believe that because I have my script running means this is fixed, I think it is still an issue. I downloaded the spark source, ran `mvn -DskipTests clean package `, then simply launched my python script (which shouldn't be introducing additional *java* dependencies itself?). Doesn't this mean these dependencies are missing from the spark build, since I didn't modify any files within the distribution and my application itself can't be introducing java dependency clashes? On Mon, May 11, 2015, 4:34 PM Lee McFadden splee...@gmail.com wrote: Ted, many thanks. I'm not used to Java dependencies so this was a real head-scratcher for me. Downloading the two metrics packages from the maven repository (metrics-core, metrics-annotation) and supplying it on the spark-submit command line worked. My final spark-submit for a python project using Kafka as an input source: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --jars /home/ubuntu/jars/metrics-core-2.2.0.jar,/home/ubuntu/jars/metrics-annotation-2.2.0.jar \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py Now we're seeing data from the stream. Thanks again! On Mon, May 11, 2015 at 2:43 PM Sean Owen so...@cloudera.com wrote: Ah yes, the Kafka + streaming code isn't in the assembly, is it? you'd have to provide it and all its dependencies with your app. You could also build this into your own app jar. Tools like Maven will add in the transitive dependencies. On Mon, May 11, 2015 at 10:04 PM, Lee McFadden splee...@gmail.com wrote: Thanks Ted, The issue is that I'm using packages (see spark-submit definition) and I do not know how to add com.yammer.metrics:metrics-core to my classpath so Spark can see it. Should metrics-core not be part of the org.apache.spark:spark-streaming-kafka_2.10:1.3.1 package so it can work correctly? If not, any clues as to how I can add metrics-core to my project (bearing in mind that I'm using Python, not a JVM language) would be much appreciated. Thanks, and apologies for my newbness with Java/Scala. On Mon, May 11, 2015 at 1:42 PM Ted Yu yuzhih...@gmail.com wrote: com.yammer.metrics.core.Gauge is in metrics-core jar e.g., in master branch: [INFO] | \- org.apache.kafka:kafka_2.10:jar:0.8.1.1:compile [INFO] | +- com.yammer.metrics:metrics-core:jar:2.2.0:compile Please make sure metrics-core jar is on the classpath. On Mon, May 11, 2015 at 1:32 PM, Lee McFadden splee...@gmail.com wrote: Hi, We've been having some issues getting spark streaming running correctly using a Kafka stream, and we've been going around in circles trying to resolve this dependency. Details of our environment and the error below, if anyone can help resolve this it would be much appreciated. 
Submit command line: /home/ubuntu/spark/spark-1.3.1/bin/spark-submit \ --packages TargetHolding/pyspark-cassandra:0.1.4,org.apache.spark:spark-streaming-kafka_2.10:1.3.1 \ --conf spark.cassandra.connection.host=10.10.103.172,10.10.102.160,10.10.101.79 \ --master spark://127.0.0.1:7077 \ affected_hosts.py When we run the streaming job everything starts just fine, then we see the following in the logs: 15/05/11 19:50:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 70, ip-10-10-102-53.us-west-2.compute.internal): java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:151) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:115) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:128) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121) at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:298) at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:290) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498) at
Re: Spark can not access jar from HDFS !!
After upgrading to spark 1.3, these statements on hivecontext are working fine. Thanks On Mon, May 11, 2015, 12:15 Ravindra ravindra.baj...@gmail.com wrote: Hi All, Thanks for suggestions. What I tried is - hiveContext.sql (add jar ) and that helps to complete the create temporary function but while using this function I get ClassNotFound for the class handling this function. The same class is present in the jar added . Please note that the same works fine from the Hive Shell. Is there an issue with Spark while distributing jars across workers? May be that is causing the problem. Also can you please suggest the manual way of copying the jars to the workers, I just want to ascertain my assumption. Thanks, Ravi On Sun, May 10, 2015 at 1:40 AM Michael Armbrust mich...@databricks.com wrote: That code path is entirely delegated to hive. Does hive support this? You might try instead using sparkContext.addJar. On Sat, May 9, 2015 at 12:32 PM, Ravindra ravindra.baj...@gmail.com wrote: Hi All, I am trying to create custom udfs with hiveContext as given below - scala hiveContext.sql (CREATE TEMPORARY FUNCTION sample_to_upper AS 'com.abc.api.udf.MyUpper' USING JAR 'hdfs:///users/ravindra/customUDF2.jar') I have put the udf jar in the hdfs at the path given above. The same command works well in the hive shell but failing here in the spark shell. And it fails as given below. - 15/05/10 00:41:51 ERROR Task: FAILED: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR hdfs:///users/ravindra/customUDF2.jar 15/05/10 00:41:51 INFO FunctionTask: create function: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR hdfs:///users/ravindra/customUDF2.jar at org.apache.hadoop.hive.ql.exec.FunctionTask.addFunctionResources(FunctionTask.java:305) at org.apache.hadoop.hive.ql.exec.FunctionTask.createTemporaryFunction(FunctionTask.java:179) at org.apache.hadoop.hive.ql.exec.FunctionTask.execute(FunctionTask.java:81) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94) at $line17.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:18) at $line17.$read$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:23) at $line17.$read$$iwC$$iwC$$iwC$$iwC.init(console:25) at $line17.$read$$iwC$$iwC$$iwC.init(console:27) at $line17.$read$$iwC$$iwC.init(console:29) at 
$line17.$read$$iwC.init(console:31) at $line17.$read.init(console:33) at $line17.$read$.init(console:37) at $line17.$read$.clinit(console) at $line17.$eval$.init(console:7) at $line17.$eval$.clinit(console) at $line17.$eval.$print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at
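For readers stuck on earlier builds, a hedged sketch of Michael's sparkContext.addJar suggestion (the jar path and class come from the thread; the table name is illustrative). Whether the driver-side Hive classloader also picks the class up this way is not guaranteed, and the thread's actual fix was upgrading to Spark 1.3.

// Ship the UDF jar to the executors, then register the function without USING JAR.
sc.addJar("hdfs:///users/ravindra/customUDF2.jar")

hiveContext.sql("CREATE TEMPORARY FUNCTION sample_to_upper AS 'com.abc.api.udf.MyUpper'")

hiveContext.sql("SELECT sample_to_upper(name) FROM some_table").collect()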
RE: Does long-lived SparkContext hold on to executor resources?
Also check out the spark.cleaner.ttl property. Otherwise, you will accumulate shuffle metadata in the memory of the driver.

-Original Message- From: Silvio Fiorito [silvio.fior...@granturing.com] Sent: Monday, May 11, 2015 01:03 PM Eastern Standard Time To: stanley; user@spark.apache.org Subject: Re: Does long-lived SparkContext hold on to executor resources? You want to look at dynamic resource allocation, here: http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

On 5/11/15, 11:23 AM, stanley wangshua...@yahoo.com wrote: I am building an analytics app with Spark. I plan to use long-lived SparkContexts to minimize the overhead for creating Spark contexts, which in turn reduces the analytics query response time. The number of queries that are run in the system is relatively small each day. Would long-lived contexts hold on to the executor resources when there are no queries running? Is there a way to free executor resources in this type of use case? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-long-lived-SparkContext-hold-on-to-executor-resources-tp22848.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
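Pulling both suggestions together, a hedged configuration sketch (values are illustrative; dynamic allocation also needs the external shuffle service on the workers, and at the time of this thread it is only supported on YARN). The same properties can equally go in spark-defaults.conf.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "0")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.shuffle.service.enabled", "true")   // external shuffle service required
  .set("spark.cleaner.ttl", "3600")               // seconds; periodic metadata cleanup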
Unread block data
Hi, I'm trying to compile and use Spark 1.3.1 with Hadoop 2.2.0. I compiled from source with the Maven command: mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package When I run this with a local master (bin/spark-shell --master local[2]) I can read data from the HDFS. When I run it on a four-node cluster, with the same jars and setup on each node, I get these errors: scala val textInput = sc.textFile(hdfs:///testing/dict) textInput: org.apache.spark.rdd.RDD[String] = hdfs:///testing/dict MapPartitionsRDD[7] at textFile at console:24 scala textInput take 10 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 17, hadoop-kn-t503.systems.private): java.lang.IllegalStateException: unread block data at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Can anyone tell me what the issue is?
Re: Does long-lived SparkContext hold on to executor resources?
You want to look at dynamic resource allocation, here: http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

On 5/11/15, 11:23 AM, stanley wangshua...@yahoo.com wrote: I am building an analytics app with Spark. I plan to use long-lived SparkContexts to minimize the overhead for creating Spark contexts, which in turn reduces the analytics query response time. The number of queries that are run in the system is relatively small each day. Would long-lived contexts hold on to the executor resources when there are no queries running? Is there a way to free executor resources in this type of use case? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-long-lived-SparkContext-hold-on-to-executor-resources-tp22848.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
TwitterPopularTags Long Processing Delay
Hi, I'm running TwitterPopularTags.scala on a single node. Everything works fine for about 30 minutes, but after that I see a long processing delay for tasks, and it keeps increasing. Has anyone experienced the same issue? Here are my configurations: spark.driver.memory 5g spark.executor.memory 5g Thanks, Seyed -- Seyed Majid Zahedi https://www.cs.duke.edu/~zahedi/ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org