Re: SparkSQL overwrite parquet file does not generate _common_metadata
I'm using 1.0.4. Thanks, -- Pei-Lun

On Fri, Mar 27, 2015 at 2:32 PM, Cheng Lian lian.cs@gmail.com wrote:
Hm, which version of Hadoop are you using? Actually there should also be a _metadata file together with _common_metadata. I was using Hadoop 2.4.1, by the way. I'm not sure whether the Hadoop version matters here, but I did observe cases where Spark behaves differently because of semantic differences of the same API in different Hadoop versions. Cheng

On 3/27/15 11:33 AM, Pei-Lun Lee wrote:
Hi Cheng, on my computer, executing res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite) produces:

peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
total 32
-rwxrwxrwx 1 peilunlee staff   0 Mar 27 11:29 _SUCCESS*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet*
-rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet*

while res0.save("xxx") produces:

peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
total 40
-rwxrwxrwx 1 peilunlee staff   0 Mar 27 11:29 _SUCCESS*
-rwxrwxrwx 1 peilunlee staff 250 Mar 27 11:29 _common_metadata*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet*
-rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet*

On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian lian.cs@gmail.com wrote:
I couldn't reproduce this with the following spark-shell snippet:

scala> import sqlContext.implicits._
scala> Seq((1, 2)).toDF("a", "b")
scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)
scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)

The _common_metadata file is typically much smaller than _metadata, because it doesn't contain row group information, and thus can be faster to read than _metadata. Cheng

On 3/26/15 12:48 PM, Pei-Lun Lee wrote:
Hi, when I save a parquet file with SaveMode.Overwrite, it never generates _common_metadata, whether it overwrites an existing dir or not. Is this expected behavior? And what is the benefit of _common_metadata? Will reading perform better when it is present? Thanks, -- Pei-Lun
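For anyone who wants to reproduce this locally, here is a spark-shell sketch of the comparison above, assuming a local run so the output directories are plain local paths (the xxx_* names are made up for illustration):

import sqlContext.implicits._
import org.apache.spark.sql.SaveMode

// Write the same tiny DataFrame with and without an explicit SaveMode and
// compare which summary files (_metadata, _common_metadata) appear.
val df = Seq((1, 2)).toDF("a", "b")
df.save("xxx_default")
df.save("xxx_overwrite", SaveMode.Overwrite)

new java.io.File("xxx_default").listFiles.map(_.getName).sorted.foreach(println)
new java.io.File("xxx_overwrite").listFiles.map(_.getName).sorted.foreach(println)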
Re: spark-sql throws org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException
Ok. I modified as per your suggestions export SPARK_HOME=/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4 export SPARK_JAR=$SPARK_HOME/lib/spark-assembly-1.3.0-hadoop2.4.0.jar export HADOOP_CONF_DIR=/apache/hadoop/conf cd $SPARK_HOME ./bin/spark-sql -v --driver-class-path /apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar spark-sql prompt . I ran show tables , desc dw_bid. Each throw below exception. spark-sql desc dw_bid; 15/03/26 23:10:14 WARN conf.HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore. 15/03/26 23:10:14 WARN conf.HiveConf: DEPRECATED: hive.metastore.ds.retry.* no longer has any effect. Use hive.hmshandler.retry.* instead 15/03/26 23:10:14 INFO parse.ParseDriver: Parsing command: desc dw_bid 15/03/26 23:10:14 INFO parse.ParseDriver: Parse Completed 15/03/26 23:10:15 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=dw_bid 15/03/26 23:10:15 INFO HiveMetaStore.audit: ugi=dvasthi...@corp.ebay.com ip=unknown-ip-addr cmd=get_table : db=default tbl=dw_bid 15/03/26 23:10:15 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:83 15/03/26 23:10:15 INFO scheduler.DAGScheduler: Got job 0 (collect at SparkPlan.scala:83) with 1 output partitions (allowLocal=false) 15/03/26 23:10:15 INFO scheduler.DAGScheduler: Final stage: Stage 0(collect at SparkPlan.scala:83) 15/03/26 23:10:15 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/03/26 23:10:15 INFO scheduler.DAGScheduler: Missing parents: List() 15/03/26 23:10:15 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[1] at map at SparkPlan.scala:83), which has no missing parents 15/03/26 23:10:16 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0 15/03/26 23:10:16 INFO scheduler.DAGScheduler: Job 0 failed: collect at SparkPlan.scala:83, took 0.078101 s 15/03/26 23:10:16 ERROR thriftserver.SparkSQLDriver: Failed in [desc dw_bid] org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) java.lang.reflect.Constructor.newInstance(Constructor.java:526) org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68) org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60) org.apache.spark.broadcast.TorrentBroadcast.org $apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73) org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:79) org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051) org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:839) org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:778) org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:762) org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:847) at org.apache.spark.scheduler.DAGScheduler.org
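As an aside, the same jars can be configured once in conf/spark-defaults.conf instead of being repeated in a long --driver-class-path argument; a configuration sketch reusing the paths from the message above (these are standard Spark properties, and I have not verified that this changes the CompressionCodec failure that follows):

# conf/spark-defaults.conf (sketch): make the DataNucleus and MySQL driver jars
# visible to the driver; executors typically need the JDBC driver as well.
spark.driver.extraClassPath    /home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar
spark.executor.extraClassPath  /home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar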
Re: Spark SQL configurations
If you can share the stack trace, then we can give you proper guidance. For running on YARN, everything is described here: https://spark.apache.org/docs/latest/running-on-yarn.html Thanks Best Regards On Fri, Mar 27, 2015 at 8:21 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Hello, Can someone share the list of commands (including export statements) that you use to run Spark SQL on a YARN cluster? I am unable to get it running on my YARN cluster and am running into exceptions. I understand I need to share the specific exception; this is more that I want to know whether I have missed anything before running Spark SQL. Regards, Deepak
Add partition support in saveAsParquet
Hi, does anyone have a similar request? https://issues.apache.org/jira/browse/SPARK-6561 When we save a DataFrame into Parquet files, we also want to have it partitioned. The proposed API looks like this: def saveAsParquet(path: String, partitionColumns: Seq[String]) -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
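Until something like that lands, a rough workaround sketch for Spark 1.3 is to split the DataFrame by partition-column value yourself and write each slice under a key=value subdirectory. The helper name and arguments below are made up for illustration, and collecting the distinct values to the driver only makes sense for low-cardinality columns:

import org.apache.spark.sql.DataFrame

// Hypothetical helper (not part of the Spark API): write one Parquet
// directory per distinct value of `partCol`, mimicking a Hive-style
// key=value partition layout under `path`.
def saveAsPartitionedParquet(df: DataFrame, path: String, partCol: String): Unit = {
  val values = df.select(partCol).distinct.collect().map(_.get(0))
  values.foreach { v =>
    df.filter(df(partCol) === v).saveAsParquetFile(s"$path/$partCol=$v")
  }
}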
Re: FetchFailedException during shuffle
What operation are you doing? I'm assuming you have enabled rdd compression and you are having an empty stream which it tries to uncompress (as seen from the Exceptions) Thanks Best Regards On Fri, Mar 27, 2015 at 7:15 AM, Chen Song chen.song...@gmail.com wrote: Using spark 1.3.0 on cdh5.1.0, I was running a fetch failed exception. I searched in this email list but not found anything like this reported. What could be the reason for the error? org.apache.spark.shuffle.FetchFailedException: [EMPTY_INPUT] Cannot decompress empty stream at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:130) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:127) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:127) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at
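One way to test that hypothesis is to toggle the compression settings and rerun the job; a diagnostic sketch, assuming you control the SparkConf used to build the context (these are standard Spark properties, not a confirmed fix):

import org.apache.spark.{SparkConf, SparkContext}

// Turn off block and shuffle compression to see whether the
// "Cannot decompress empty stream" FetchFailedException goes away.
val conf = new SparkConf()
  .setAppName("shuffle-compression-check")
  .set("spark.shuffle.compress", "false")        // compression of shuffle outputs
  .set("spark.shuffle.spill.compress", "false")  // compression of spilled shuffle data
  .set("spark.rdd.compress", "false")            // compression of serialized cached blocks
val sc = new SparkContext(conf)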
Re: Serialization Problem in Spark Program
Awesome. Thanks Best Regards On Fri, Mar 27, 2015 at 7:26 AM, donhoff_h 165612...@qq.com wrote: Hi, Akhil Yes, that's where the problem lies. Thanks very much for pointing out my mistake. -- Original -- *From: * Akhil Das ak...@sigmoidanalytics.com *Send time:* Thursday, Mar 26, 2015 3:23 PM *To:* donhoff_h 165612...@qq.com *Cc:* user user@spark.apache.org *Subject: * Re: Serialization Problem in Spark Program Try registering your MyObject[] with Kryo. On 25 Mar 2015 13:17, donhoff_h 165612...@qq.com wrote: Hi, experts. I wrote a very simple Spark program to test the Kryo serialization function. The code is as follows:

object TestKryoSerialization {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrationRequired", "true")  // I use this statement to force checking registration.
    conf.registerKryoClasses(Array(classOf[MyObject]))
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("hdfs://dhao.hrb:8020/user/spark/tstfiles/charset/A_utf8.txt")
    val objs = rdd.map(new MyObject(_, 1)).collect()
    for (x <- objs) {
      x.printMyObject
    }
  }
}

The class MyObject is also a very simple class, which is only used to test the serialization function:

class MyObject {
  var myStr : String = ""
  var myInt : Int = 0
  def this(inStr : String, inInt : Int) {
    this()
    this.myStr = inStr
    this.myInt = inInt
  }
  def printMyObject {
    println("MyString is : " + myStr + "\tMyInt is : " + myInt)
  }
}

But when I ran the application, it reported the following error: java.lang.IllegalArgumentException: Class is not registered: dhao.test.Serialization.MyObject[] Note: To register this class use: kryo.register(dhao.test.Serialization.MyObject[].class); at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442) at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79) at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565) at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:161) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) I don't understand what caused this problem. I have used conf.registerKryoClasses to register my class. Could anyone help me? Thanks. By the way, the Spark version is 1.3.0.
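To make the fix concrete: when registrationRequired is on, registering the array class alongside the element class is what resolves the error above; a sketch using the class from that post:

import org.apache.spark.SparkConf
import dhao.test.Serialization.MyObject   // the class from the post above

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
// collect() ships a MyObject[] back to the driver, so the array class has to
// be registered as well as the element class.
conf.registerKryoClasses(Array(classOf[MyObject], classOf[Array[MyObject]]))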
Re: RDD Exception Handling
Like this?

val krdd = testrdd.map(x => {
  try {
    var key = ""
    val tmp_tocks = x.split(sep1)(0)
    (key, x)
  } catch {
    case e: Exception => println("Exception!! = " + e + " |||KS1 " + x); (null, x)
  }
})

Thanks Best Regards On Thu, Mar 26, 2015 at 7:45 PM, Kevin Conaway ke...@zoomdata.com wrote: How can we catch exceptions that are thrown from custom RDDs or custom map functions? We have a custom RDD that is throwing an exception that we would like to catch, but the exception that is thrown back to the caller is an *org.apache.spark.SparkException* that does not contain any useful information about the original exception. The detail message is a string representation of the original stack trace but it's hard to do anything useful with that. Below is a small class that exhibits the issue. It uses a map function instead of a custom RDD but the symptom is the same: the original *RuntimeException* is lost. I tested this with Spark 1.2.1 and 1.3.0.

public class SparkErrorExample {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("SparkExample").setMaster("local[*]");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    JavaRDD<String> data = ctx.parallelize(Arrays.asList("1", "2", "3"));
    try {
      data.map(line -> {
        throw new RuntimeException();
      }).count();
    } catch (Exception ex) {
      System.out.println("Exception class: " + ex.getClass());
      System.out.println("Exception message: " + ex.getMessage());
      System.out.println("Exception cause: " + ex.getCause());
    }
  }
}
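Another option, since the original exception type is lost inside the SparkException, is to capture failures as data instead of throwing; a sketch with illustrative input and separator (testrdd and sep1 mirror the names in the snippet above):

import scala.util.{Failure, Success, Try}

val sep1 = ","                                            // illustrative separator
val testrdd = sc.parallelize(Seq("k1,v1", "k2,v2"))       // illustrative input

// Keep the original exception per record instead of letting it abort the job;
// failures come back to the driver as ordinary values you can inspect.
val results = testrdd.map { x =>
  Try(x.split(sep1)(0)) match {
    case Success(key) => Right((key, x))
    case Failure(e)   => Left((e.toString, x))
  }
}
val failures = results.filter(_.isLeft).collect()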
Can spark sql read existing tables created in hive
I have a few tables that were created in Hive. I want to transform the data stored in these Hive tables using Spark SQL. Is this even possible? So far I have seen that I can create new tables using the Spark SQL dialect. However, when I run show tables or do desc hive_table it says the table is not found. I am now wondering whether this support is present or not in Spark SQL. -- Deepak
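It is possible, but only through HiveContext; a plain SQLContext does not talk to the Hive metastore. A minimal sketch, assuming hive-site.xml with your metastore settings is on the classpath (e.g. in conf/) and that some_hive_table is a placeholder name:

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
// Tables created in Hive become visible once the metastore is reachable.
hiveCtx.sql("SHOW TABLES").collect().foreach(println)
hiveCtx.sql("SELECT * FROM some_hive_table LIMIT 10").collect().foreach(println)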
Re: saveAsTable with path not working as expected (pyspark + Scala)
We can set a path, refer to the unit tests. For example: df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path=tmpPath) https://github.com/apache/spark/blob/master/python/pyspark/sql/tests.py Investigating some more, I found that the table is being created at the specified location, but the error is still being thrown, and the table has not been stored. This is the code that I ran:

a = [Row(key=k, value=str(k)) for k in range(100)]
df = sc.parallelize(a).toDF()
df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path="/tmp/test10")
15/03/27 10:45:13 ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/savedjsontable is not a directory or unable to create one) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294) ...
sqlCtx.tables()
DataFrame[tableName: string, isTemporary: boolean]
exit()
~ cat /tmp/test10/part-0
{"key":0,"value":"0"}
{"key":1,"value":"1"}
{"key":2,"value":"2"}
{"key":3,"value":"3"}
{"key":4,"value":"4"}
{"key":5,"value":"5"}

Kind Regards, Tom On 27 March 2015 at 10:33, Yanbo Liang yblia...@gmail.com wrote: saveAsTable will use the default data source configured by spark.sql.sources.default. def saveAsTable(tableName: String): Unit = { saveAsTable(tableName, SaveMode.ErrorIfExists) } It cannot set a path, if I understand correctly. 2015-03-27 15:45 GMT+08:00 Tom Walwyn twal...@gmail.com: Hi, The behaviour is the same for me in Scala and Python, so posting here in Python. When I use DataFrame.saveAsTable with the path option, I expect an external Hive table to be created at the specified path. Specifically, when I call: df.saveAsTable(..., path="/tmp/test") I expect an external Hive table to be created pointing to /tmp/test which would contain the data in df. However, running locally on my Mac, I get an error indicating that Spark tried to create a managed table in the location of the Hive warehouse: ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/savetable is not a directory or unable to create one) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294) Am I wrong to expect that Spark creates an external table in this case? What is the expected behaviour of saveAsTable with the path option? Setup: running Spark locally with Spark 1.3.0. Kind Regards, Tom
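For comparison, the Scala side has an overload that takes the source, mode and an options map, which is where the path option ends up; a sketch, assuming I am reading the 1.3 API correctly:

import org.apache.spark.sql.SaveMode

// df: the DataFrame built above. "path" is passed through as a data source option.
df.saveAsTable(
  "savedJsonTable",
  "org.apache.spark.sql.json",
  SaveMode.Append,
  Map("path" -> "/tmp/test10"))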
Re: Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet
https://issues.apache.org/jira/browse/SPARK-6570 I also left in the call to saveAsParquetFile(), as it produced a similar exception (though there was no use of explode there). On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian lian.cs@gmail.com wrote: This should be a bug in the Explode.eval(), which always assumes the underlying SQL array is represented by a Scala Seq. Would you mind to open a JIRA ticket for this? Thanks! Cheng On 3/27/15 7:00 PM, Jon Chase wrote: Spark 1.3.0 Two issues: a) I'm unable to get a lateral view explode query to work on an array type b) I'm unable to save an array type to a Parquet file I keep running into this: java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq Here's a stack trace from the explode issue: root |-- col1: string (nullable = false) |-- col2s: array (nullable = true) ||-- element: integer (containsNull = true) ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0 (TID 15) java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) ~[spark-catalyst_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.to(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.0.jar:1.3.0] at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.0.jar:1.3.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_31] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31] WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID 15, localhost): java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at
Spark streaming
Hi all, We have a workflow that pulls in data from csv files. The original setup of the workflow was to parse the data as it comes in (turning it into an array), then store it. This resulted in out-of-memory errors with larger files (as a result of increased GC?). It turns out that if the data gets stored as a string first and then parsed, the issue does not occur. Why is that? Thanks,
Re: Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet
This should be a bug in the Explode.eval(), which always assumes the underlying SQL array is represented by a Scala Seq. Would you mind to open a JIRA ticket for this? Thanks! Cheng On 3/27/15 7:00 PM, Jon Chase wrote: Spark 1.3.0 Two issues: a) I'm unable to get a lateral view explode query to work on an array type b) I'm unable to save an array type to a Parquet file I keep running into this: java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq Here's a stack trace from the explode issue: root |-- col1: string (nullable = false) |-- col2s: array (nullable = true) ||-- element: integer (containsNull = true) ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0 (TID 15) java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) ~[spark-catalyst_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.to http://class.to(TraversableOnce.scala:273) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.to http://scala.collection.AbstractIterator.to(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.0.jar:1.3.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[na:1.8.0_31] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31] WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID 15, localhost): java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to http://class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to
Re: Can spark sql read existing tables created in hive
It seems Spark SQL accesses some columns beyond those created by Hive. You can always recreate the tables (you would need to execute the table creation scripts), but it would be good to avoid recreation. On Fri, Mar 27, 2015 at 3:20 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I did copy hive-conf.xml from the Hive installation into spark-home/conf. It does have all the metastore connection details: host, username, password, driver and others. Snippet ==

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://host.vip.company.com:3306/HDB</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hiveuser</value>
    <description>username to use against metastore database</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>some-password</value>
    <description>password to use against metastore database</description>
  </property>
  <property>
    <name>hive.metastore.local</name>
    <value>false</value>
    <description>controls whether to connect to remote metastore server or open a new metastore server in Hive Client JVM</description>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
    <description>location of default database for the warehouse</description>
  </property>
..

When I attempt to read a Hive table, it does not work: dw_bid does not exist. I am sure there is a way to read tables stored in HDFS (Hive) from Spark SQL; otherwise how would anyone do analytics, since the source tables are always persisted either directly on HDFS or through Hive. On Fri, Mar 27, 2015 at 1:15 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Since Hive and Spark SQL internally use HDFS and the Hive metastore, the only thing you want to change is the processing engine. You can try to bring your hive-site.xml to %SPARK_HOME%/conf/hive-site.xml (ensure that the hive-site.xml captures the metastore connection details). It's a hack, I haven't tried it. I have played around with the metastore and it should work. On Fri, Mar 27, 2015 at 12:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a few tables that were created in Hive. I want to transform the data stored in these Hive tables using Spark SQL. Is this even possible? So far I have seen that I can create new tables using the Spark SQL dialect. However, when I run show tables or do desc hive_table it says the table is not found. I am now wondering whether this support is present or not in Spark SQL. -- Deepak -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Deepak -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet
Spark 1.3.0 Two issues: a) I'm unable to get a lateral view explode query to work on an array type b) I'm unable to save an array type to a Parquet file I keep running into this: java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq Here's a stack trace from the explode issue: root |-- col1: string (nullable = false) |-- col2s: array (nullable = true) ||-- element: integer (containsNull = true) ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0 (TID 15) java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) ~[spark-catalyst_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.to(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.0.jar:1.3.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_31] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31] WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID 15, localhost): java.lang.ClassCastException: 
[I cannot be cast to scala.collection.Seq at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) at
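Until the JIRA is resolved, one thing to try that is consistent with the diagnosis above (Explode.eval expecting a Scala Seq) is to build the DataFrame with Seq-typed columns rather than Array-typed ones; a spark-shell sketch with made-up data, not a verified fix:

import sqlContext.implicits._

case class Rec(col1: String, col2s: Seq[Int])   // Seq rather than Array for the array column

val df = sc.parallelize(Seq(Rec("a", Seq(1, 2, 3)), Rec("b", Seq(4)))).toDF()
df.registerTempTable("df")
// The lateral view explode query from the report, run against the Seq-backed column.
sqlContext.sql("SELECT col1, c FROM df LATERAL VIEW explode(col2s) t AS c").show()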
Re: SparkSQL overwrite parquet file does not generate _common_metadata
Thanks for the information. Verified that the _common_metadata and _metadata files are missing in this case when using Hadoop 1.0.4. Would you mind opening a JIRA for this? Cheng

On 3/27/15 2:40 PM, Pei-Lun Lee wrote:
I'm using 1.0.4. Thanks, -- Pei-Lun

On Fri, Mar 27, 2015 at 2:32 PM, Cheng Lian lian.cs@gmail.com wrote:
Hm, which version of Hadoop are you using? Actually there should also be a _metadata file together with _common_metadata. I was using Hadoop 2.4.1, by the way. I'm not sure whether the Hadoop version matters here, but I did observe cases where Spark behaves differently because of semantic differences of the same API in different Hadoop versions. Cheng

On 3/27/15 11:33 AM, Pei-Lun Lee wrote:
Hi Cheng, on my computer, executing res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite) produces:

peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
total 32
-rwxrwxrwx 1 peilunlee staff   0 Mar 27 11:29 _SUCCESS*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet*
-rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet*

while res0.save("xxx") produces:

peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx
total 40
-rwxrwxrwx 1 peilunlee staff   0 Mar 27 11:29 _SUCCESS*
-rwxrwxrwx 1 peilunlee staff 250 Mar 27 11:29 _common_metadata*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet*
-rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet*
-rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet*

On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian lian.cs@gmail.com wrote:
I couldn't reproduce this with the following spark-shell snippet:

scala> import sqlContext.implicits._
scala> Seq((1, 2)).toDF("a", "b")
scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)
scala> res0.save("xxx", org.apache.spark.sql.SaveMode.Overwrite)

The _common_metadata file is typically much smaller than _metadata, because it doesn't contain row group information, and thus can be faster to read than _metadata. Cheng

On 3/26/15 12:48 PM, Pei-Lun Lee wrote:
Hi, when I save a parquet file with SaveMode.Overwrite, it never generates _common_metadata, whether it overwrites an existing dir or not. Is this expected behavior? And what is the benefit of _common_metadata? Will reading perform better when it is present? Thanks, -- Pei-Lun
Decrease In Performance due to Auto Increase of Partitions in Spark
In our application we load our historical data into 40-partition RDDs (no. of available cores x 2), and we have not implemented any custom partitioner. After applying transformations on these RDDs, intermediate RDDs are created which have more than 40 partitions, sometimes going up to 300. 1. Is Spark intelligent enough to manage the partitions of an RDD? Please suggest why there is an increase in the number of partitions. 2. We suspect that the increase in the number of partitions is causing the decrease in performance. 3. If we create a custom Partitioner, will it improve our performance? Thanks, Sayantini
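On the practical side: shuffle operations take their partition count from spark.default.parallelism unless you pass one explicitly, so the count can be pinned; a sketch with illustrative RDDs:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(1 to 10000).map(i => (i % 100, 1))   // illustrative pair RDD

// Pin the shuffle output to 40 partitions instead of the default parallelism.
val counts = pairs.reduceByKey(_ + _, 40)

// Or partition explicitly; later joins/aggregations that reuse the same
// partitioner can then avoid an extra shuffle.
val partitioned = pairs.partitionBy(new HashPartitioner(40)).persist()

// An intermediate RDD that has grown too many partitions can be merged back
// down without a full shuffle.
val wide = sc.parallelize(1 to 10000, 300)   // imagine an intermediate RDD with 300 partitions
val slimmed = wide.coalesce(40)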
Re: Can spark sql read existing tables created in hive
I did copy hive-conf.xml from the Hive installation into spark-home/conf. It does have all the metastore connection details: host, username, password, driver and others. Snippet ==

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://host.vip.company.com:3306/HDB</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hiveuser</value>
    <description>username to use against metastore database</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>some-password</value>
    <description>password to use against metastore database</description>
  </property>
  <property>
    <name>hive.metastore.local</name>
    <value>false</value>
    <description>controls whether to connect to remote metastore server or open a new metastore server in Hive Client JVM</description>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
    <description>location of default database for the warehouse</description>
  </property>
..

When I attempt to read a Hive table, it does not work: dw_bid does not exist. I am sure there is a way to read tables stored in HDFS (Hive) from Spark SQL; otherwise how would anyone do analytics, since the source tables are always persisted either directly on HDFS or through Hive. On Fri, Mar 27, 2015 at 1:15 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Since Hive and Spark SQL internally use HDFS and the Hive metastore, the only thing you want to change is the processing engine. You can try to bring your hive-site.xml to %SPARK_HOME%/conf/hive-site.xml (ensure that the hive-site.xml captures the metastore connection details). It's a hack, I haven't tried it. I have played around with the metastore and it should work. On Fri, Mar 27, 2015 at 12:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a few tables that were created in Hive. I want to transform the data stored in these Hive tables using Spark SQL. Is this even possible? So far I have seen that I can create new tables using the Spark SQL dialect. However, when I run show tables or do desc hive_table it says the table is not found. I am now wondering whether this support is present or not in Spark SQL. -- Deepak -- *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Deepak
Re: Strange JavaDeserialization error - java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel
It happens only when a StorageLevel with an extra replica is used (StorageLevel.MEMORY_ONLY_2, StorageLevel.MEMORY_AND_DISK_2); StorageLevel.MEMORY_ONLY and StorageLevel.MEMORY_AND_DISK work, so the problem must be somewhere between Mesos and Spark. From the console I see that Spark is trying to replicate to nodes (the nodes show up in Mesos active tasks), but they always fail with the ClassNotFoundException. 2015-03-27 0:52 GMT+01:00 Tathagata Das t...@databricks.com: Could you try running a simpler Spark Streaming program with a receiver (maybe socketStream) and see if that works? TD On Thu, Mar 26, 2015 at 2:08 PM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, thanks for the reply. Yes, I have a custom receiver, but it has simple logic: pop ids from a Redis queue, load docs based on the ids from Elasticsearch, and store them in Spark. No classloader modifications. I am running multiple Spark batch jobs (with user-supplied partitioning) and they have no problems; debugging in local mode shows no errors. 2015-03-26 21:47 GMT+01:00 Tathagata Das t...@databricks.com: Here are a few steps to debug. 1. Try using replication from a Spark job: sc.parallelize(1 to 100, 100).persist(StorageLevel.MEMORY_ONLY_2).count() 2. If that works, then we know that there is probably nothing wrong with the Spark installation, and the problem is probably in the threads related to the receivers receiving the data. Are you writing a custom receiver? Are you somehow playing around with the class loader in the custom receiver? TD On Thu, Mar 26, 2015 at 10:59 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, I am running Spark Streaming v1.3.0 (running inside Docker) on Mesos 0.21.1. Spark Streaming is started using Marathon: the docker container gets deployed and starts streaming (from a custom Actor). The Spark binary is located on a shared GlusterFS volume. Data is streamed from Elasticsearch/Redis.
When new batch arrives Spark tries to replicate it but fails with following error : 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0 of size 2840 dropped from memory (free 278017782) 15/03/26 14:50:00 INFO BlockManager: Removing block broadcast_0_piece0 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0_piece0 of size 1658 dropped from memory (free 278019440) 15/03/26 14:50:00 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/03/26 14:50:00 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 7178767328921933569 java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:344) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:88) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:65) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at
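For reference, TD's first debugging step written out so it can be pasted into spark-shell as-is (it only checks replication from a batch job, per the suggestion above):

import org.apache.spark.storage.StorageLevel

// Replicate a trivial RDD to two nodes; if this fails with the same
// ClassNotFoundException, the problem is not specific to the streaming receiver.
val probe = sc.parallelize(1 to 100, 100).persist(StorageLevel.MEMORY_ONLY_2)
probe.count()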
failed to launch workers on spark
Hi all! I am trying to install Spark on my standalone machine. I am able to run the master, but when I try to run the slaves it gives me the following error. Any help in this regard will be highly appreciated. _ localhost: failed to launch org.apache.spark.deploy.worker.Worker: localhost: at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494) localhost: at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
saveAsTable with path not working as expected (pyspark + Scala)
Hi, The behaviour is the same for me in Scala and Python, so posting here in Python. When I use DataFrame.saveAsTable with the path option, I expect an external Hive table to be created at the specified path. Specifically, when I call: df.saveAsTable(..., path="/tmp/test") I expect an external Hive table to be created pointing to /tmp/test which would contain the data in df. However, running locally on my Mac, I get an error indicating that Spark tried to create a managed table in the location of the Hive warehouse: ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/savetable is not a directory or unable to create one) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294) Am I wrong to expect that Spark creates an external table in this case? What is the expected behaviour of saveAsTable with the path option? Setup: running Spark locally with Spark 1.3.0. Kind Regards, Tom
Error in Delete Table
Hi. In HiveContext, when I put this statement DROP TABLE IF EXISTS TestTable If TestTable doesn't exist, spark returns an error: ERROR Hive: NoSuchObjectException(message:default.TestTable table not found) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29338) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result$get_table_resultStandardScheme.read(ThriftHiveMetastore.java:29306) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$get_table_result.read(ThriftHiveMetastore.java:29237) at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:1036) at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:1022) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90) at com.sun.proxy.$Proxy22.getTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:942) at org.apache.hadoop.hive.ql.exec.DDLTask.dropTableOrPartitions(DDLTask.java:3887) at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:310) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1554) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1321) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1139) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:962) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:952) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276) at org.apache.spark.sql.hive.execution.DropTable.sideEffectResult$lzycompute(commands.scala:58) at org.apache.spark.sql.hive.execution.DropTable.sideEffectResult(commands.scala:56) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.DropTable.execute(commands.scala:51) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94) at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98) at GeoMain$$anonfun$HiveExecution$1.apply(GeoMain.scala:98) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at GeoMain$.HiveExecution(GeoMain.scala:96) at GeoMain$.main(GeoMain.scala:17) at GeoMain.main(GeoMain.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Thanks!! -- Regards. Miguel Ángel
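If the goal is just to avoid the noisy NoSuchObjectException in the logs, a guard in front of the drop works; a sketch, assuming a HiveContext named hiveContext and the 1.3 tableNames() helper:

// Only issue the DROP when the table is actually known to the metastore.
// Hive lowercases table names, so compare case-insensitively.
if (hiveContext.tableNames().exists(_.equalsIgnoreCase("TestTable"))) {
  hiveContext.sql("DROP TABLE TestTable")
}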
Re: saveAsTable with path not working as expected (pyspark + Scala)
Another follow-up: saveAsTable works as expected when running on a Hadoop cluster with Hive installed. It's just locally that I'm getting this strange behaviour. Any ideas why this is happening? Kind Regards, Tom On 27 March 2015 at 11:29, Tom Walwyn twal...@gmail.com wrote: We can set a path, refer to the unit tests. For example: df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path=tmpPath) https://github.com/apache/spark/blob/master/python/pyspark/sql/tests.py Investigating some more, I found that the table is being created at the specified location, but the error is still being thrown, and the table has not been stored. This is the code that I ran:

a = [Row(key=k, value=str(k)) for k in range(100)]
df = sc.parallelize(a).toDF()
df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path="/tmp/test10")
15/03/27 10:45:13 ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/savedjsontable is not a directory or unable to create one) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294) ...
sqlCtx.tables()
DataFrame[tableName: string, isTemporary: boolean]
exit()
~ cat /tmp/test10/part-0
{"key":0,"value":"0"}
{"key":1,"value":"1"}
{"key":2,"value":"2"}
{"key":3,"value":"3"}
{"key":4,"value":"4"}
{"key":5,"value":"5"}

Kind Regards, Tom On 27 March 2015 at 10:33, Yanbo Liang yblia...@gmail.com wrote: saveAsTable will use the default data source configured by spark.sql.sources.default. def saveAsTable(tableName: String): Unit = { saveAsTable(tableName, SaveMode.ErrorIfExists) } It cannot set a path, if I understand correctly. 2015-03-27 15:45 GMT+08:00 Tom Walwyn twal...@gmail.com: Hi, The behaviour is the same for me in Scala and Python, so posting here in Python. When I use DataFrame.saveAsTable with the path option, I expect an external Hive table to be created at the specified path. Specifically, when I call: df.saveAsTable(..., path="/tmp/test") I expect an external Hive table to be created pointing to /tmp/test which would contain the data in df. However, running locally on my Mac, I get an error indicating that Spark tried to create a managed table in the location of the Hive warehouse: ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/savetable is not a directory or unable to create one) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294) Am I wrong to expect that Spark creates an external table in this case? What is the expected behaviour of saveAsTable with the path option? Setup: running Spark locally with Spark 1.3.0. Kind Regards, Tom
Re: failed to launch workers on spark
mas mas.ha...@gmail.com writes: Hi all! I am trying to install Spark on my standalone machine. I am able to run the master, but when I try to run the slaves it gives me the following error. Any help in this regard will be highly appreciated. _ localhost: failed to launch org.apache.spark.deploy.worker.Worker: localhost: at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494) localhost: at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486) It would be great if you could explain the steps that you followed to install and run Spark. It would also help if you could paste the whole exception stack. Thanks and Regards Noorul - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org

Re: Column not found in schema when querying partitioned table
Hello Jon, Are you able to connect to existing Hive and read tables created in hive ? Regards, deepak On Thu, Mar 26, 2015 at 4:16 PM, Jon Chase jon.ch...@gmail.com wrote: I've filed this as https://issues.apache.org/jira/browse/SPARK-6554 On Thu, Mar 26, 2015 at 6:29 AM, Jon Chase jon.ch...@gmail.com wrote: Spark 1.3.0, Parquet I'm having trouble referencing partition columns in my queries. In the following example, 'probeTypeId' is a partition column. For example, the directory structure looks like this: /mydata /probeTypeId=1 ...files... /probeTypeId=2 ...files... I see the column when I reference load a DF using the /mydata directory and call df.printSchema(): ... |-- probeTypeId: integer (nullable = true) ... Parquet is also aware of the column: optional int32 probeTypeId; And this works fine: sqlContext.sql(select probeTypeId from df limit 1); ...as does df.show() - it shows the correct values for the partition column. However, when I try to use a partition column in a where clause, I get an exception stating that the column was not found in the schema: sqlContext.sql(select probeTypeId from df where probeTypeId = 1 limit 1); ... ... org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalArgumentException: Column [probeTypeId] was not found in schema! at parquet.Preconditions.checkArgument(Preconditions.java:47) at parquet.filter2.predicate.SchemaCompatibilityValidator.getColumnDescriptor(SchemaCompatibilityValidator.java:172) at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:160) at parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:142) at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:76) at parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:41) at parquet.filter2.predicate.Operators$Eq.accept(Operators.java:162) at parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:46) at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:41) at parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:22) at parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:108) at parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:28) at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:158) ... ... What am I doing wrong? 
Here's the full stack trace: using local[*] for master 06:05:55,675 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute not set 06:05:55,683 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender] 06:05:55,694 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT] 06:05:55,721 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property 06:05:55,768 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO 06:05:55,768 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT] 06:05:55,769 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration. 06:05:55,770 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@6aaceffd - Registering current configuration as safe fallback point INFO org.apache.spark.SparkContext Running Spark version 1.3.0 WARN o.a.hadoop.util.NativeCodeLoader Unable to load native-hadoop library for your platform... using builtin-java classes where applicable INFO org.apache.spark.SecurityManager Changing view acls to: jon INFO org.apache.spark.SecurityManager Changing modify acls to: jon INFO org.apache.spark.SecurityManager SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jon); users with modify permissions: Set(jon) INFO akka.event.slf4j.Slf4jLogger Slf4jLogger started INFO Remoting Starting remoting INFO Remoting Remoting started; listening on addresses :[akka.tcp:// sparkDriver@192.168.1.134:62493] INFO org.apache.spark.util.Utils Successfully started service 'sparkDriver' on port 62493. INFO org.apache.spark.SparkEnv Registering MapOutputTracker INFO org.apache.spark.SparkEnv Registering BlockManagerMaster INFO o.a.spark.storage.DiskBlockManager Created local directory at
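Until SPARK-6554 is fixed, a hedged workaround is to keep Parquet filter push-down off, so the partition-column predicate is evaluated by Spark rather than handed to Parquet (which does not know about partition columns). The config key below assumes Spark 1.3; this is a sketch, not a confirmed fix.

    // Disable Parquet filter push-down, then run the same query.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")
    sqlContext.sql("select probeTypeId from df where probeTypeId = 1 limit 1")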
Re: Error while querying hive table from spark shell
Did you resolve this ? I am facing the same error On Wed, Feb 11, 2015 at 1:02 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Seems that the HDFS path for the table dosnt contains any file/data. Does the metastore contain the right path for HDFS data. You can find the HDFS path in TBLS in your metastore. On Wed, Feb 11, 2015 at 12:20 PM, kundan kumar iitr.kun...@gmail.com wrote: Hi , I am getting the following error when I am trying query a hive table from spark shell. I have placed my hive-site.xml in the spark/conf directory. Please suggest how to resolve this error. scala sqlContext.sql(select count(*) from offers_new).collect().foreach(println) 15/02/11 01:48:01 WARN conf.HiveConf: DEPRECATED: hive.metastore.ds.retry.* no longer has any effect. Use hive.hmshandler.retry.* instead 15/02/11 01:48:01 INFO parse.ParseDriver: Parsing command: select count(*) from offers_new 15/02/11 01:48:01 INFO parse.ParseDriver: Parse Completed 15/02/11 01:48:01 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/02/11 01:48:01 INFO metastore.ObjectStore: ObjectStore, initialize called org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table offers_new at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:984) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:950) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$$anon$1.org $apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:138) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:137) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412) at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412) at
Re: Can spark sql read existing tables created in hive
Since Hive and Spark SQL both use HDFS and the Hive metastore internally, the only thing you are changing is the processing engine. You can try to bring your hive-site.xml to %SPARK_HOME%/conf/hive-site.xml (ensure that the hive-site.xml captures the metastore connection details). It's a hack, I haven't tried it, but I have played around with the metastore and it should work. On Fri, Mar 27, 2015 at 12:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a few tables that are created in Hive. I want to transform data stored in these Hive tables using Spark SQL. Is this even possible? So far I have seen that I can create new tables using the Spark SQL dialect. However, when I run show tables or do desc hive_table it says the table is not found. I am now wondering whether this support is present or not in Spark SQL? -- Deepak -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
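A minimal sketch of the hack described above, assuming hive-site.xml has been copied into $SPARK_HOME/conf and the job uses a HiveContext; the table name is hypothetical.

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    hiveContext.sql("SHOW TABLES").show()                          // should list the existing Hive tables
    hiveContext.sql("SELECT * FROM my_hive_table LIMIT 10").show() // read data stored through Hive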
Re: Parallel actions from driver
This is exactly my case also, it worked, thanks Sean. On 26 March 2015 at 23:35, Sean Owen so...@cloudera.com wrote: You can do this much more simply, I think, with Scala's parallel collections (try .par). There's nothing wrong with doing this, no. Here, something is getting caught in your closure, maybe unintentionally, that's not serializable. It's not directly related to the parallelism. On Thu, Mar 26, 2015 at 3:54 PM, Aram Mkrtchyan aram.mkrtchyan...@gmail.com wrote: Hi. I'm trying to trigger DataFrame's save method in parallel from my driver. For that purposes I use ExecutorService and Futures, here's my code: val futures = [1,2,3].map( t = pool.submit( new Runnable { override def run(): Unit = { val commons = events.filter(_._1 == t).map(_._2.common) saveAsParquetFile(sqlContext, commons, s$t/common) EventTypes.all.foreach { et = val eventData = events.filter(ev = ev._1 == t ev._2.eventType == et).map(_._2.data) saveAsParquetFile(sqlContext, eventData, s$t/$et) } } })) futures.foreach(_.get) It throws Task is not Serializable exception. Is it legal to use threads in driver to trigger actions? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- RGRDZ Harut
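For reference, a minimal sketch of the parallel-collections approach Sean suggests, assuming events, sqlContext and saveAsParquetFile exist as in the original snippet; only the threading mechanism changes.

    // .par runs the closure for each element on the driver's fork-join pool,
    // so the save jobs are submitted concurrently without an ExecutorService or Futures.
    Seq(1, 2, 3).par.foreach { t =>
      val commons = events.filter(_._1 == t).map(_._2.common)
      saveAsParquetFile(sqlContext, commons, s"$t/common")
    }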
Spark SQL and DataSources API roadmap
Hello, Is there any published community roadmap for SparkSQL and the DataSources API? Regards, Ashish
Re: Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet
Forgot to mention that, would you mind to also provide the full stack trace of the exception thrown in the saveAsParquetFile call? Thanks! Cheng On 3/27/15 7:35 PM, Jon Chase wrote: https://issues.apache.org/jira/browse/SPARK-6570 I also left in the call to saveAsParquetFile(), as it produced a similar exception (though there was no use of explode there). On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian lian.cs@gmail.com mailto:lian.cs@gmail.com wrote: This should be a bug in the Explode.eval(), which always assumes the underlying SQL array is represented by a Scala Seq. Would you mind to open a JIRA ticket for this? Thanks! Cheng On 3/27/15 7:00 PM, Jon Chase wrote: Spark 1.3.0 Two issues: a) I'm unable to get a lateral view explode query to work on an array type b) I'm unable to save an array type to a Parquet file I keep running into this: java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq Here's a stack trace from the explode issue: root |-- col1: string (nullable = false) |-- col2s: array (nullable = true) ||-- element: integer (containsNull = true) ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0 (TID 15) java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) ~[spark-catalyst_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.to http://class.to(TraversableOnce.scala:273) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.to http://scala.collection.AbstractIterator.to(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.0.jar:1.3.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_31] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31] WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID 15, localhost): java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) at
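A hedged workaround sketch while SPARK-6570 is open, assuming a HiveContext (needed for LATERAL VIEW): since Explode.eval expects a Scala Seq, building the rows with Seq[Int] rather than a primitive array may sidestep the cast. This is an assumption about the bug, not a confirmed fix; the table and field names are hypothetical.

    import hiveContext.implicits._

    case class Record(col1: String, col2s: Seq[Int])
    val df = sc.parallelize(Seq(Record("a", Seq(1, 2, 3)))).toDF()
    df.registerTempTable("records")
    hiveContext.sql("SELECT col1, c FROM records LATERAL VIEW explode(col2s) t AS c").show()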
saving schemaRDD to cassandra
Hi experts! I would like to know is there anyway to store schemaRDD to cassandra? if yes then how to store in existing cassandra column family and new column family? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saving-schemaRDD-to-cassandra-tp22256.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
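One common route is the DataStax spark-cassandra-connector; a hedged sketch follows, with hypothetical keyspace, table and column names, and assuming the connector jar matching your Spark version is on the classpath. Writing into a new column family generally means creating it first (e.g. via CQL) before calling saveToCassandra.

    import com.datastax.spark.connector._

    // Assuming schemaRDD is an RDD[Row] (on a 1.3 DataFrame, call .rdd first):
    // map each Row to a tuple matching the target columns, then write.
    val rows = schemaRDD.map(r => (r.getString(0), r.getInt(1)))
    rows.saveToCassandra("my_keyspace", "my_table", SomeColumns("name", "value"))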
Re: Can spark sql read existing tables created in hive
I can recreate tables but what about data. It looks like this is a obvious feature that Spark SQL must be having. People will want to transform tons of data stored in HDFS through Hive from Spark SQL. Spark programming guide suggests its possible. Spark SQL also supports reading and writing data stored in Apache Hive http://hive.apache.org/. Configuration of Hive is done by placing your hive-site.xml file in conf/. https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#hive-tables For some reason its not working. On Fri, Mar 27, 2015 at 3:35 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Seems Spark SQL accesses some more columns apart from those created by hive. You can always recreate the tables, you would need to execute the table creation scripts but it would be good to avoid recreation. On Fri, Mar 27, 2015 at 3:20 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I did copy hive-conf.xml form Hive installation into spark-home/conf. IT does have all the meta store connection details, host, username, passwd, driver and others. Snippet == configuration property namejavax.jdo.option.ConnectionURL/name valuejdbc:mysql://host.vip.company.com:3306/HDB/value /property property namejavax.jdo.option.ConnectionDriverName/name valuecom.mysql.jdbc.Driver/value descriptionDriver class name for a JDBC metastore/description /property property namejavax.jdo.option.ConnectionUserName/name valuehiveuser/value descriptionusername to use against metastore database/description /property property namejavax.jdo.option.ConnectionPassword/name valuesome-password/value descriptionpassword to use against metastore database/description /property property namehive.metastore.local/name valuefalse/value descriptioncontrols whether to connect to remove metastore server or open a new metastore server in Hive Client JVM/description /property property namehive.metastore.warehouse.dir/name value/user/hive/warehouse/value descriptionlocation of default database for the warehouse/description /property .. When i attempt to read hive table, it does not work. dw_bid does not exists. I am sure there is a way to read tables stored in HDFS (Hive) from Spark SQL. Otherwise how would anyone do analytics since the source tables are always either persisted directly on HDFS or through Hive. On Fri, Mar 27, 2015 at 1:15 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Since hive and spark SQL internally use HDFS and Hive metastore. The only thing you want to change is the processing engine. You can try to bring your hive-site.xml to %SPARK_HOME%/conf/hive-site.xml.(Ensure that the hive site xml captures the metastore connection details). Its a hack, i havnt tried it. I have played around with the metastore and it should work. On Fri, Mar 27, 2015 at 12:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have few tables that are created in Hive. I wan to transform data stored in these Hive tables using Spark SQL. Is this even possible ? So far i have seen that i can create new tables using Spark SQL dialect. However when i run show tables or do desc hive_table it says table not found. I am now wondering is this support present or not in Spark SQL ? -- Deepak -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Deepak -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Deepak
Re: Strange JavaDeserialization error - java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel
More info when using *spark.mesos.coarse* everything works as expected. I think this must be a bug in spark-mesos integration. 2015-03-27 9:23 GMT+01:00 Ondrej Smola ondrej.sm...@gmail.com: It happens only when StorageLevel is used with 1 replica ( StorageLevel. MEMORY_ONLY_2,StorageLevel.MEMORY_AND_DISK_2) , StorageLevel.MEMORY_ONLY , StorageLevel.MEMORY_AND_DISK works - the problems must be clearly somewhere between mesos-spark . From console I see that spark is trying to replicate to nodes - nodes show up in Mesos active tasks ... but they always fail with ClassNotFoundE. 2015-03-27 0:52 GMT+01:00 Tathagata Das t...@databricks.com: Could you try running a simpler spark streaming program with receiver (may be socketStream) and see if that works. TD On Thu, Mar 26, 2015 at 2:08 PM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi thanks for reply, yes I have custom receiver - but it has simple logic .. pop ids from redis queue - load docs based on ids from elastic and store them in spark. No classloader modifications. I am running multiple Spark batch jobs (with user supplied partitioning) and they have no problems, debug in local mode show no errors. 2015-03-26 21:47 GMT+01:00 Tathagata Das t...@databricks.com: Here are few steps to debug. 1. Try using replication from a Spark job: sc.parallelize(1 to 100, 100).persist(StorageLevel.MEMORY_ONLY_2).count() 2. If one works, then we know that there is probably nothing wrong with the Spark installation, and probably in the threads related to the receivers receiving the data. Are you writing a custom receiver? Are you somehow playing around with the class loader in the custom receiver? TD On Thu, Mar 26, 2015 at 10:59 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, I am running spark streaming v 1.3.0 (running inside Docker) on Mesos 0.21.1. Spark streaming is started using Marathon - docker container gets deployed and starts streaming (from custom Actor). Spark binary is located on shared GlusterFS volume. Data is streamed from Elasticsearch/Redis. 
When new batch arrives Spark tries to replicate it but fails with following error : 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0 of size 2840 dropped from memory (free 278017782) 15/03/26 14:50:00 INFO BlockManager: Removing block broadcast_0_piece0 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0_piece0 of size 1658 dropped from memory (free 278019440) 15/03/26 14:50:00 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/03/26 14:50:00 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 7178767328921933569 java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:344) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:88) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:65) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at
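For completeness, a sketch of the configuration the earlier message refers to; the master URL is hypothetical.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("StreamingJob")
      .setMaster("mesos://mesos-master-host:5050")   // hypothetical master URL
      .set("spark.mesos.coarse", "true")             // coarse-grained mode, reported above to avoid the error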
Checking Data Integrity in Spark
Hello, I want to check if there is any way to verify the integrity of data files. The use case is to perform data integrity checks on large files with 100+ columns and reject records (writing them to another file) that do not meet criteria such as NOT NULL, date format, etc. Since there are a lot of columns/integrity rules, we should be able to drive the integrity checks through configuration (like XML, JSON, etc). Please share your thoughts. Thanks Sathish
Re: Decrease In Performance due to Auto Increase of Partitions in Spark
Each RDD is composed of multiple blocks known as partitions. When you apply a transformation over it, it can grow in size depending on the operation (as the number of objects/references increases), and that is probably the reason why you are seeing an increased number of partitions. I don't think an increased number of partitions will cause any performance decrease, as it helps to evenly distribute the tasks across machines and cores. If you don't want more partitions, you can do a .repartition over the transformed RDD. A custom partitioner can improve performance depending on your use case. Thanks Best Regards On Fri, Mar 27, 2015 at 4:39 PM, sayantini sayantiniba...@gmail.com wrote: In our application we load our historical data into 40 partitioned RDDs (no. of available cores X 2) and we have not implemented any custom partitioner. After applying transformations on these RDDs, intermediate RDDs are created which have more than 40 partitions, sometimes going up to 300. 1. Is Spark intelligent enough to manage the partitions of an RDD? Please suggest why there is an increase in the no. of partitions. 2. We suspect that the increased no. of partitions is causing a decrease in performance. 3. If we create a custom Partitioner, will it improve our performance? Thanks, Sayantini
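A minimal sketch of the repartition suggestion above; the transformation pipeline is hypothetical, only the partition-count handling matters here.

    val transformed = historicalRdd.map(parse).reduceByKey(_ + _)  // hypothetical pipeline
    val pinned = transformed.repartition(40)   // full shuffle back to 40 partitions
    // or, when only shrinking the partition count, avoid the full shuffle:
    val shrunk = transformed.coalesce(40)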
Re: Hive Table not from from Spark SQL
I tried the following 1) ./bin/spark-submit -v --master yarn-cluster --driver-class-path /home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar:/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar: *$SPARK_HOME/conf/hive-site.xml* --jars /home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar --num-executors 1 --driver-memory 4g --driver-java-options -XX:MaxPermSize=2G --executor-memory 2g --executor-cores 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp spark_reporting-1.0-SNAPSHOT.jar startDate=2015-02-16 endDate=2015-02-16 input=/user/dvasthimal/epdatasets/successdetail1/part-r-0.avro subcommand=successevents2 output=/user/dvasthimal/epdatasets/successdetail2 This throws dw_bid not found. Looks like Spark SQL is unable to read my existing Hive metastore and creates its own and hence complains that table is not found. 2) ./bin/spark-submit -v --master yarn-cluster --driver-class-path /home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar:/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar --jars /home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar: *$SPARK_HOME/conf/hive-site.xml* --num-executors 1 --driver-memory 4g --driver-java-options -XX:MaxPermSize=2G --executor-memory 2g --executor-cores 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp spark_reporting-1.0-SNAPSHOT.jar startDate=2015-02-16 endDate=2015-02-16 input=/user/dvasthimal/epdatasets/successdetail1/part-r-0.avro subcommand=successevents2 output=/user/dvasthimal/epdatasets/successdetail2 This time i do not get above error, however i get MySQL driver not found exception. Looks like this is even before its able to communicate to Hive. Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke the BONECP plugin to create a ConnectionPool gave an error : The specified datastore driver (com.mysql.jdbc.Driver) was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver. In both above cases, i do have hive-site.xml in Spark/conf folder. 
3) ./bin/spark-submit -v --master yarn-cluster --driver-class-path /home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar:/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar --jars /home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar--num-executors 1 --driver-memory 4g --driver-java-options -XX:MaxPermSize=2G --executor-memory 2g --executor-cores 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp spark_reporting-1.0-SNAPSHOT.jar startDate=2015-02-16 endDate=2015-02-16 input=/user/dvasthimal/epdatasets/successdetail1/part-r-0.avro subcommand=successevents2 output=/user/dvasthimal/epdatasets/successdetail2 I do not specify hive-site.xml in --jars or --driver-class-path. Its present in spark/conf folder as per https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#hive-tables. In this case i get same error as #1. dw_bid table not found. I want Spark SQL to know that there are tables in Hive and read that data. As per guide it looks like Spark SQL has that support. Please suggest. Regards, Deepak On Thu, Mar 26, 2015 at 9:01 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Stack Trace: 15/03/26 08:25:42 INFO ql.Driver: OK 15/03/26 08:25:42 INFO log.PerfLogger: PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver 15/03/26 08:25:42 INFO log.PerfLogger: /PERFLOG method=releaseLocks start=1427383542966 end=1427383542966 duration=0 from=org.apache.hadoop.hive.ql.Driver 15/03/26 08:25:42 INFO log.PerfLogger: /PERFLOG method=Driver.run start=1427383535203 end=1427383542966 duration=7763 from=org.apache.hadoop.hive.ql.Driver 15/03/26 08:25:42 INFO metastore.HiveMetaStore: 0: get_tables: db=default pat=.* 15/03/26 08:25:42 INFO HiveMetaStore.audit: ugi=dvasthimal ip=unknown-ip-addr cmd=get_tables: db=default
Re: Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet
Done. I also updated the name on the ticket to include both issues. Spark SQL arrays: explode() fails and cannot save array type to Parquet https://issues.apache.org/jira/browse/SPARK-6570 On Fri, Mar 27, 2015 at 8:14 AM, Cheng Lian lian.cs@gmail.com wrote: Forgot to mention that, would you mind to also provide the full stack trace of the exception thrown in the saveAsParquetFile call? Thanks! Cheng On 3/27/15 7:35 PM, Jon Chase wrote: https://issues.apache.org/jira/browse/SPARK-6570 I also left in the call to saveAsParquetFile(), as it produced a similar exception (though there was no use of explode there). On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian lian.cs@gmail.com wrote: This should be a bug in the Explode.eval(), which always assumes the underlying SQL array is represented by a Scala Seq. Would you mind to open a JIRA ticket for this? Thanks! Cheng On 3/27/15 7:00 PM, Jon Chase wrote: Spark 1.3.0 Two issues: a) I'm unable to get a lateral view explode query to work on an array type b) I'm unable to save an array type to a Parquet file I keep running into this: java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq Here's a stack trace from the explode issue: root |-- col1: string (nullable = false) |-- col2s: array (nullable = true) ||-- element: integer (containsNull = true) ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0 (TID 15) java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) ~[spark-catalyst_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.to(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) 
~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.10-1.3.0.jar:1.3.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_31] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_31] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31] WARN o.a.spark.scheduler.TaskSetManager Lost task 7.0 in stage 1.0 (TID 15, localhost): java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) at
Re: Checking Data Integrity in Spark
It's not possible to configure Spark to do checks based on XML files. You would need to write jobs to do the validations you need. On Fri, Mar 27, 2015 at 5:13 PM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Hello, I want to check if there is any way to verify the integrity of data files. The use case is to perform data integrity checks on large files with 100+ columns and reject records (writing them to another file) that do not meet criteria such as NOT NULL, date format, etc. Since there are a lot of columns/integrity rules, we should be able to drive the integrity checks through configuration (like XML, JSON, etc). Please share your thoughts. Thanks Sathish -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
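A minimal sketch of the kind of job described above, with the rules written as ordinary functions; loading rules from XML/JSON is left out, and the paths, delimiter and rule details are hypothetical.

    // Each rule takes the split record and returns true when the record passes.
    val rules: Seq[Array[String] => Boolean] = Seq(
      fields => fields(0) != null && fields(0).nonEmpty,                         // NOT NULL on column 0
      fields => fields.length > 3 && fields(3).matches("""\d{4}-\d{2}-\d{2}""")  // date format on column 3
    )

    val records = sc.textFile("hdfs:///input/bigfile.csv").map(_.split(","))
    val good = records.filter(r => rules.forall(rule => rule(r)))
    val bad  = records.filter(r => !rules.forall(rule => rule(r)))
    good.map(_.mkString(",")).saveAsTextFile("hdfs:///output/clean")
    bad.map(_.mkString(",")).saveAsTextFile("hdfs:///output/rejected")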
Re: Spark streaming
Show us the code. This shouldn't happen for the simple process you described. Sent from my rotary phone. On Mar 27, 2015, at 5:47 AM, jamborta jambo...@gmail.com wrote: Hi all, We have a workflow that pulls in data from csv files. The original setup of the workflow was to parse the data as it comes in (turn it into an array), then store it. This resulted in out of memory errors with larger files (as a result of increased GC?). It turns out that if the data gets stored as a string first and then parsed, the issue does not occur. Why is that? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tp22255.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL lateral view explode doesn't work, and unable to save array types to Parquet
Thanks for the detailed information! On 3/27/15 9:16 PM, Jon Chase wrote: Done. I also updated the name on the ticket to include both issues. Spark SQL arrays: explode() fails and cannot save array type to Parquet https://issues.apache.org/jira/browse/SPARK-6570 On Fri, Mar 27, 2015 at 8:14 AM, Cheng Lian lian.cs@gmail.com mailto:lian.cs@gmail.com wrote: Forgot to mention that, would you mind to also provide the full stack trace of the exception thrown in the saveAsParquetFile call? Thanks! Cheng On 3/27/15 7:35 PM, Jon Chase wrote: https://issues.apache.org/jira/browse/SPARK-6570 I also left in the call to saveAsParquetFile(), as it produced a similar exception (though there was no use of explode there). On Fri, Mar 27, 2015 at 7:20 AM, Cheng Lian lian.cs@gmail.com mailto:lian.cs@gmail.com wrote: This should be a bug in the Explode.eval(), which always assumes the underlying SQL array is represented by a Scala Seq. Would you mind to open a JIRA ticket for this? Thanks! Cheng On 3/27/15 7:00 PM, Jon Chase wrote: Spark 1.3.0 Two issues: a) I'm unable to get a lateral view explode query to work on an array type b) I'm unable to save an array type to a Parquet file I keep running into this: java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq Here's a stack trace from the explode issue: root |-- col1: string (nullable = false) |-- col2s: array (nullable = true) ||-- element: integer (containsNull = true) ERROR org.apache.spark.executor.Executor Exception in task 7.0 in stage 1.0 (TID 15) java.lang.ClassCastException: [I cannot be cast to scala.collection.Seq at org.apache.spark.sql.catalyst.expressions.Explode.eval(generators.scala:125) ~[spark-catalyst_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:70) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at org.apache.spark.sql.execution.Generate$$anonfun$2$$anonfun$apply$1.apply(Generate.scala:69) ~[spark-sql_2.10-1.3.0.jar:1.3.0] at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[scala-library-2.10.4.jar:na] at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) ~[scala-library-2.10.4.jar:na] at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.to http://class.to(TraversableOnce.scala:273) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.to http://scala.collection.AbstractIterator.to(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) ~[scala-library-2.10.4.jar:na] at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) ~[scala-library-2.10.4.jar:na] at 
org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.10-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.10-1.3.0.jar:1.3.0]
Re: Spark streaming
jamborta : Please also describe the format of your csv files. Cheers On Fri, Mar 27, 2015 at 6:42 AM, DW @ Gmail deanwamp...@gmail.com wrote: Show us the code. This shouldn't happen for the simple process you described Sent from my rotary phone. On Mar 27, 2015, at 5:47 AM, jamborta jambo...@gmail.com wrote: Hi all, We have a workflow that pulls in data from csv files, then originally setup up of the workflow was to parse the data as it comes in (turn into array), then store it. This resulted in out of memory errors with larger files (as a result of increased GC?). It turns out if the data gets stored as a string first, then parsed, it issues does not occur. Why is that? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tp22255.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: JavaKinesisWordCountASLYARN Example not working on EMR
Ankur, The JavaKinesisWordCountASLYARN is no longer valid and was added just to the EMR build back in 1.1.0 to demonstrate Spark Streaming with Kinesis in YARN, just follow the stock example as seen in JavaKinesisWordCountASL as it is better form anyway given it is best not to hard code the master setting. Thanks Christopher From: Ankur Jain [mailto:ankur.j...@yash.com] Sent: Wednesday, March 25, 2015 10:24 PM To: Arush Kharbanda Cc: user@spark.apache.org Subject: RE: JavaKinesisWordCountASLYARN Example not working on EMR I had installed spark via bootstrap in EMR. https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark However when I run spark without yarn (local) and that one is working fine….. Thanks Ankur From: Arush Kharbanda [mailto:ar...@sigmoidanalytics.com] Sent: Wednesday, March 25, 2015 7:31 PM To: Ankur Jain Cc: user@spark.apache.org Subject: Re: JavaKinesisWordCountASLYARN Example not working on EMR Did you built for kineses using profile -Pkinesis-asl On Wed, Mar 25, 2015 at 7:18 PM, ankur.jain ankur.j...@yash.commailto:ankur.j...@yash.com wrote: Hi, I am trying to run a Spark on YARN program provided by Spark in the examples directory using Amazon Kinesis on EMR cluster : I am using Spark 1.3.0 and EMR AMI: 3.5.0 I've setup the Credentials export AWS_ACCESS_KEY_ID=XX export AWS_SECRET_KEY=XXX *A) This is the Kinesis Word Count Producer which ran Successfully : * run-example org.apache.spark.examples.streaming.KinesisWordCountProducerASL mySparkStream https://kinesis.us-east-1.amazonaws.com 1 5 *B) This one is the Normal Consumer using Spark Streaming which also ran Successfully: * run-example org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream https://kinesis.us-east-1.amazonaws.com *C) And this is the YARN based program which is NOT WORKING: * run-example org.apache.spark.examples.streaming.JavaKinesisWordCountASLYARN mySparkStream https://kinesis.us-east-1.amazonaws.com\ Spark assembly has been built with Hive, including Datanucleus jars on classpath 15/03/25 11:52:45 INFO spark.SparkContext: Running Spark version 1.3.0 15/03/25 11:52:45 WARN spark.SparkConf: SPARK_CLASSPATH was detected (set to '/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/emr/:/home/hadoop/spark/classpath/emrfs/:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar'). This is deprecated in Spark 1.0+. Please instead use: • ./spark-submit with --driver-class-path to augment the driver classpath • spark.executor.extraClassPath to augment the executor classpath 15/03/25 11:52:45 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/emr/:/home/hadoop/spark/classpath/emrfs/:/home/hadoop/share/hadoop/common/lib/:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar' as a work-around. 15/03/25 11:52:45 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/emr/:/home/hadoop/spark/classpath/emrfs/:/home/hadoop/share/hadoop/common/lib/:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar' as a work-around. 
15/03/25 11:52:46 INFO spark.SecurityManager: Changing view acls to: hadoop 15/03/25 11:52:46 INFO spark.SecurityManager: Changing modify acls to: hadoop 15/03/25 11:52:46 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/03/25 11:52:47 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/03/25 11:52:48 INFO Remoting: Starting remoting 15/03/25 11:52:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ip-10-80-175-92.ec2.internal:59504] 15/03/25 11:52:48 INFO util.Utils: Successfully started service 'sparkDriver' on port 59504. 15/03/25 11:52:48 INFO spark.SparkEnv: Registering MapOutputTracker 15/03/25 11:52:48 INFO spark.SparkEnv: Registering BlockManagerMaster 15/03/25 11:52:48 INFO storage.DiskBlockManager: Created local directory at /mnt/spark/spark-120befbc-6dae-4751-b41f-dbf7b3d97616/blockmgr-d339d180-36f5-465f-bda3-cecccb23b1d3 15/03/25 11:52:48 INFO storage.MemoryStore: MemoryStore started with capacity 265.4 MB 15/03/25 11:52:48 INFO spark.HttpFileServer: HTTP File server directory is /mnt/spark/spark-85e88478-3dad-4fcf-a43a-efd15166bef3/httpd-6115870a-0d90-44df-aa7c-a6bd1a47e107 15/03/25 11:52:48 INFO spark.HttpServer: Starting HTTP Server 15/03/25 11:52:49 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/03/25 11:52:49 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44879http://SocketConnector@0.0.0.0:44879 15/03/25 11:52:49 INFO util.Utils: Successfully started service 'HTTP file server' on port 44879. 15/03/25 11:52:49 INFO spark.SparkEnv: Registering OutputCommitCoordinator 15/03/25 11:52:49 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/03/25 11:52:49 INFO
Re: Python Example sql.py not working in version spark-1.3.0-bin-hadoop2.4
This will be fixed in https://github.com/apache/spark/pull/5230/files On Fri, Mar 27, 2015 at 9:13 AM, Peter Mac peter.machar...@noaa.gov wrote: I downloaded spark version spark-1.3.0-bin-hadoop2.4. When the python version of sql.py is run the following error occurs: [root@nde-dev8-template python]# /root/spark-1.3.0-bin-hadoop2.4/bin/spark-submit sql.py Spark assembly has been built with Hive, including Datanucleus jars on classpath Traceback (most recent call last): File /root/spark-1.3.0-bin-hadoop2.4/examples/src/main/python/sql.py, line 22, in module from pyspark.sql import Row, StructField, StructType, StringType, IntegerType ImportError: cannot import name StructField -- The sql.py version, spark-1.2.1-bin-hadoop2.4, does not throw the error: [root@nde-dev8-template python]# /root/spark-1.2.1-bin-hadoop2.4/bin/spark-submit sql.py Spark assembly has been built with Hive, including Datanucleus jars on classpath 15/03/27 14:18:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 14:19:41 WARN ThreadLocalRandom: Failed to generate a seed from SecureRandom within 3 seconds. Not enough entrophy? root |-- age: integer (nullable = true) |-- name: string (nullable = true) root |-- person_name: string (nullable = false) |-- person_age: integer (nullable = false) root |-- age: integer (nullable = true) |-- name: string (nullable = true) Justin - The OS/JAVA environments are: OS: Linux nde-dev8-template 2.6.32-431.17.1.el6.x86_64 #1 SMP Fri Apr 11 17:27:00 EDT 2014 x86_64 x86_64 x86_64 GNU/Linux JAVA: [root@nde-dev8-template bin]# java -version java version 1.7.0_51 Java(TM) SE Runtime Environment (build 1.7.0_51-b13) Java HotSpot(TM) 64-Bit Server VM (build 24.51-b03, mixed mode) The same error occurs when using bin/pyspark shell. from pyspark.sql import StructField Traceback (most recent call last): File stdin, line 1, in module ImportError: cannot import name StructField --- Any advice for resolving? Thanks in advance. Peter -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-Example-sql-py-not-working-in-version-spark-1-3-0-bin-hadoop2-4-tp22261.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
JettyUtils.createServletHandler Method not Found?
I have a very strange error in Spark 1.3 where at runtime in the org.apache.spark.ui.JettyUtils object the method createServletHandler is not found Exception in thread main java.lang.NoSuchMethodError: org.apache.spark.ui.JettyUtils$.createServletHandler(Ljava/lang/String;Ljavax/servlet/http/HttpServlet;Ljava/lang/String;)Lorg/eclipse/jetty/servlet/ServletContextHandler; The code compiles without issue, but at runtime it fails. I know the Jetty dependencies have been changed, but this should not affect the JettyUtils inside Spark Core, or is there another change I am not aware of? Thanks, Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JettyUtils-createServletHandler-Method-not-Found-tp22262.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to avoid the repartitioning in graph construction
Hi, Now I have 10 edge data files in my HDFS directory, e.g. edges_part00, edges_part01, …, edges_part09 format: srcId tarId (They make a good partitioning of the whole graph, so I never expect any change (re-partitioning operations) on them during graph building). I am thinking of how to use them to construct a graph using the GraphX API, without any repartitioning. My idea: 1) build an RDD, edgeTupleRDD, by using sc.textFile(“hdfs://myDirectory”), where each file size is limited to below 64MB (smaller than an HDFS block), so normally I should get 1 partition per file, right? 2) then build the graph by using Graph.fromEdgeTuples(edgeTupleRDD,..); from the graphx documentation, this operation will keep those partitions without any change, right? - Is there any other idea, or anything I missed? - If a file is larger than 64MB (the default size of an HDFS block), will repartitioning be inevitable? Thanks in advance! Best, Yifan LI
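For what it's worth, a sketch of steps 1) and 2) as described; the directory path and vertex default value are hypothetical. Graph.fromEdgeTuples with the default uniqueEdges = None does not apply any PartitionStrategy, so the edge partitioning stays as loaded.

    import org.apache.spark.graphx.Graph

    val edgeTuples = sc.textFile("hdfs://myDirectory")
      .map { line =>
        val Array(src, dst) = line.split("\\s+")   // "srcId tarId" per line
        (src.toLong, dst.toLong)
      }

    // No PartitionStrategy is passed, so edges keep their input partitioning.
    val graph = Graph.fromEdgeTuples(edgeTuples, defaultValue = 1)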
[Dataframe] Problem with insertIntoJDBC and existing database
Hello, I 'm trying to develop with the new Dataframe API, but I'm running into an error. I have an existing MySQL database and I want to insert rows. I create a Dataframe from an RDD, then use the insertIntoJDBC function. It appear that dataframes reorder the data inside them. As a result, I get an error because the fields are not inserted in the proper order. Is there a way to specify the name or the order of my variables inside the database? If it is a bug, here is an example to reproduce it: My table: == CREATE TABLE `reference` ( `zvalue` text, `avalue` int(11) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=latin1 == My class: == package org.mypackage.testspark; import java.util.Arrays; public class MysqlInsert { public static class rowStruct implements java.io.Serializable { private static final long serialVersionUID = 1L; public java.lang.String zvalue; public java.lang.Integer avalue; public rowStruct() { } public java.lang.String getZvalue() { return this.zvalue; } public java.lang.Integer getAvalue() { return this.avalue; } public void setZvalue(java.lang.String zvalue) { this.zvalue = zvalue; } public void setAvalue(java.lang.Integer avalue) { this.avalue = avalue; } } public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName(myApp); sparkConf.setMaster(local[2]); JavaSparkContext ctx = new JavaSparkContext(sparkConf); rowStruct rowStruct = new rowStruct(); rowStruct.setZvalue(test); rowStruct.setAvalue(1); org.apache.spark.api.java.JavaRDDrowStruct rdd_row6 = ctx.parallelize(Arrays.asList(rowStruct)); org.apache.spark.sql.SQLContext sqlCtx = new org.apache.spark.sql.SQLContext(ctx); org.apache.spark.sql.DataFrame df = sqlCtx.createDataFrame(rdd_row6, rowStruct.class); df.insertIntoJDBC(jdbc:mysql://172.17.0.2:3306/mysql?user=rootpassword=pass, reference, false); } } == My error log: == Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/27 18:31:19 INFO SparkContext: Running Spark version 1.3.0-SNAPSHOT 15/03/27 18:31:19 WARN Utils: Your hostname, Tlnd-pbailly resolves to a loopback address: 127.0.1.1; using 10.42.20.124 instead (on interface wlan0) 15/03/27 18:31:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 15/03/27 18:31:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 18:31:20 INFO SecurityManager: Changing view acls to: pbailly 15/03/27 18:31:20 INFO SecurityManager: Changing modify acls to: pbailly 15/03/27 18:31:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pbailly); users with modify permissions: Set(pbailly) 15/03/27 18:31:20 INFO Slf4jLogger: Slf4jLogger started 15/03/27 18:31:20 INFO Remoting: Starting remoting 15/03/27 18:31:20 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.42.20.124:58185] 15/03/27 18:31:20 INFO Utils: Successfully started service 'sparkDriver' on port 58185. 
15/03/27 18:31:20 INFO SparkEnv: Registering MapOutputTracker 15/03/27 18:31:20 INFO SparkEnv: Registering BlockManagerMaster 15/03/27 18:31:20 INFO DiskBlockManager: Created local directory at /tmp/spark-1baef5a9-8c70-4c88-aaa6-7462f473c5b6/blockmgr-20176350-a69c-4170-b704-6621ca393889 15/03/27 18:31:20 INFO MemoryStore: MemoryStore started with capacity 947.7 MB 15/03/27 18:31:20 INFO HttpFileServer: HTTP File server directory is /tmp/spark-1ff51d4d-6172-4231-98c0-5e69edc6e64e/httpd-4eb77dcf-da49-438e-b5db-ecbf07193245 15/03/27 18:31:20 INFO HttpServer: Starting HTTP Server 15/03/27 18:31:20 INFO Server: jetty-8.y.z-SNAPSHOT 15/03/27 18:31:20 INFO AbstractConnector: Started SocketConnector@0.0.0.0:43576 15/03/27 18:31:20 INFO Utils: Successfully started service 'HTTP file server' on port 43576. 15/03/27 18:31:20 INFO SparkEnv: Registering OutputCommitCoordinator 15/03/27 18:31:20 INFO Server: jetty-8.y.z-SNAPSHOT 15/03/27 18:31:20 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 15/03/27 18:31:20 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/27 18:31:20 INFO SparkUI: Started SparkUI at http://10.42.20.124:4040 15/03/27 18:31:20 INFO Executor: Starting executor ID driver on host localhost 15/03/27 18:31:20 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@10.42.20.124:58185/user/HeartbeatReceiver 15/03/27 18:31:21 INFO NettyBlockTransferService: Server created on 43013 15/03/27 18:31:21 INFO BlockManagerMaster: Trying to register BlockManager 15/03/27 18:31:21 INFO BlockManagerMasterActor: Registering block
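One way to work around the ordering, sketched in Scala under the assumption that the bean-derived schema comes out in a different order than the table: select the columns in the table's order before the insert. The column names and JDBC URL mirror the example above and are placeholders.

  // Reorder the DataFrame columns to match the target table's column order
  val ordered = df.select("zvalue", "avalue")
  ordered.insertIntoJDBC("jdbc:mysql://172.17.0.2:3306/mysql?user=root&password=pass", "reference", false)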
Re: WordCount example
I checked the ports using netstat and don't see any connections established on that port. Logs show only this:

15/03/27 13:50:48 INFO Master: Registering app NetworkWordCount
15/03/27 13:50:48 INFO Master: Registered app NetworkWordCount with ID app-20150327135048-0002

The Spark UI shows, under Running Applications: ID app-20150327135048-0002 (http://54.69.225.94:8080/app?appId=app-20150327135048-0002), Name NetworkWordCount (http://ip-10-241-251-232.us-west-2.compute.internal:4040/), Cores 0, Memory per Node 512.0 MB, Submitted Time 2015/03/27 13:50:48, User ec2-user, State WAITING, Duration 33 s.

The code looks like it is being executed: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077

public static void doWork(String masterUrl) {
    SparkConf conf = new SparkConf().setMaster(masterUrl).setAppName("NetworkWordCount");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", );
    System.out.println("Successfully created connection");
    mapAndReduce(lines);
    jssc.start();              // Start the computation
    jssc.awaitTermination();   // Wait for the computation to terminate
}

public static void main(String ...args) {
    doWork(args[0]);
}

And output of the java program after submitting the task: java -cp .:* org.spark.test.WordCount spark://ip-10-241-251-232:7077 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/03/27 13:50:46 INFO SecurityManager: Changing view acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: Changing modify acls to: ec2-user 15/03/27 13:50:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user) 15/03/27 13:50:46 INFO Slf4jLogger: Slf4jLogger started 15/03/27 13:50:46 INFO Remoting: Starting remoting 15/03/27 13:50:47 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@ip-10-241-251-232.us-west-2.compute.internal:60184] 15/03/27 13:50:47 INFO Utils: Successfully started service 'sparkDriver' on port 60184. 15/03/27 13:50:47 INFO SparkEnv: Registering MapOutputTracker 15/03/27 13:50:47 INFO SparkEnv: Registering BlockManagerMaster 15/03/27 13:50:47 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150327135047-5399 15/03/27 13:50:47 INFO MemoryStore: MemoryStore started with capacity 3.5 GB 15/03/27 13:50:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 13:50:47 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7e26df49-1520-4c77-b411-c837da59fa5b 15/03/27 13:50:47 INFO HttpServer: Starting HTTP Server 15/03/27 13:50:47 INFO Utils: Successfully started service 'HTTP file server' on port 57955. 15/03/27 13:50:47 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/27 13:50:47 INFO SparkUI: Started SparkUI at http://ip-10-241-251-232.us-west-2.compute.internal:4040 15/03/27 13:50:47 INFO AppClient$ClientActor: Connecting to master spark://ip-10-241-251-232:7077... 
15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150327135048-0002 15/03/27 13:50:48 INFO NettyBlockTransferService: Server created on 58358 15/03/27 13:50:48 INFO BlockManagerMaster: Trying to register BlockManager 15/03/27 13:50:48 INFO BlockManagerMasterActor: Registering block manager ip-10-241-251-232.us-west-2.compute.internal:58358 with 3.5 GB RAM, BlockManagerId(driver, ip-10-241-251-232.us-west-2.compute.internal, 58358) 15/03/27 13:50:48 INFO BlockManagerMaster: Registered BlockManager 15/03/27 13:50:48 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 15/03/27 13:50:48 INFO ReceiverTracker: ReceiverTracker started 15/03/27 13:50:48 INFO ForEachDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO ShuffledDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO MappedDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO FlatMappedDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO SocketInputDStream: metadataCleanupDelay = -1 15/03/27 13:50:48 INFO SocketInputDStream: Slide time = 1000 ms 15/03/27 13:50:48 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/27 13:50:48 INFO SocketInputDStream: Checkpoint interval = null 15/03/27 13:50:48 INFO SocketInputDStream: Remember duration = 1000 ms 15/03/27 13:50:48 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@75efa13d 15/03/27 13:50:48 INFO FlatMappedDStream: Slide time = 1000 ms 15/03/27 13:50:48 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/27 13:50:48 INFO FlatMappedDStream: Checkpoint interval = null 15/03/27 13:50:48 INFO FlatMappedDStream: Remember duration = 1000 ms 15/03/27
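mapAndReduce(lines) is not shown in the message; a minimal Scala sketch of what such a streaming word count typically looks like (the Java code above would use the equivalent DStream calls):

  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.streaming.dstream.DStream

  def mapAndReduce(lines: DStream[String]): Unit = {
    lines.flatMap(_.split(" "))        // split each line into words
      .map(word => (word, 1))          // pair each word with a count of one
      .reduceByKey(_ + _)              // sum the counts per word within the batch
      .print()                         // an output operation is required for the job to run
  }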
Re: JettyUtils.createServletHandler Method not Found?
JettyUtils is marked with: private[spark] object JettyUtils extends Logging { FYI On Fri, Mar 27, 2015 at 9:50 AM, kmader kevin.ma...@gmail.com wrote: I have a very strange error in Spark 1.3 where at runtime in the org.apache.spark.ui.JettyUtils object the method createServletHandler is not found Exception in thread main java.lang.NoSuchMethodError: org.apache.spark.ui.JettyUtils$.createServletHandler(Ljava/lang/String;Ljavax/servlet/http/HttpServlet;Ljava/lang/String;)Lorg/eclipse/jetty/servlet/ServletContextHandler; The code compiles without issue, but at runtime it fails. I know the Jetty dependencies have been changed, but this should not affect the JettyUtils inside Spark Core, or is there another change I am not aware of? Thanks, Kevin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JettyUtils-createServletHandler-Method-not-Found-tp22262.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Could not compute split, block not found in Spark Streaming Simple Application
Hi, I am just running this simple example with machineA: 1 master + 1 worker, machineB: 1 worker

«
val ssc = new StreamingContext(sparkConf, Duration(1000))
val rawStreams = (1 to numStreams).map(_ => ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
val union = ssc.union(rawStreams)
union.filter(line => Random.nextInt(1) == 0).map(line => {
  var sum = BigInt(0)
  line.toCharArray.foreach(chr => sum += chr.toInt)
  fib2(sum)
  sum
}).reduceByWindow(_ + _, Seconds(1), Seconds(1)).map(s => s"### result: $s").print()
»

And I'm getting the following exceptions: Log from machineB « 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 132 15/03/27 16:21:35 INFO Executor: Running task 0.0 in stage 27.0 (TID 132) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 134 15/03/27 16:21:35 INFO Executor: Running task 2.0 in stage 27.0 (TID 134) 15/03/27 16:21:35 INFO TorrentBroadcast: Started reading broadcast variable 24 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 136 15/03/27 16:21:35 INFO Executor: Running task 4.0 in stage 27.0 (TID 136) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 138 15/03/27 16:21:35 INFO Executor: Running task 6.0 in stage 27.0 (TID 138) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 140 15/03/27 16:21:35 INFO Executor: Running task 8.0 in stage 27.0 (TID 140) 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(1886) called with curMem=47117, maxMem=280248975 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24_piece0 stored as bytes in memory (estimated size 1886.0 B, free 267.2 MB) 15/03/27 16:21:35 INFO BlockManagerMaster: Updated info of block broadcast_24_piece0 15/03/27 16:21:35 INFO TorrentBroadcast: Reading broadcast variable 24 took 19 ms 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(3104) called with curMem=49003, maxMem=280248975 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 3.0 KB, free 267.2 MB) 15/03/27 16:21:35 ERROR Executor: Exception in task 8.0 in stage 27.0 (TID 140) java.lang.Exception: Could not compute split, block input-0-1427473262420 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:701) 15/03/27 16:21:35 ERROR Executor: Exception in task 6.0 in stage 27.0 (TID 138) java.lang.Exception: Could not compute split, block input-0-1427473262418 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
Re: Could not compute split, block not found in Spark Streaming Simple Application
If it is deterministically reproducible, could you generate full DEBUG level logs, from the driver and the workers and give it to me? Basically I want to trace through what is happening to the block that is not being found. And can you tell what Cluster manager are you using? Spark Standalone, Mesos or YARN? On Fri, Mar 27, 2015 at 10:09 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, I am just running this simple example with machineA: 1 master + 1 worker machineB: 1 worker « val ssc = new StreamingContext(sparkConf, Duration(1000)) val rawStreams = (1 to numStreams).map(_ =ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray val union = ssc.union(rawStreams) union.filter(line = Random.nextInt(1) == 0).map(line = { var sum = BigInt(0) line.toCharArray.foreach(chr = sum += chr.toInt) fib2(sum) sum }).reduceByWindow(_+_, Seconds(1),Seconds(1)).map(s = s### result: $s).print() » And I'm getting the following exceptions: Log from machineB « 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 132 15/03/27 16:21:35 INFO Executor: Running task 0.0 in stage 27.0 (TID 132) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 134 15/03/27 16:21:35 INFO Executor: Running task 2.0 in stage 27.0 (TID 134) 15/03/27 16:21:35 INFO TorrentBroadcast: Started reading broadcast variable 24 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 136 15/03/27 16:21:35 INFO Executor: Running task 4.0 in stage 27.0 (TID 136) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 138 15/03/27 16:21:35 INFO Executor: Running task 6.0 in stage 27.0 (TID 138) 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 140 15/03/27 16:21:35 INFO Executor: Running task 8.0 in stage 27.0 (TID 140) 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(1886) called with curMem=47117, maxMem=280248975 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24_piece0 stored as bytes in memory (estimated size 1886.0 B, free 267.2 MB) 15/03/27 16:21:35 INFO BlockManagerMaster: Updated info of block broadcast_24_piece0 15/03/27 16:21:35 INFO TorrentBroadcast: Reading broadcast variable 24 took 19 ms 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(3104) called with curMem=49003, maxMem=280248975 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 3.0 KB, free 267.2 MB) 15/03/27 16:21:35 ERROR Executor: Exception in task 8.0 in stage 27.0 (TID 140) java.lang.Exception: Could not compute split, block input-0-1427473262420 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:701) 15/03/27 16:21:35 ERROR Executor: Exception in task 6.0 in stage 27.0 (TID 138) java.lang.Exception: Could not compute split, block input-0-1427473262418 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at
RDD collect hangs on large input data
Hi, I have a simple Spark application: it creates an input RDD with sc.textFile, and it calls flatMapToPair, reduceByKey and map on it. The output RDD is small, a few MB. Then I call collect() on the output. If the text file is ~50GB, it finishes in a few minutes. However, if it's larger (~100GB), the execution hangs at the end of the collect() stage. The UI shows one active job (collect), one completed (flatMapToPair) and one active stage (collect). The collect stage has 880/892 tasks succeeded, so I think the issue happens when the whole job is about to finish (every task on the UI is either in SUCCESS or in RUNNING state). The driver and the containers don't log anything for 15 mins, then I get a connection timeout. I run the job in yarn-cluster mode on Amazon EMR with Spark 1.2.1 and Hadoop 2.4.0. This happens every time I run the process with the larger input data, so I think this isn't just a connection issue or something like that. Is this a Spark bug, or is something wrong with my setup? Zsolt
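For illustration only, a Scala sketch of the job shape described above; the paths and the parsing logic are placeholders (the original uses the Java API, hence flatMapToPair), and the final line shows the common alternative of writing the small result out instead of collecting it:

  import org.apache.spark.SparkContext._

  val input = sc.textFile("hdfs:///path/to/input")
  val result = input
    .flatMap(line => line.split(" ").map(token => (token, 1L)))   // flatMapToPair equivalent
    .reduceByKey(_ + _)
    .map { case (key, count) => s"$key\t$count" }
  // collect() ships everything back to the driver; saving to HDFS avoids that final hop
  result.saveAsTextFile("hdfs:///path/to/output")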
Re: Combining Many RDDs
Hi Kelvin, Thank you. That works for me. I wrote my own joins that produced Scala collections, instead of using rdd.join. Regards, Yang On Thu, Mar 26, 2015 at 5:51 PM, Kelvin Chu 2dot7kel...@gmail.com wrote: Hi, I used union() before and yes it may be slow sometimes. I _guess_ your variable 'data' is a Scala collection and compute() returns an RDD. Right? If yes, I tried the approach below to operate on one RDD only during the whole computation (Yes, I also saw that too many RDD hurt performance). Change compute() to return Scala collection instead of RDD. val result = sc.parallelize(data)// Create and partition the 0.5M items in a single RDD. .flatMap(compute(_)) // You still have only one RDD with each item joined with external data already Hope this help. Kelvin On Thu, Mar 26, 2015 at 2:37 PM, Yang Chen y...@yang-cs.com wrote: Hi Mark, That's true, but in neither way can I combine the RDDs, so I have to avoid unions. Thanks, Yang On Thu, Mar 26, 2015 at 5:31 PM, Mark Hamstra m...@clearstorydata.com wrote: RDD#union is not the same thing as SparkContext#union On Thu, Mar 26, 2015 at 2:27 PM, Yang Chen y...@yang-cs.com wrote: Hi Noorul, Thank you for your suggestion. I tried that, but ran out of memory. I did some search and found some suggestions that we should try to avoid rdd.union( http://stackoverflow.com/questions/28343181/memory-efficient-way-of-union-a-sequence-of-rdds-from-files-in-apache-spark ). I will try to come up with some other ways. Thank you, Yang On Thu, Mar 26, 2015 at 1:13 PM, Noorul Islam K M noo...@noorul.com wrote: sparkx y...@yang-cs.com writes: Hi, I have a Spark job and a dataset of 0.5 Million items. Each item performs some sort of computation (joining a shared external dataset, if that does matter) and produces an RDD containing 20-500 result items. Now I would like to combine all these RDDs and perform a next job. What I have found out is that the computation itself is quite fast, but combining these RDDs takes much longer time. val result = data// 0.5M data items .map(compute(_)) // Produces an RDD - fast .reduce(_ ++ _) // Combining RDDs - slow I have also tried to collect results from compute(_) and use a flatMap, but that is also slow. Is there a way to efficiently do this? I'm thinking about writing this result to HDFS and reading from disk for the next job, but am not sure if that's a preferred way in Spark. Are you looking for SparkContext.union() [1] ? This is not performing well with spark cassandra connector. I am not sure whether this will help you. Thanks and Regards Noorul [1] http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.SparkContext -- Yang Chen Dept. of CISE, University of Florida Mail: y...@yang-cs.com Web: www.cise.ufl.edu/~yang -- Yang Chen Dept. of CISE, University of Florida Mail: y...@yang-cs.com Web: www.cise.ufl.edu/~yang -- Yang Chen Dept. of CISE, University of Florida Mail: y...@yang-cs.com Web: www.cise.ufl.edu/~yang
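A compact Scala sketch contrasting the two approaches discussed in this thread; the Item type, compute(), and the data size stand in for the poster's code and are assumptions:

  case class Item(id: Int)
  def compute(item: Item): Seq[Int] = Seq(item.id, item.id * 2)   // placeholder logic

  val data = (1 to 500000).map(Item(_))   // the 0.5M input items
  // Reported slow variant: one RDD per item, merged pairwise with union
  // val combined = data.map(item => sc.parallelize(compute(item))).reduce(_ union _)
  // Suggested variant: compute() returns a plain collection, so everything stays in one RDD
  val result = sc.parallelize(data).flatMap(compute(_))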
Re: Spark streaming
It is just a comma-separated file, about 10 columns wide, which we append with a unique id and a few additional values. On Fri, Mar 27, 2015 at 2:43 PM, Ted Yu yuzhih...@gmail.com wrote: jamborta : Please also describe the format of your csv files. Cheers On Fri, Mar 27, 2015 at 6:42 AM, DW @ Gmail deanwamp...@gmail.com wrote: Show us the code. This shouldn't happen for the simple process you described. Sent from my rotary phone. On Mar 27, 2015, at 5:47 AM, jamborta jambo...@gmail.com wrote: Hi all, We have a workflow that pulls in data from csv files. The original setup of the workflow was to parse the data as it comes in (turn it into an array), then store it. This resulted in out of memory errors with larger files (as a result of increased GC?). It turns out that if the data gets stored as a string first, then parsed, the issue does not occur. Why is that? Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tp22255.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
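Purely as an illustration of the two orderings being compared, a Scala sketch with placeholder stream source, parsing and storage (none of this is the poster's code):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val ssc = new StreamingContext(new SparkConf().setAppName("csv-ingest"), Seconds(60))
  // Placeholder ingestion; the thread does not show how the CSV data arrives
  val lines = ssc.textFileStream("hdfs:///incoming/csv")
  // Original setup: parse each record into an array before storing it
  lines.map(_.split(",")).foreachRDD((rdd, time) => rdd.saveAsObjectFile(s"hdfs:///parsed/${time.milliseconds}"))
  // Reworked setup said to avoid the OOM: store the raw strings first, parse them in a later job
  lines.foreachRDD((rdd, time) => rdd.saveAsTextFile(s"hdfs:///raw/${time.milliseconds}"))
  ssc.start()
  ssc.awaitTermination()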
Re: Hive Table not from from Spark SQL
Upon reviewing your other thread, could you confirm that your Hive metastore that you can connect to via Hive is a MySQL database? And to also confirm, when you're running spark-shell and doing a show tables statement, you're getting the same error? On Fri, Mar 27, 2015 at 6:08 AM ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I tried the following 1) ./bin/spark-submit -v --master yarn-cluster --driver-class-path /home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar:/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar: *$SPARK_HOME/conf/hive-site.xml* --jars /home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar --num-executors 1 --driver-memory 4g --driver-java-options -XX:MaxPermSize=2G --executor-memory 2g --executor-cores 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp spark_reporting-1.0-SNAPSHOT.jar startDate=2015-02-16 endDate=2015-02-16 input=/user/dvasthimal/epdatasets/successdetail1/part-r-0.avro subcommand=successevents2 output=/user/dvasthimal/epdatasets/successdetail2 This throws dw_bid not found. Looks like Spark SQL is unable to read my existing Hive metastore and creates its own and hence complains that table is not found. 2) ./bin/spark-submit -v --master yarn-cluster --driver-class-path /home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar:/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar --jars /home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar: *$SPARK_HOME/conf/hive-site.xml* --num-executors 1 --driver-memory 4g --driver-java-options -XX:MaxPermSize=2G --executor-memory 2g --executor-cores 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp spark_reporting-1.0-SNAPSHOT.jar startDate=2015-02-16 endDate=2015-02-16 input=/user/dvasthimal/epdatasets/successdetail1/part-r-0.avro subcommand=successevents2 output=/user/dvasthimal/epdatasets/successdetail2 This time i do not get above error, however i get MySQL driver not found exception. Looks like this is even before its able to communicate to Hive. Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke the BONECP plugin to create a ConnectionPool gave an error : The specified datastore driver (com.mysql.jdbc.Driver) was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver. In both above cases, i do have hive-site.xml in Spark/conf folder. 
3) ./bin/spark-submit -v --master yarn-cluster --driver-class-path /home/dvasthimal/spark1.3/mysql-connector-java-5.1.34.jar:/apache/hadoop/share/hadoop/common/hadoop-common-2.4.1-EBAY-2.jar:/apache/hadoop/lib/hadoop-lzo-0.6.0.jar:/apache/hadoop-2.4.1-2.1.3.0-2-EBAY/share/hadoop/yarn/lib/guava-11.0.2.jar --jars /home/dvasthimal/spark1.3/spark-avro_2.10-1.0.0.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/home/dvasthimal/spark1.3/spark-1.3.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar--num-executors 1 --driver-memory 4g --driver-java-options -XX:MaxPermSize=2G --executor-memory 2g --executor-cores 1 --queue hdmi-express --class com.ebay.ep.poc.spark.reporting.SparkApp spark_reporting-1.0-SNAPSHOT.jar startDate=2015-02-16 endDate=2015-02-16 input=/user/dvasthimal/epdatasets/successdetail1/part-r-0.avro subcommand=successevents2 output=/user/dvasthimal/epdatasets/successdetail2 I do not specify hive-site.xml in --jars or --driver-class-path. Its present in spark/conf folder as per https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#hive-tables . In this case i get same error as #1. dw_bid table not found. I want Spark SQL to know that there are tables in Hive and read that data. As per guide it looks like Spark SQL has that support. Please suggest. Regards, Deepak On Thu, Mar 26, 2015 at 9:01 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Stack Trace: 15/03/26 08:25:42 INFO ql.Driver: OK 15/03/26 08:25:42 INFO log.PerfLogger: PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver 15/03/26 08:25:42 INFO log.PerfLogger: /PERFLOG method=releaseLocks start=1427383542966 end=1427383542966 duration=0
Python Example sql.py not working in version spark-1.3.0-bin-hadoop2.4
I downloaded spark version spark-1.3.0-bin-hadoop2.4. When the python version of sql.py is run the following error occurs: [root@nde-dev8-template python]# /root/spark-1.3.0-bin-hadoop2.4/bin/spark-submit sql.py Spark assembly has been built with Hive, including Datanucleus jars on classpath Traceback (most recent call last): File /root/spark-1.3.0-bin-hadoop2.4/examples/src/main/python/sql.py, line 22, in module from pyspark.sql import Row, StructField, StructType, StringType, IntegerType ImportError: cannot import name StructField -- The sql.py version, spark-1.2.1-bin-hadoop2.4, does not throw the error: [root@nde-dev8-template python]# /root/spark-1.2.1-bin-hadoop2.4/bin/spark-submit sql.py Spark assembly has been built with Hive, including Datanucleus jars on classpath 15/03/27 14:18:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/03/27 14:19:41 WARN ThreadLocalRandom: Failed to generate a seed from SecureRandom within 3 seconds. Not enough entrophy? root |-- age: integer (nullable = true) |-- name: string (nullable = true) root |-- person_name: string (nullable = false) |-- person_age: integer (nullable = false) root |-- age: integer (nullable = true) |-- name: string (nullable = true) Justin - The OS/JAVA environments are: OS: Linux nde-dev8-template 2.6.32-431.17.1.el6.x86_64 #1 SMP Fri Apr 11 17:27:00 EDT 2014 x86_64 x86_64 x86_64 GNU/Linux JAVA: [root@nde-dev8-template bin]# java -version java version 1.7.0_51 Java(TM) SE Runtime Environment (build 1.7.0_51-b13) Java HotSpot(TM) 64-Bit Server VM (build 24.51-b03, mixed mode) The same error occurs when using bin/pyspark shell. from pyspark.sql import StructField Traceback (most recent call last): File stdin, line 1, in module ImportError: cannot import name StructField --- Any advice for resolving? Thanks in advance. Peter -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-Example-sql-py-not-working-in-version-spark-1-3-0-bin-hadoop2-4-tp22261.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark streaming driver hang
I ran a Spark streaming job. 100 executors, 30G heap per executor, 4 cores per executor. The version I used is 1.3.0-cdh5.1.0. The job is reading from a directory on HDFS (with files incoming continuously) and does some joins on the data. I set the batch interval to 15 minutes and the job worked fine in the first few batches. However, it just stalled after 7-8 batches. Below are some symptoms. * In the Spark UI, every tab worked fine except the Streaming tab. When I clicked on it, it just hung forever. * I did not see any GC activity on the driver. * Nothing was printed out from the driver log. Has anyone seen this before? -- Chen Song
Re: Single threaded laptop implementation beating a 128 node GraphX cluster on a 1TB data set (128 billion nodes) - What is a use case for GraphX then? when is it worth the cost?
Hello, Well, all problems you want to solve with technology need a good justification for a certain technology. So the first thing is to ask which technology fits my current and future problems. This is also what the article says. Unfortunately, it only provides a vague answer as to why there is this performance gap. Is it a Spark architecture issue? Is it a configuration issue? Is it a design issue of the Spark version of the algorithms? Is it an Amazon issue? Why did he use a laptop and not a single Amazon machine to compare? Why did he not run multiple threads on a single machine (for some problems a single thread might be the fastest solution anyway)? Based on my experience, a single machine can already be quite useful for graph algorithms. There are also different graph systems, all for different purposes. Spark GraphX is more general (it can be used in combination with the whole Spark platform!) and probably less performant than highly specialized graph systems leveraging GPUs etc. - these systems have the disadvantage that they are not generally suitable for, or integrated with, other types of processing, such as streaming, MR, RDDs, etc. I am always curious, for any technology, why and where one loses performance. That's why one does proof-of-concepts and evaluates technology depending on the business case. Maybe the article is right, but it is unclear whether it can be generalized or whether it really has an impact on your business case for Spark/GraphX. His algorithms can only do graph processing for a very special case and are not suitable for a general all-purpose big data infrastructure. Best regards On 27 March 2015 at 19:33, Eran Medan ehrann.meh...@gmail.com wrote: Remember that article that went viral on HN? (Where a guy showed how GraphX / Giraph / GraphLab / Spark have worse performance on a 128-node cluster than on a single-threaded machine? If not, here is the article - http://www.frankmcsherry.org/graph/scalability/cost/2015/01/15/COST.html) Well, as you may recall, this stirred up a lot of commotion in the big data community (and Spark/GraphX in particular). People (justly, I guess) blamed him for not really having “big data”, as all of his data set fits in memory, so it doesn't really count. So he took the challenge and came back with a pretty hard-to-argue counter benchmark, now with a huge data set (1TB of data, encoded using Hilbert curves to 154GB, but still large). See - http://www.frankmcsherry.org/graph/scalability/cost/2015/02/04/COST2.html He provided the source here https://github.com/frankmcsherry/COST as an example. His benchmark shows how on a 128-billion-edge graph, he got 2x to 10x faster results with a single-threaded Rust-based implementation. So, what is the counter argument? It pretty much seems like a blow in the face of Spark / GraphX etc. (which I like and use on a daily basis). Before I dive into re-validating his benchmarks with my own use cases: what is your opinion on this? If this is the case, then what IS the use case for using Spark/GraphX at all?
Re: Strange JavaDeserialization error - java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel
Yes, only when using fine grained mode and replication (StorageLevel.MEMORY_ONLY_2 etc). 2015-03-27 19:06 GMT+01:00 Tathagata Das t...@databricks.com: Does it fail with just Spark jobs (using storage levels) on non-coarse mode? TD On Fri, Mar 27, 2015 at 4:39 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: More info when using *spark.mesos.coarse* everything works as expected. I think this must be a bug in spark-mesos integration. 2015-03-27 9:23 GMT+01:00 Ondrej Smola ondrej.sm...@gmail.com: It happens only when StorageLevel is used with 1 replica ( StorageLevel. MEMORY_ONLY_2,StorageLevel.MEMORY_AND_DISK_2) , StorageLevel.MEMORY_ONLY ,StorageLevel.MEMORY_AND_DISK works - the problems must be clearly somewhere between mesos-spark . From console I see that spark is trying to replicate to nodes - nodes show up in Mesos active tasks ... but they always fail with ClassNotFoundE. 2015-03-27 0:52 GMT+01:00 Tathagata Das t...@databricks.com: Could you try running a simpler spark streaming program with receiver (may be socketStream) and see if that works. TD On Thu, Mar 26, 2015 at 2:08 PM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi thanks for reply, yes I have custom receiver - but it has simple logic .. pop ids from redis queue - load docs based on ids from elastic and store them in spark. No classloader modifications. I am running multiple Spark batch jobs (with user supplied partitioning) and they have no problems, debug in local mode show no errors. 2015-03-26 21:47 GMT+01:00 Tathagata Das t...@databricks.com: Here are few steps to debug. 1. Try using replication from a Spark job: sc.parallelize(1 to 100, 100).persist(StorageLevel.MEMORY_ONLY_2).count() 2. If one works, then we know that there is probably nothing wrong with the Spark installation, and probably in the threads related to the receivers receiving the data. Are you writing a custom receiver? Are you somehow playing around with the class loader in the custom receiver? TD On Thu, Mar 26, 2015 at 10:59 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, I am running spark streaming v 1.3.0 (running inside Docker) on Mesos 0.21.1. Spark streaming is started using Marathon - docker container gets deployed and starts streaming (from custom Actor). Spark binary is located on shared GlusterFS volume. Data is streamed from Elasticsearch/Redis. 
When new batch arrives Spark tries to replicate it but fails with following error : 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0 of size 2840 dropped from memory (free 278017782) 15/03/26 14:50:00 INFO BlockManager: Removing block broadcast_0_piece0 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0_piece0 of size 1658 dropped from memory (free 278019440) 15/03/26 14:50:00 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/03/26 14:50:00 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 7178767328921933569 java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:344) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:88) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:65) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at
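TD's isolation test from earlier in the thread, written out as a spark-shell snippet; the only additions are the import and a non-replicated control run:

  import org.apache.spark.storage.StorageLevel

  // Replicated persist: if this alone reproduces the ClassNotFoundException,
  // the problem is independent of streaming and custom receivers
  sc.parallelize(1 to 100, 100).persist(StorageLevel.MEMORY_ONLY_2).count()
  // Non-replicated control, which reportedly works
  sc.parallelize(1 to 100, 100).persist(StorageLevel.MEMORY_ONLY).count()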
Re: spark streaming driver hang
Do you have the logs of the driver? Does that give any exceptions? TD On Fri, Mar 27, 2015 at 12:24 PM, Chen Song chen.song...@gmail.com wrote: I ran a spark streaming job. 100 executors 30G heap per executor 4 cores per executor The version I used is 1.3.0-cdh5.1.0. The job is reading from a directory on HDFS (with files incoming continuously) and does some join on the data. I set batch interval to be 15 minutes and the job worked fine in the first few batches. However, it just stalled after 7-8 batches. Below are some symptoms. * In Spark UI, every tab worked fine except Streaming tab. When I clicked on it, it just hang forever. * I did not see any GC activity on driver. * Nothing was printed out from driver log. Anyone has seen this before? -- Chen Song
Re: Can spark sql read existing tables created in hive
Are you running on yarn? - If you are running in yarn-client mode, set HADOOP_CONF_DIR to /etc/hive/conf/ (or the directory where your hive-site.xml is located). - If you are running in yarn-cluster mode, the easiest thing to do is to add --files=/etc/hive/conf/hive-site.xml (or the path for your hive-site.xml) to your spark-submit script. On Fri, Mar 27, 2015 at 5:42 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I can recreate tables, but what about the data? It looks like this is an obvious feature that Spark SQL must have. People will want to transform tons of data stored in HDFS through Hive from Spark SQL. The Spark programming guide suggests it's possible: Spark SQL also supports reading and writing data stored in Apache Hive http://hive.apache.org/. Configuration of Hive is done by placing your hive-site.xml file in conf/. https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#hive-tables For some reason it's not working. On Fri, Mar 27, 2015 at 3:35 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Seems Spark SQL accesses some more columns apart from those created by Hive. You can always recreate the tables; you would need to execute the table creation scripts, but it would be good to avoid recreation. On Fri, Mar 27, 2015 at 3:20 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I did copy hive-conf.xml from the Hive installation into spark-home/conf. It does have all the metastore connection details: host, username, password, driver and others. Snippet:
==
<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://host.vip.company.com:3306/HDB</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hiveuser</value>
    <description>username to use against metastore database</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>some-password</value>
    <description>password to use against metastore database</description>
  </property>
  <property>
    <name>hive.metastore.local</name>
    <value>false</value>
    <description>controls whether to connect to remote metastore server or open a new metastore server in Hive Client JVM</description>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
    <description>location of default database for the warehouse</description>
  </property>
..
When I attempt to read a Hive table, it does not work: dw_bid does not exist. I am sure there is a way to read tables stored in HDFS (Hive) from Spark SQL. Otherwise how would anyone do analytics, since the source tables are always either persisted directly on HDFS or through Hive. On Fri, Mar 27, 2015 at 1:15 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Since Hive and Spark SQL internally use HDFS and the Hive metastore, the only thing you want to change is the processing engine. You can try to bring your hive-site.xml to %SPARK_HOME%/conf/hive-site.xml. (Ensure that the hive-site.xml captures the metastore connection details.) It's a hack, I haven't tried it. I have played around with the metastore and it should work. On Fri, Mar 27, 2015 at 12:04 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I have a few tables that are created in Hive. I want to transform data stored in these Hive tables using Spark SQL. Is this even possible? So far I have seen that I can create new tables using the Spark SQL dialect. However, when I run show tables or desc hive_table it says table not found. 
I am now wondering is this support present or not in Spark SQL ? -- Deepak -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Deepak -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- Deepak
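For completeness, a spark-shell sketch of reading an existing Hive table once hive-site.xml is picked up; the table name dw_bid comes from the thread, everything else is generic:

  import org.apache.spark.sql.hive.HiveContext

  val hiveCtx = new HiveContext(sc)
  hiveCtx.sql("show tables").collect().foreach(println)
  val bids = hiveCtx.sql("SELECT * FROM dw_bid LIMIT 10")
  bids.show()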
RDD resiliency -- does it keep state?
Hi Spark group, We haven't been able to find clear descriptions of how Spark handles the resiliency of RDDs in relation to executing actions with side-effects. If you do an `rdd.foreach(someSideEffect)`, then you are performing a side-effect for each element in the RDD. If a partition goes down, the resiliency rebuilds the data, but does it keep track of how far it got through the partition's set of data, or will it start from the beginning again? So will it do at-least-once execution of foreach closures, or at-most-once? thanks, Michal
Re: RDD resiliency -- does it keep state?
If you invoke this, you will get at-least-once semantics on failure. For instance, if a machine dies in the middle of executing the foreach for a single partition, that will be re-executed on another machine. It could even fully complete on one machine, but the machine dies immediately before reporting the result back to the driver. This means you need to make sure the side-effects are idempotent, or use some transactional locking. Spark's own output operations, such as saving to Hadoop, use such mechanisms. For instance, in the case of Hadoop it uses the OutputCommitter classes. - Patrick On Fri, Mar 27, 2015 at 12:36 PM, Michal Klos michal.klo...@gmail.com wrote: Hi Spark group, We haven't been able to find clear descriptions of how Spark handles the resiliency of RDDs in relationship to executing actions with side-effects. If you do an `rdd.foreach(someSideEffect)`, then you are doing a side-effect for each element in the RDD. If a partition goes down -- the resiliency rebuilds the data, but did it keep track of how far it go in the partition's set of data or will it start from the beginning again. So will it do at-least-once execution of foreach closures or at-most-once? thanks, Michal - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
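A hedged Scala sketch of the idempotence point above: key the per-element side-effect so that a re-executed partition overwrites rather than duplicates. The KeyValueSink trait and its put method are hypothetical, not a Spark or Hadoop API.

  import org.apache.spark.rdd.RDD

  // Hypothetical sink with an idempotent put(key, value)
  trait KeyValueSink extends Serializable { def put(key: String, value: String): Unit }

  def writeIdempotently(rdd: RDD[(String, String)], sink: KeyValueSink): Unit = {
    rdd.foreach { case (key, value) =>
      // If this partition is re-run on another machine after a failure,
      // the same keys are rewritten, so at-least-once execution causes no duplicates
      sink.put(key, value)
    }
  }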
Re: k-means can only run on one executor with one thread?
Can you try specifying the number of partitions when you load the data to equal the number of executors? If your ETL changes the number of partitions, you can also repartition before calling KMeans. On Thu, Mar 26, 2015 at 8:04 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I have a large data set, and I expects to get 5000 clusters. I load the raw data, convert them into DenseVector; then I did repartition and cache; finally I give the RDD[Vector] to KMeans.train(). Now the job is running, and data are loaded. But according to the Spark UI, all data are loaded onto one executor. I checked that executor, and its CPU workload is very low. I think it is using only 1 of the 8 cores. And all other 3 executors are at rest. Did I miss something? Is it possible to distribute the workload to all 4 executors? Thanks, David
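A minimal Scala sketch of the suggestion; the path, partition count and iteration count are placeholders, while k = 5000 comes from the question:

  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors

  val numPartitions = 32   // e.g. a small multiple of the total executor cores
  val vectors = sc.textFile("hdfs:///path/to/data", numPartitions)
    .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
    .repartition(numPartitions)   // rebalance if the ETL changed the partitioning
    .cache()
  val model = KMeans.train(vectors, 5000, 20)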
Re: Can't access file in spark, but can in hadoop
Yes, I could recompile the hdfs client with more logging, but I don’t have the day or two to spare right this week. One more thing about this, the cluster is Horton Works 2.1.3 [.0] They seem to have a claim of supporting spark on Horton Works 2.2 Dale. From: Ted Yu yuzhih...@gmail.commailto:yuzhih...@gmail.com Date: Thursday, March 26, 2015 at 4:54 PM To: Johnson, Dale daljohn...@ebay.commailto:daljohn...@ebay.com Cc: user user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Can't access file in spark, but can in hadoop Looks like the following assertion failed: Preconditions.checkState(storageIDsCount == locs.size()); locs is ListDatanodeInfoProto Can you enhance the assertion to log more information ? Cheers On Thu, Mar 26, 2015 at 3:06 PM, Dale Johnson daljohn...@ebay.commailto:daljohn...@ebay.com wrote: There seems to be a special kind of corrupted according to Spark state of file in HDFS. I have isolated a set of files (maybe 1% of all files I need to work with) which are producing the following stack dump when I try to sc.textFile() open them. When I try to open directories, most large directories contain at least one file of this type. Curiously, the following two lines fail inside of a Spark job, but not inside of a Scoobi job: val conf = new org.apache.hadoop.conf.Configuration val fs = org.apache.hadoop.fs.FileSystem.get(conf) The stack trace follows: 15/03/26 14:22:43 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: null) Exception in thread Driver java.lang.IllegalStateException at org.spark-project.guava.common.base.Preconditions.checkState(Preconditions.java:133) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:673) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convertLocatedBlock(PBHelper.java:1100) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1118) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1251) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1354) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1363) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:518) at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1743) at org.apache.hadoop.hdfs.DistributedFileSystem$15.init(DistributedFileSystem.java:738) at org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:727) at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1662) at org.apache.hadoop.fs.FileSystem$5.init(FileSystem.java:1724) at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1721) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1125) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1123) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$.main(SpellQuery.scala:1123) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch.main(SpellQuery.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427) 15/03/26 14:22:43 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook It appears to have found the three copies of the given HDFS block, but is performing some sort of validation with them before giving them back to spark to schedule the job. But there is an assert failing. I've tried this with 1.2.0, 1.2.1 and 1.3.0, and I get the exact same error, but I've seen the line numbers change on the HDFS libraries, but not the function names. I've tried recompiling myself with different hadoop versions, and it's the same. We're running hadoop 2.4.1 on our cluster. A google search turns up absolutely nothing on this. Any insight at all would be appreciated. Dale Johnson Applied Researcher eBay.com -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-file-in-spark-but-can-in-hadoop-tp22251.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail:
Re: Using ORC input for mllib algorithms
This is a PR in review to support ORC via the SQL data source API: https://github.com/apache/spark/pull/3753. You can try pulling that PR and help test it. -Xiangrui On Wed, Mar 25, 2015 at 5:03 AM, Zsolt Tóth toth.zsolt@gmail.com wrote: Hi, I use sc.hadoopFile(directory, OrcInputFormat.class, NullWritable.class, OrcStruct.class) to use data in ORC format as an RDD. I made some benchmarking on ORC input vs Text input for MLlib and I ran into a few issues with ORC. Setup: yarn-cluster mode, 11 executors, 4 cores, 9g executor memory, 2g executor memoryOverhead, 1g driver memory. The cluster nodes have sufficient resources for the setup. Logistic regression: When using 1GB ORC input (stored in 4 blocks on hdfs), only one block (25%) is cached and only one executor is used, however the whole rdd could be cached even as Textfile (that's around 5.5GB). Is it possible to make Spark use the available resources? Decision tree: Using 8GB ORC input, the job fails every time with the Size exceeds INTEGER.MAX_VALUE error. Plus, I see errors from the JVM in the logs that container is running beyond physical memory limits. Is it possible to avoid this when using ORC input format? Tried to set the min.split.size/max.split.size or dfs.blocksize but that didn't help. Again, none of these happen when using Text input. Cheers, Zsolt - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark ML Pipeline inaccessible types
Hi Martin, Could you attach the code snippet and the stack trace? The default implementation of some methods uses reflection, which may be the cause. Best, Xiangrui On Wed, Mar 25, 2015 at 3:18 PM, zapletal-mar...@email.cz wrote: Thanks Peter, I ended up doing something similar. I however consider both the approaches you mentioned bad practices which is why I was looking for a solution directly supported by the current code. I can work with that now, but it does not seem to be the proper solution. Regards, Martin -- Původní zpráva -- Od: Peter Rudenko petro.rude...@gmail.com Komu: zapletal-mar...@email.cz, Sean Owen so...@cloudera.com Datum: 25. 3. 2015 13:28:38 Předmět: Re: Spark ML Pipeline inaccessible types Hi Martin, here’s 2 possibilities to overcome this: 1) Put your logic into org.apache.spark package in your project - then everything would be accessible. 2) Dirty trick: object SparkVector extends HashingTF { val VectorUDT: DataType = outputDataType } then you can do like this: StructType(vectorTypeColumn, SparkVector.VectorUDT, false)) Thanks, Peter Rudenko On 2015-03-25 13:14, zapletal-mar...@email.cz wrote: Sean, thanks for your response. I am familiar with NoSuchMethodException in general, but I think it is not the case this time. The code actually attempts to get parameter by name using val m = this.getClass.getMethodName(paramName). This may be a bug, but it is only a side effect caused by the real problem I am facing. My issue is that VectorUDT is not accessible by user code and therefore it is not possible to use custom ML pipeline with the existing Predictors (see the last two paragraphs in my first email). Best Regards, Martin -- Původní zpráva -- Od: Sean Owen so...@cloudera.com Komu: zapletal-mar...@email.cz Datum: 25. 3. 2015 11:05:54 Předmět: Re: Spark ML Pipeline inaccessible types NoSuchMethodError in general means that your runtime and compile-time environments are different. I think you need to first make sure you don't have mismatching versions of Spark. On Wed, Mar 25, 2015 at 11:00 AM, zapletal-mar...@email.cz wrote: Hi, I have started implementing a machine learning pipeline using Spark 1.3.0 and the new pipelining API and DataFrames. I got to a point where I have my training data set prepared using a sequence of Transformers, but I am struggling to actually train a model and use it for predictions. I am getting a java.lang.NoSuchMethodException: org.apache.spark.ml.regression.LinearRegression.myFeaturesColumnName() exception thrown at checkInputColumn method in Params trait when using a Predictor (LinearRegression in my case, but that should not matter). This looks like a bug - the exception is thrown when executing getParam(colName) when the require(actualDataType.equals(datatype), ...) requirement is not met so the expected requirement failed exception is not thrown and is hidden by the unexpected NoSuchMethodException instead. I can raise a bug if this really is an issue and I am not using something incorrectly. The problem I am facing however is that the Predictor expects features to have VectorUDT type as defined in Predictor class (protected def featuresDataType: DataType = new VectorUDT). But since this type is private[spark] my Transformer can not prepare features with this type which then correctly results in the exception above when I use a different type. 
Is there a way to define a custom Pipeline that would be able to use the existing Predictors without having to bypass the access modifiers or reimplement something or is the pipelining API not yet expected to be used in this way? Thanks, Martin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0
This sounds like a bug ... Did you try a different lambda? It would be great if you can share your dataset or re-produce this issue on the public dataset. Thanks! -Xiangrui On Thu, Mar 26, 2015 at 7:56 AM, Ravi Mody rmody...@gmail.com wrote: After upgrading to 1.3.0, ALS.trainImplicit() has been returning vastly smaller factors (and hence scores). For example, the first few product's factor values in 1.2.0 are (0.04821, -0.00674, -0.0325). In 1.3.0, the first few factor values are (2.535456E-8, 1.690301E-8, 6.99245E-8). This difference of several orders of magnitude is consistent throughout both user and product. The recommendations from 1.2.0 are subjectively much better than in 1.3.0. 1.3.0 trains significantly faster than 1.2.0, and uses less memory. My first thought is that there is too much regularization in the 1.3.0 results, but I'm using the same lambda parameter value. This is a snippet of my scala code: . val rank = 75 val numIterations = 15 val alpha = 10 val lambda = 0.01 val model = ALS.trainImplicit(train_data, rank, numIterations, lambda=lambda, alpha=alpha) . The code and input data are identical across both versions. Did anything change between the two versions I'm not aware of? I'd appreciate any help! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
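One way to quantify the magnitude gap Ravi describes is to compare the mean L2 norm of the product factors produced by each version; a rough sketch (it assumes an MLlib MatrixFactorizationModel named model, as returned by ALS.trainImplicit):

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

// Mean L2 norm of the product factor vectors; run once per Spark version and
// compare the two numbers to see the orders-of-magnitude difference.
def meanFactorNorm(model: MatrixFactorizationModel): Double =
  model.productFeatures
    .map { case (_, factors) => math.sqrt(factors.map(x => x * x).sum) }
    .mean()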
Single threaded laptop implementation beating a 128 node GraphX cluster on a 1TB data set (128 billion nodes) - What is a use case for GraphX then? when is it worth the cost?
Remember that article that went viral on HN? (Where a guy showed how GraphX / Giraph / GraphLab / Spark have worse performance on a 128 cluster than on a 1 thread machine? if not here is the article - http://www.frankmcsherry.org/graph/scalability/cost/2015/01/15/COST.html) Well as you may recall, this stirred up a lot of commotion in the big data community (and Spark/GraphX in particular) People (justly I guess) blamed him for not really having “big data”, as all of his data set fits in memory, so it doesn't really count. So he took the challenge and came with a pretty hard to argue counter benchmark, now with a huge data set (1TB of data, encoded using Hilbert curves to 154GB, but still large). see at - http://www.frankmcsherry.org/graph/scalability/cost/2015/02/04/COST2.html He provided the source here https://github.com/frankmcsherry/COST as an example His benchmark shows how on a 128 billion edges graph, he got X2 to X10 faster results on a single threaded Rust based implementation So, what is the counter argument? it pretty much seems like a blow in the face of Spark / GraphX etc, (which I like and use on a daily basis) Before I dive into re-validating his benchmarks with my own use cases. What is your opinion on this? If this is the case, then what IS the use case for using Spark/GraphX at all?
Spark 1.3 Source - Github and source tar does not seem to match
While looking into an issue, I noticed that the source displayed on the GitHub site does not match the downloaded tar for 1.3. Thoughts?
Re: Spark 1.3 Source - Github and source tar does not seem to match
The source code should match the Spark commit 4aaf48d46d13129f0f9bdafd771dd80fe568a7dc. Do you see any differences? On Fri, Mar 27, 2015 at 11:28 AM, Manoj Samel manojsamelt...@gmail.com wrote: While looking into an issue, I noticed that the source displayed on the GitHub site does not match the downloaded tar for 1.3. Thoughts? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Strange JavaDeserialization error - java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel
Does it fail with just Spark jobs (using storage levels) in non-coarse mode? TD On Fri, Mar 27, 2015 at 4:39 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: More info: when using *spark.mesos.coarse* everything works as expected. I think this must be a bug in the spark-mesos integration. 2015-03-27 9:23 GMT+01:00 Ondrej Smola ondrej.sm...@gmail.com: It happens only when a StorageLevel with more than 1 replica is used (StorageLevel.MEMORY_ONLY_2, StorageLevel.MEMORY_AND_DISK_2); StorageLevel.MEMORY_ONLY and StorageLevel.MEMORY_AND_DISK work - the problem must clearly be somewhere between Mesos and Spark. From the console I see that Spark is trying to replicate to nodes - the nodes show up in Mesos active tasks ... but they always fail with ClassNotFoundException. 2015-03-27 0:52 GMT+01:00 Tathagata Das t...@databricks.com: Could you try running a simpler Spark Streaming program with a receiver (maybe socketStream) and see if that works. TD On Thu, Mar 26, 2015 at 2:08 PM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, thanks for the reply. Yes, I have a custom receiver - but it has simple logic: pop ids from a Redis queue, load docs based on the ids from Elasticsearch and store them in Spark. No classloader modifications. I am running multiple Spark batch jobs (with user-supplied partitioning) and they have no problems; debugging in local mode shows no errors. 2015-03-26 21:47 GMT+01:00 Tathagata Das t...@databricks.com: Here are a few steps to debug. 1. Try using replication from a Spark job: sc.parallelize(1 to 100, 100).persist(StorageLevel.MEMORY_ONLY_2).count() 2. If step 1 works, then we know that there is probably nothing wrong with the Spark installation, and the problem is probably in the threads related to the receivers receiving the data. Are you writing a custom receiver? Are you somehow playing around with the class loader in the custom receiver? TD On Thu, Mar 26, 2015 at 10:59 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, I am running Spark Streaming v1.3.0 (running inside Docker) on Mesos 0.21.1. Spark Streaming is started using Marathon - the docker container gets deployed and starts streaming (from a custom Actor). The Spark binary is located on a shared GlusterFS volume. Data is streamed from Elasticsearch/Redis.
When new batch arrives Spark tries to replicate it but fails with following error : 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0 of size 2840 dropped from memory (free 278017782) 15/03/26 14:50:00 INFO BlockManager: Removing block broadcast_0_piece0 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0_piece0 of size 1658 dropped from memory (free 278019440) 15/03/26 14:50:00 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/03/26 14:50:00 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 7178767328921933569 java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:344) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:88) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:65) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at
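TD's first debug step, written out as a self-contained spark-shell snippet (a sketch; the replicated storage level is the same one that triggers the failure in fine-grained mode):

import org.apache.spark.storage.StorageLevel

// If plain RDD replication also fails on fine-grained Mesos, the problem is in
// block replication itself rather than in the streaming receiver path.
val rdd = sc.parallelize(1 to 100, 100).persist(StorageLevel.MEMORY_ONLY_2)
rdd.count()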
Re: Spark ML Pipeline inaccessible types
Hi Martin, In the short term: Would you be able to work with a different type other than Vector? If so, then you can override the *Predictor* class's *protected def featuresDataType: DataType* with a DataFrame type which fits your purpose. If you need Vector, then you might have to do a hack like Peter suggested. In the long term: VectorUDT should indeed be made public, but that will have to wait until the next release. Thanks for the feedback, Joseph On Fri, Mar 27, 2015 at 11:12 AM, Xiangrui Meng men...@gmail.com wrote: Hi Martin, Could you attach the code snippet and the stack trace? The default implementation of some methods uses reflection, which may be the cause. Best, Xiangrui On Wed, Mar 25, 2015 at 3:18 PM, zapletal-mar...@email.cz wrote: Thanks Peter, I ended up doing something similar. I however consider both the approaches you mentioned bad practices which is why I was looking for a solution directly supported by the current code. I can work with that now, but it does not seem to be the proper solution. Regards, Martin -- Původní zpráva -- Od: Peter Rudenko petro.rude...@gmail.com Komu: zapletal-mar...@email.cz, Sean Owen so...@cloudera.com Datum: 25. 3. 2015 13:28:38 Předmět: Re: Spark ML Pipeline inaccessible types Hi Martin, here’s 2 possibilities to overcome this: 1) Put your logic into org.apache.spark package in your project - then everything would be accessible. 2) Dirty trick: object SparkVector extends HashingTF { val VectorUDT: DataType = outputDataType } then you can do like this: StructType(vectorTypeColumn, SparkVector.VectorUDT, false)) Thanks, Peter Rudenko On 2015-03-25 13:14, zapletal-mar...@email.cz wrote: Sean, thanks for your response. I am familiar with NoSuchMethodException in general, but I think it is not the case this time. The code actually attempts to get parameter by name using val m = this.getClass.getMethodName(paramName). This may be a bug, but it is only a side effect caused by the real problem I am facing. My issue is that VectorUDT is not accessible by user code and therefore it is not possible to use custom ML pipeline with the existing Predictors (see the last two paragraphs in my first email). Best Regards, Martin -- Původní zpráva -- Od: Sean Owen so...@cloudera.com Komu: zapletal-mar...@email.cz Datum: 25. 3. 2015 11:05:54 Předmět: Re: Spark ML Pipeline inaccessible types NoSuchMethodError in general means that your runtime and compile-time environments are different. I think you need to first make sure you don't have mismatching versions of Spark. On Wed, Mar 25, 2015 at 11:00 AM, zapletal-mar...@email.cz wrote: Hi, I have started implementing a machine learning pipeline using Spark 1.3.0 and the new pipelining API and DataFrames. I got to a point where I have my training data set prepared using a sequence of Transformers, but I am struggling to actually train a model and use it for predictions. I am getting a java.lang.NoSuchMethodException: org.apache.spark.ml.regression.LinearRegression.myFeaturesColumnName() exception thrown at checkInputColumn method in Params trait when using a Predictor (LinearRegression in my case, but that should not matter). This looks like a bug - the exception is thrown when executing getParam(colName) when the require(actualDataType.equals(datatype), ...) requirement is not met so the expected requirement failed exception is not thrown and is hidden by the unexpected NoSuchMethodException instead. I can raise a bug if this really is an issue and I am not using something incorrectly. 
The problem I am facing however is that the Predictor expects features to have VectorUDT type as defined in Predictor class (protected def featuresDataType: DataType = new VectorUDT). But since this type is private[spark] my Transformer can not prepare features with this type which then correctly results in the exception above when I use a different type. Is there a way to define a custom Pipeline that would be able to use the existing Predictors without having to bypass the access modifiers or reimplement something or is the pipelining API not yet expected to be used in this way? Thanks, Martin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
[spark-sql] What is the right way to represent an “Any” type in Spark SQL?
Hi everyone, I had a lot of questions today, sorry if I'm spamming the list, but I thought it's better than posting all questions in one thread. Let me know if I should throttle my posts ;) Here is my question: I have a case class that has Any in it (e.g. I have a property map and values can be either String, Int or Boolean, and since we don't have union types, Any is the closest thing). When I try to register such an RDD as a table in 1.2.1 (or convert to DataFrame in 1.3 and then register as a table) I get this weird exception: Exception in thread main scala.MatchError: Any (of class scala.reflect.internal.Types$ClassNoArgsTypeRef) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:112) Which from my interpretation simply means that Any is not a valid type that Spark SQL can support in its schema. I already sent a pull request https://github.com/apache/spark/pull/5235 to solve the cryptic exception, but my question is - *is there a way to support an Any type in Spark SQL?* disclaimer - also posted at http://stackoverflow.com/questions/29310405/what-is-the-right-way-to-represent-an-any-type-in-spark-sql
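Until something like union types is supported, one workaround is to avoid Any in the case class and give each supported value type its own nullable column; a sketch (the Prop class and its fields are made up for illustration):

// Fails at schema inference: ScalaReflection has no mapping for Any.
// case class Prop(key: String, value: Any)

// Workaround: one column per supported type, the others left as None/null.
case class Prop(
  key: String,
  stringValue: Option[String] = None,
  intValue: Option[Int] = None,
  boolValue: Option[Boolean] = None)

val props = sc.parallelize(Seq(
  Prop("name", stringValue = Some("widget")),
  Prop("count", intValue = Some(3)),
  Prop("active", boolValue = Some(true))))

// Spark 1.3: import sqlContext.implicits._ then props.toDF().registerTempTable("props")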
Understanding Spark Memory distribution
Hi All, I am running a spark cluster on EC2 instances of type: m3.2xlarge. I have given 26 GB of memory with all 8 cores to my executors. I can see that in the logs too: *15/03/27 21:31:06 INFO AppClient$ClientActor: Executor added: app-20150327213106-/0 on worker-20150327212934-10.x.y.z-40128 (10.x.y.z:40128) with 8 cores* I am not caching any RDD so I have set spark.storage.memoryFraction to 0.2. I can see on the Spark UI under the executors tab that memory used is 0.0/4.5 GB. I am now confused by these logs: *15/03/27 21:31:08 INFO BlockManagerMasterActor: Registering block manager 10.77.100.196:58407 with 4.5 GB RAM, BlockManagerId(4, 10.x.y.z, 58407)* I am broadcasting a large object of 3 GB and after that, when I am creating an RDD, I see logs which show this 4.5 GB memory getting full and then I get OOM. How can I make the block manager use more memory? Is there any other fine tuning I need to do for broadcasting large objects? And does a broadcast variable use the cache memory or the rest of the heap? Thanks Ankur
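For what it's worth, the 4.5 GB the block manager reports is roughly executor memory times spark.storage.memoryFraction times the default storage safety fraction (26 GB x 0.2 x 0.9 is about 4.7 GB), and TorrentBroadcast blocks are stored in that same storage region, so a 3 GB broadcast nearly fills it. A hedged sketch of raising the fraction when building the context (the values are illustrative, not tuned):

import org.apache.spark.{SparkConf, SparkContext}

// 26g * 0.45 * 0.9 safety fraction gives the BlockManager roughly 10.5 GB,
// leaving headroom for a 3 GB broadcast plus its deserialized copy.
val conf = new SparkConf()
  .setAppName("broadcast-heavy-job")
  .set("spark.executor.memory", "26g")
  .set("spark.storage.memoryFraction", "0.45")

val sc = new SparkContext(conf)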
Re: Strange JavaDeserialization error - java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel
Seems like a bug, could you file a JIRA? @Tim: Patrick said you look at Mesos-related issues. Could you take a look at this? Thanks! TD On Fri, Mar 27, 2015 at 1:25 PM, Ondrej Smola ondrej.sm...@gmail.com wrote: Yes, only when using fine-grained mode and replication (StorageLevel.MEMORY_ONLY_2 etc). 2015-03-27 19:06 GMT+01:00 Tathagata Das t...@databricks.com: Does it fail with just Spark jobs (using storage levels) in non-coarse mode? TD On Fri, Mar 27, 2015 at 4:39 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: More info: when using *spark.mesos.coarse* everything works as expected. I think this must be a bug in the spark-mesos integration. 2015-03-27 9:23 GMT+01:00 Ondrej Smola ondrej.sm...@gmail.com: It happens only when a StorageLevel with more than 1 replica is used (StorageLevel.MEMORY_ONLY_2, StorageLevel.MEMORY_AND_DISK_2); StorageLevel.MEMORY_ONLY and StorageLevel.MEMORY_AND_DISK work - the problem must clearly be somewhere between Mesos and Spark. From the console I see that Spark is trying to replicate to nodes - the nodes show up in Mesos active tasks ... but they always fail with ClassNotFoundException. 2015-03-27 0:52 GMT+01:00 Tathagata Das t...@databricks.com: Could you try running a simpler Spark Streaming program with a receiver (maybe socketStream) and see if that works. TD On Thu, Mar 26, 2015 at 2:08 PM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, thanks for the reply. Yes, I have a custom receiver - but it has simple logic: pop ids from a Redis queue, load docs based on the ids from Elasticsearch and store them in Spark. No classloader modifications. I am running multiple Spark batch jobs (with user-supplied partitioning) and they have no problems; debugging in local mode shows no errors. 2015-03-26 21:47 GMT+01:00 Tathagata Das t...@databricks.com: Here are a few steps to debug. 1. Try using replication from a Spark job: sc.parallelize(1 to 100, 100).persist(StorageLevel.MEMORY_ONLY_2).count() 2. If step 1 works, then we know that there is probably nothing wrong with the Spark installation, and the problem is probably in the threads related to the receivers receiving the data. Are you writing a custom receiver? Are you somehow playing around with the class loader in the custom receiver? TD On Thu, Mar 26, 2015 at 10:59 AM, Ondrej Smola ondrej.sm...@gmail.com wrote: Hi, I am running Spark Streaming v1.3.0 (running inside Docker) on Mesos 0.21.1. Spark Streaming is started using Marathon - the docker container gets deployed and starts streaming (from a custom Actor). The Spark binary is located on a shared GlusterFS volume. Data is streamed from Elasticsearch/Redis.
When new batch arrives Spark tries to replicate it but fails with following error : 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0 of size 2840 dropped from memory (free 278017782) 15/03/26 14:50:00 INFO BlockManager: Removing block broadcast_0_piece0 15/03/26 14:50:00 INFO MemoryStore: Block broadcast_0_piece0 of size 1658 dropped from memory (free 278019440) 15/03/26 14:50:00 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/03/26 14:50:00 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 7178767328921933569 java.lang.ClassNotFoundException: org/apache/spark/storage/StorageLevel at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:344) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:88) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:65) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at
2 input paths generate 3 partitions
Hello, I am using the Spark shell in Scala on localhost. I am using sc.textFile to read a directory. The directory looks like this (generated by another Spark script): part-0 part-1 _SUCCESS The part-0 file has four short lines of text while part-1 has two short lines of text. The _SUCCESS file is empty. When I check the number of partitions on the RDD I get: scala foo.partitions.length 15/03/27 14:57:31 INFO FileInputFormat: Total input paths to process : 2 res68: Int = 3 I wonder why the two input files generate three partitions. Does Spark check the number of lines in each file and try to generate three balanced partitions? Thanks! Rares
Re: HQL function Rollup and Cube
Yes, it works for me. Make sure the Spark machine can access the Hive machine. On Thu, Mar 26, 2015 at 6:55 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Did you manage to connect to the Hive metastore from Spark SQL? I copied the hive conf file into the Spark conf folder, but when I run show tables, or do select * from dw_bid (dw_bid is stored in Hive), it says table not found. On Thu, Mar 26, 2015 at 11:43 PM, Chang Lim chang...@gmail.com wrote: Solved. In the IDE, the project settings were missing the dependent lib jars (jar files under spark-xx/lib). When these jars are not set, I got a class-not-found error about datanucleus classes (compared to an out-of-memory error in the Spark shell). In the context of the Spark shell, these dependent jars need to be passed in at the spark-shell command line. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HQL-function-Rollup-and-Cube-tp22241p22246.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Deepak
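For the metastore question quoted above, the usual pattern (a sketch; it assumes hive-site.xml with hive.metastore.uris is on Spark's conf path) is to query through HiveContext rather than the plain SQLContext:

import org.apache.spark.sql.hive.HiveContext

// HiveContext reads hive-site.xml and talks to the shared metastore,
// so tables such as dw_bid registered in Hive become visible.
val hiveContext = new HiveContext(sc)
hiveContext.sql("SHOW TABLES").collect().foreach(println)
hiveContext.sql("SELECT * FROM dw_bid LIMIT 10").show()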
Re: How to specify the port for AM Actor ...
I looked @ the 1.3.0 code and figured where this can be added In org.apache.spark.deploy.yarn ApplicationMaster.scala:282 is actorSystem = AkkaUtils.createActorSystem(sparkYarnAM, Utils.localHostName, 0, conf = sparkConf, securityManager = securityMgr)._1 If I change it to below, then I can start it on the port I want. val port = sparkConf.getInt(spark.am.actor.port, 0) // New property ... actorSystem = AkkaUtils.createActorSystem(sparkYarnAM, Utils.localHostName, port, conf = sparkConf, securityManager = securityMgr)._1 Thoughts? Any other place where any change is needed? On Wed, Mar 25, 2015 at 4:44 PM, Shixiong Zhu zsxw...@gmail.com wrote: There is no configuration for it now. Best Regards, Shixiong Zhu 2015-03-26 7:13 GMT+08:00 Manoj Samel manojsamelt...@gmail.com: There may be firewall rules limiting the ports between host running spark and the hadoop cluster. In that case, not all ports are allowed. Can it be a range of ports that can be specified ? On Wed, Mar 25, 2015 at 4:06 PM, Shixiong Zhu zsxw...@gmail.com wrote: It's a random port to avoid port conflicts, since multiple AMs can run in the same machine. Why do you need a fixed port? Best Regards, Shixiong Zhu 2015-03-26 6:49 GMT+08:00 Manoj Samel manojsamelt...@gmail.com: Spark 1.3, Hadoop 2.5, Kerbeors When running spark-shell in yarn client mode, it shows following message with a random port every time (44071 in example below). Is there a way to specify that port to a specific port ? It does not seem to be part of ports specified in http://spark.apache.org/docs/latest/configuration.html spark.xxx.port ... Thanks, 15/03/25 22:27:10 INFO Client: Application report for application_1427316153428_0014 (state: ACCEPTED) 15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkYarnAM@xyz :44071/user/YarnAM#-1989273896]
Re: 2 input paths generate 3 partitions
Hi Rares, The number of partitions is controlled by the HDFS input format, and one file may have multiple partitions if it consists of multiple blocks. In your case, I think there is one file with 2 splits. Thanks. Zhan Zhang On Mar 27, 2015, at 3:12 PM, Rares Vernica rvern...@gmail.com wrote: Hello, I am using the Spark shell in Scala on localhost. I am using sc.textFile to read a directory. The directory looks like this (generated by another Spark script): part-0 part-1 _SUCCESS The part-0 file has four short lines of text while part-1 has two short lines of text. The _SUCCESS file is empty. When I check the number of partitions on the RDD I get: scala foo.partitions.length 15/03/27 14:57:31 INFO FileInputFormat: Total input paths to process : 2 res68: Int = 3 I wonder why the two input files generate three partitions. Does Spark check the number of lines in each file and try to generate three balanced partitions? Thanks! Rares
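The split behaviour is easy to check directly: sc.textFile takes a minPartitions hint, which defaults to min(defaultParallelism, 2), i.e. 2 here, and FileInputFormat aims at roughly totalSize/minPartitions per split, so the larger part file can be cut in two. A small sketch (the directory name is a placeholder):

// Default minPartitions (2): the larger part file may be split, giving 3 partitions.
val foo = sc.textFile("out-dir")
println(foo.partitions.length)   // 3 in Rares' case

// Asking for a single split keeps one partition per small file.
val bar = sc.textFile("out-dir", minPartitions = 1)
println(bar.partitions.length)   // expected: 2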
Re: Can't access file in spark, but can in hadoop
Probably guava version conflicts issue. What spark version did you use, and which hadoop version it compile against? Thanks. Zhan Zhang On Mar 27, 2015, at 12:13 PM, Johnson, Dale daljohn...@ebay.commailto:daljohn...@ebay.com wrote: Yes, I could recompile the hdfs client with more logging, but I don’t have the day or two to spare right this week. One more thing about this, the cluster is Horton Works 2.1.3 [.0] They seem to have a claim of supporting spark on Horton Works 2.2 Dale. From: Ted Yu yuzhih...@gmail.commailto:yuzhih...@gmail.com Date: Thursday, March 26, 2015 at 4:54 PM To: Johnson, Dale daljohn...@ebay.commailto:daljohn...@ebay.com Cc: user user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Can't access file in spark, but can in hadoop Looks like the following assertion failed: Preconditions.checkState(storageIDsCount == locs.size()); locs is ListDatanodeInfoProto Can you enhance the assertion to log more information ? Cheers On Thu, Mar 26, 2015 at 3:06 PM, Dale Johnson daljohn...@ebay.commailto:daljohn...@ebay.com wrote: There seems to be a special kind of corrupted according to Spark state of file in HDFS. I have isolated a set of files (maybe 1% of all files I need to work with) which are producing the following stack dump when I try to sc.textFile() open them. When I try to open directories, most large directories contain at least one file of this type. Curiously, the following two lines fail inside of a Spark job, but not inside of a Scoobi job: val conf = new org.apache.hadoop.conf.Configuration val fs = org.apache.hadoop.fs.FileSystem.get(conf) The stack trace follows: 15/03/26 14:22:43 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: null) Exception in thread Driver java.lang.IllegalStateException at org.spark-project.guava.common.base.Preconditions.checkState(Preconditions.java:133) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:673) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convertLocatedBlock(PBHelper.java:1100) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1118) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1251) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1354) at org.apache.hadoop.hdfs.protocolPB.PBHelper.convert(PBHelper.java:1363) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:518) at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1743) at org.apache.hadoop.hdfs.DistributedFileSystem$15.init(DistributedFileSystem.java:738) at org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:727) at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:1662) at org.apache.hadoop.fs.FileSystem$5.init(FileSystem.java:1724) at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:1721) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1125) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$$anonfun$main$2.apply(SpellQuery.scala:1123) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch$.main(SpellQuery.scala:1123) at com.ebay.ss.niffler.miner.speller.SpellQueryLaunch.main(SpellQuery.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427) 15/03/26 14:22:43 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook It appears to have found the three copies of the given HDFS block, but is performing some sort of validation with them before giving them back to spark to schedule the job. But there is an assert failing. I've tried this with 1.2.0, 1.2.1 and 1.3.0, and I get the exact same error, but I've seen the line numbers change on the HDFS libraries, but not the function names. I've tried recompiling myself with different hadoop versions, and it's the same. We're running hadoop 2.4.1 on our cluster. A google search turns up absolutely nothing on this. Any insight at all would be appreciated. Dale Johnson Applied Researcher eBay.comhttp://eBay.com -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-file-in-spark-but-can-in-hadoop-tp22251.html Sent from the Apache Spark User List
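To check Zhan's version-conflict theory, one quick probe (a sketch) is to ask the JVM where the classes on the failing path were actually loaded from; mismatched HDFS client jars versus the cluster's Hadoop version would show up here:

// Print the jar each class was loaded from on the driver.
def locationOf(className: String): String =
  Class.forName(className).getProtectionDomain.getCodeSource.getLocation.toString

Seq(
  "org.spark-project.guava.common.base.Preconditions",
  "org.apache.hadoop.hdfs.protocolPB.PBHelper"
).foreach(c => println(s"$c -> ${locationOf(c)}"))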
RE: 2 input paths generate 3 partitions
The files sound too small to be 2 blocks in HDFS. Did you set the defaultParallelism to be 3 in your spark? Yong Subject: Re: 2 input paths generate 3 partitions From: zzh...@hortonworks.com To: rvern...@gmail.com CC: user@spark.apache.org Date: Fri, 27 Mar 2015 23:15:38 + Hi Rares, The number of partition is controlled by HDFS input format, and one file may have multiple partitions if it consists of multiple block. In you case, I think there is one file with 2 splits. Thanks. Zhan Zhang On Mar 27, 2015, at 3:12 PM, Rares Vernica rvern...@gmail.com wrote: Hello, I am using the Spark shell in Scala on the localhost. I am using sc.textFile to read a directory. The directory looks like this (generated by another Spark script): part-0 part-1 _SUCCESS The part-0 has four short lines of text while part-1 has two short lines of text. The _SUCCESS file is empty. When I check the number of partitions on the RDD I get: scala foo.partitions.length 15/03/27 14:57:31 INFO FileInputFormat: Total input paths to process : 2 res68: Int = 3 I wonder why do the two input files generate three partitions. Does Spark check the number of lines in each file and try to generate three balanced partitions? Thanks! Rares
Streaming anomaly detection using ARIMA
I want to use ARIMA for a predictive model so that I can take time series data (metrics) and perform light anomaly detection. The time series data is going to be bucketed into different time units (several minutes within several hours, several hours within several days, several days within several years). I want to do the algorithm in Spark Streaming. I'm used to tuple-at-a-time streaming and I'm having a bit of trouble gaining insight into how exactly the windows are managed inside of DStreams. Let's say I have a simple dataset that is marked by a key/value tuple where the key is the name of the component whose metrics I want to run the algorithm against and the value is a metric (a value representing a sum for the time bucket). I want to create histograms of the time series data for each key in the windows in which they reside so I can use that histogram vector to generate my ARIMA prediction (actually, it seems like this doesn't just apply to ARIMA but could apply to any sliding average). I *think* my prediction code may look something like this: val predictionAverages = dstream .groupByKeyAndWindow(60*60*24, 60*60*24) .mapValues(applyARIMAFunction) That is, keep 24 hours' worth of metrics in each window and use that for the ARIMA prediction. The part I'm struggling with is how to join together the actual values so that I can do my comparison against the prediction model. Let's say dstream contains the actual values. For any time window, I should be able to take a previous set of windows and use the model to compare against the current values.
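A sketch of how that might look with Spark Streaming's windowing API (Durations rather than raw seconds; dstream and applyARIMAFunction are the names from the question, the function itself is a placeholder, and the batch interval is assumed to divide the one-hour slide so the two windowed streams line up):

import org.apache.spark.streaming.Seconds

// dstream: DStream[(String, Double)] of (componentName, bucketSum).
// Keep 24 hours of history per key, re-fitting the model every hour.
val predictions = dstream
  .groupByKeyAndWindow(Seconds(24 * 60 * 60), Seconds(60 * 60))
  .mapValues(history => applyARIMAFunction(history.toSeq))

// Compare the last hour's actual sum against the prediction for the same key.
val residuals = dstream
  .reduceByKeyAndWindow((a: Double, b: Double) => a + b, Seconds(60 * 60), Seconds(60 * 60))
  .join(predictions)
  .mapValues { case (actual, predicted) => actual - predicted }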
Setting a custom loss function for GradientDescent
I am working with the mllib.optimization.GradientDescent class and I'm confused about how to set a custom loss function with setGradient. For instance, if I wanted my loss function to be x^2, how would I go about setting it using setGradient? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Setting-a-custom-loss-function-for-GradientDescent-tp22263.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
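setGradient does not take a loss function directly; it takes an org.apache.spark.mllib.optimization.Gradient whose compute() returns the per-example (sub)gradient and loss. A sketch, reading the x^2 above as squared error of a linear prediction, 0.5 * (w.x - y)^2 (the dense-only accumulation is a simplification, not how MLlib's built-in gradients do it):

import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}
import org.apache.spark.mllib.optimization.Gradient

class SquaredLossGradient extends Gradient {

  // Per-example gradient and loss for 0.5 * (w.x - y)^2.
  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
    val x = data.toArray
    val w = weights.toArray
    val diff = x.zip(w).map { case (xi, wi) => xi * wi }.sum - label
    (Vectors.dense(x.map(_ * diff)), 0.5 * diff * diff)
  }

  // In-place variant used by the mini-batch loop; dense cumGradient only here.
  override def compute(data: Vector, label: Double, weights: Vector,
      cumGradient: Vector): Double = {
    val (grad, loss) = compute(data, label, weights)
    cumGradient match {
      case dense: DenseVector =>
        val acc = dense.values
        val g = grad.toArray
        var i = 0
        while (i < acc.length) { acc(i) += g(i); i += 1 }
      case _ => sys.error("this sketch only handles a dense cumGradient")
    }
    loss
  }
}
// An instance of this class is what setGradient expects.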
Re: k-means can only run on one executor with one thread?
Yes, I have done repartition. I tried to repartition to the number of cores in my cluster. Not helping... I tried to repartition to the number of centroids (k value). Not helping... On Sat, Mar 28, 2015 at 7:27 AM Joseph Bradley jos...@databricks.com wrote: Can you try specifying the number of partitions when you load the data to equal the number of executors? If your ETL changes the number of partitions, you can also repartition before calling KMeans. On Thu, Mar 26, 2015 at 8:04 PM, Xi Shen davidshe...@gmail.com wrote: Hi, I have a large data set, and I expect to get 5000 clusters. I load the raw data, convert it into DenseVectors, then repartition and cache; finally I give the RDD[Vector] to KMeans.train(). Now the job is running, and data are loaded. But according to the Spark UI, all data are loaded onto one executor. I checked that executor, and its CPU workload is very low. I think it is using only 1 of the 8 cores. And the other 3 executors are idle. Did I miss something? Is it possible to distribute the workload to all 4 executors? Thanks, David
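A sketch of Joseph's suggestion as it would look before the call to KMeans.train (rawData, the CSV-style parsing, and the 4 x 8 partition count are assumptions about this particular cluster):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Spread the vectors across all executors and cache before the iterative training.
val numPartitions = 4 * 8   // executors * cores per executor
val vectors = rawData
  .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
  .repartition(numPartitions)
  .cache()

val model = KMeans.train(vectors, 5000, 20)   // k = 5000, maxIterations = 20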
Re: SparkSQL overwrite parquet file does not generate _common_metadata
JIRA ticket created at: https://issues.apache.org/jira/browse/SPARK-6581 Thanks, -- Pei-Lun On Fri, Mar 27, 2015 at 7:03 PM, Cheng Lian lian.cs@gmail.com wrote: Thanks for the information. Verified that the _common_metadata and _metadata file are missing in this case when using Hadoop 1.0.4. Would you mind to open a JIRA for this? Cheng On 3/27/15 2:40 PM, Pei-Lun Lee wrote: I'm using 1.0.4 Thanks, -- Pei-Lun On Fri, Mar 27, 2015 at 2:32 PM, Cheng Lian lian.cs@gmail.com wrote: Hm, which version of Hadoop are you using? Actually there should also be a _metadata file together with _common_metadata. I was using Hadoop 2.4.1 btw. I'm not sure whether Hadoop version matters here, but I did observe cases where Spark behaves differently because of semantic differences of the same API in different Hadoop versions. Cheng On 3/27/15 11:33 AM, Pei-Lun Lee wrote: Hi Cheng, on my computer, execute res0.save(xxx, org.apache.spark.sql.SaveMode. Overwrite) produces: peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx total 32 -rwxrwxrwx 1 peilunlee staff0 Mar 27 11:29 _SUCCESS* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet* -rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet* while res0.save(xxx) produces: peilunlee@pllee-mini:~/opt/spark-1.3...rc3-bin-hadoop1$ ls -l xxx total 40 -rwxrwxrwx 1 peilunlee staff0 Mar 27 11:29 _SUCCESS* -rwxrwxrwx 1 peilunlee staff 250 Mar 27 11:29 _common_metadata* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-1.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-2.parquet* -rwxrwxrwx 1 peilunlee staff 272 Mar 27 11:29 part-r-3.parquet* -rwxrwxrwx 1 peilunlee staff 488 Mar 27 11:29 part-r-4.parquet* On Thu, Mar 26, 2015 at 7:26 PM, Cheng Lian lian.cs@gmail.com wrote: I couldn’t reproduce this with the following spark-shell snippet: scala import sqlContext.implicits._ scala Seq((1, 2)).toDF(a, b) scala res0.save(xxx, org.apache.spark.sql.SaveMode.Overwrite) scala res0.save(xxx, org.apache.spark.sql.SaveMode.Overwrite) The _common_metadata file is typically much smaller than _metadata, because it doesn’t contain row group information, and thus can be faster to read than _metadata. Cheng On 3/26/15 12:48 PM, Pei-Lun Lee wrote: Hi, When I save parquet file with SaveMode.Overwrite, it never generate _common_metadata. Whether it overwrites an existing dir or not. Is this expected behavior? And what is the benefit of _common_metadata? Will reading performs better when it is present? Thanks, -- Pei-Lun
Re: 2 input paths generate 3 partitions
Hi, I am not using HDFS, I am using the local file system. Moreover, I did not modify the defaultParallelism. The Spark instance is the default one started by Spark Shell. Thanks! Rares On Fri, Mar 27, 2015 at 4:48 PM, java8964 java8...@hotmail.com wrote: The files sound too small to be 2 blocks in HDFS. Did you set the defaultParallelism to be 3 in your spark? Yong -- Subject: Re: 2 input paths generate 3 partitions From: zzh...@hortonworks.com To: rvern...@gmail.com CC: user@spark.apache.org Date: Fri, 27 Mar 2015 23:15:38 + Hi Rares, The number of partition is controlled by HDFS input format, and one file may have multiple partitions if it consists of multiple block. In you case, I think there is one file with 2 splits. Thanks. Zhan Zhang On Mar 27, 2015, at 3:12 PM, Rares Vernica rvern...@gmail.com wrote: Hello, I am using the Spark shell in Scala on the localhost. I am using sc.textFile to read a directory. The directory looks like this (generated by another Spark script): part-0 part-1 _SUCCESS The part-0 has four short lines of text while part-1 has two short lines of text. The _SUCCESS file is empty. When I check the number of partitions on the RDD I get: scala foo.partitions.length 15/03/27 14:57:31 INFO FileInputFormat: Total input paths to process : 2 res68: Int = 3 I wonder why do the two input files generate three partitions. Does Spark check the number of lines in each file and try to generate three balanced partitions? Thanks! Rares
unable to read avro file
Hi, I am following the instructions on this website: http://www.infoobjects.com/spark-with-avro/ I installed the spark-avro library from https://github.com/databricks/spark-avro on a machine which only has the Hive gateway client role on a Hadoop cluster. Somehow I got an error on reading the avro file. scala val ufos = sqlContext.avroFile(hdfs://sss/x/iii/iiiAaaaSs/hh//00/22/22/iiiAaaaSs.6.11.469465.2099222123.142749720.avro) console:20: error: erroneous or inaccessible type val ufos = sqlContext.avroFile(hdfs://sss/x/iii/iiiAaaaSs/hh//00/22/22/iiiAaaaSs.6.11.469465.2099222123.142749720.avro) Any advice please? Thank you! J
Re: Single threaded laptop implementation beating a 128 node GraphX cluster on a 1TB data set (128 billion nodes) - What is a use case for GraphX then? when is it worth the cost?
(I bet the Spark implementation could be improved. I bet GraphX could be optimized.) Not sure about this one, but in core benchmarks often start by assuming that the data is local. In the real world, data is unlikely to be. The benchmark has to include the cost of bringing all the data to the local computation too, since the point of distributed computation is bringing work to the data. Specialist implementations for a special problem should always win over generalist, and Spark is a generalist. Likewise you can factor matrices way faster in a GPU than in Spark. These aren't entirely either/or propositions; you can use Rust or GPU in a larger distributed program. Typically a real-world problem involves more than core computation: ETL, security, monitoring. Generalists are more likely to have an answer to hand for these. Specialist implementations do just one thing, and they typically have to be custom built. Compare the cost of highly skilled developer time to generalist computing resources; $1m buys several dev years but also rents a small data center. Speed is an important issue but by no means everything in the real world, and these are rarely mutually exclusive options in the OSS world. This is a great piece of work, but I don't think it's some kind of argument against distributed computing. On Fri, Mar 27, 2015 at 6:32 PM, Eran Medan ehrann.meh...@gmail.com wrote: Remember that article that went viral on HN? (Where a guy showed how GraphX / Giraph / GraphLab / Spark have worse performance on a 128 cluster than on a 1 thread machine? if not here is the article -http://www.frankmcsherry.org/graph/scalability/cost/2015/01/15/COST.html) Well as you may recall, this stirred up a lot of commotion in the big data community (and Spark/GraphX in particular) People (justly I guess) blamed him for not really having “big data”, as all of his data set fits in memory, so it doesn't really count. So he took the challenge and came with a pretty hard to argue counter benchmark, now with a huge data set (1TB of data, encoded using Hilbert curves to 154GB, but still large). see at - http://www.frankmcsherry.org/graph/scalability/cost/2015/02/04/COST2.html He provided the source here https://github.com/frankmcsherry/COST as an example His benchmark shows how on a 128 billion edges graph, he got X2 to X10 faster results on a single threaded Rust based implementation So, what is the counter argument? it pretty much seems like a blow in the face of Spark / GraphX etc, (which I like and use on a daily basis) Before I dive into re-validating his benchmarks with my own use cases. What is your opinion on this? If this is the case, then what IS the use case for using Spark/GraphX at all? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Understanding Spark Memory distribution
I have increased the spark.storage.memoryFraction to 0.4 but I still get OOM errors on Spark Executor nodes 15/03/27 23:19:51 INFO BlockManagerMaster: Updated info of block broadcast_5_piece10 15/03/27 23:19:51 INFO TorrentBroadcast: Reading broadcast variable 5 took 2704 ms 15/03/27 23:19:52 INFO MemoryStore: ensureFreeSpace(672530208) called with curMem=2484698683, maxMem=9631778734 15/03/27 23:19:52 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 641.4 MB, free 6.0 GB) 15/03/27 23:34:02 WARN AkkaUtils: Error sending message in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187) at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:407) 15/03/27 23:34:02 ERROR Executor: Exception in task 7.0 in stage 2.0 (TID 4007) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1986) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) Thanks Ankur On Fri, Mar 27, 2015 at 2:52 PM, Ankur Srivastava ankur.srivast...@gmail.com wrote: Hi All, I am running a spark cluster on EC2 instances of type: m3.2xlarge. I have given 26gb of memory with all 8 cores to my executors. I can see that in the logs too: *15/03/27 21:31:06 INFO AppClient$ClientActor: Executor added: app-20150327213106-/0 on worker-20150327212934-10.x.y.z-40128 (10.x.y.z:40128) with 8 cores* I am not caching any RDD so I have set spark.storage.memoryFraction to 0.2. I can see on SparkUI under executors tab Memory used is 0.0/4.5 GB. I am now confused with these logs? *15/03/27 21:31:08 INFO BlockManagerMasterActor: Registering block manager 10.77.100.196:58407 http://10.77.100.196:58407 with 4.5 GB RAM, BlockManagerId(4, 10.x.y.z, 58407)* I am broadcasting a large object of 3 gb and after that when I am creating an RDD, I see logs which show this 4.5 GB memory getting full and then I get OOM. How can I make block manager use more memory? Is there any other fine tuning I need to do for broadcasting large objects? And does broadcast variable use cache memory or rest of the heap? Thanks Ankur
Re: unable to read avro file
Never mind. I found my Spark is still 1.2 but the avro library requires 1.3. Will try again. On Fri, Mar 27, 2015 at 9:38 PM, Joanne Contact joannenetw...@gmail.com wrote: Hi, I am following the instructions on this website: http://www.infoobjects.com/spark-with-avro/ I installed the spark-avro library from https://github.com/databricks/spark-avro on a machine which only has the Hive gateway client role on a Hadoop cluster. Somehow I got an error on reading the avro file. scala val ufos = sqlContext.avroFile(hdfs://sss/x/iii/iiiAaaaSs/hh//00/22/22/iiiAaaaSs.6.11.469465.2099222123.142749720.avro) console:20: error: erroneous or inaccessible type val ufos = sqlContext.avroFile(hdfs://sss/x/iii/iiiAaaaSs/hh//00/22/22/iiiAaaaSs.6.11.469465.2099222123.142749720.avro) Any advice please? Thank you! J
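For reference, the usual spark-avro usage on Spark 1.3 looks roughly like this (a sketch; the import is what brings avroFile onto SQLContext, and the path is a placeholder):

import com.databricks.spark.avro._

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val ufos = sqlContext.avroFile("hdfs:///path/to/file.avro")
ufos.printSchema()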
rdd.toDF().saveAsParquetFile(tachyon://host:19998/test)
spark version is 1.3.0 with tachyon-0.6.1 QUESTION DESCRIPTION: rdd.saveAsObjectFile(tachyon://host:19998/test) and rdd.saveAsTextFile(tachyon://host:19998/test) succeed, but rdd.toDF().saveAsParquetFile(tachyon://host:19998/test) fails. ERROR MESSAGE: java.lang.IllegalArgumentException: Wrong FS: tachyon://host:19998/test, expected: hdfs://host:8020 at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645) at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:465) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:252) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:251) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/rdd-toDF-saveAsParquetFile-tachyon-host-19998-test-tp22264.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org