PYSPARK_PYTHON doesn't work in spark worker
Hi all, I have the following configuration in the Spark worker (spark-env.sh):

export PYTHON_HOME=/opt/soft/anaconda2
export PYSPARK_PYTHON=$PYTHON_HOME/bin/python

I'm trying to run a simple test script in a shell started with pyspark --master yarn --queue spark --executor-cores 1 --num-executors 10:

from pyspark import SparkContext, SparkConf
df = sc.textFile("/user/hive/warehouse/hdptest.db/111")
df.collect()
sortedCount = df.map(lambda x: (float(x), 1)).sortByKey()
sortedCount.collect()

However, I get the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 4 times, most recent failure: Lost task 1.3 in stage 1.0 (TID 7, datanode-8): org.apache.spark.SparkException:
Error from python worker:
  /usr/bin/python: module pyspark.daemon not found
PYTHONPATH was:
  /home/hadoop/spark/python/lib/pyspark.zip:/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip:/data/hadoop_local/usercache/hdptest/filecache/38/spark-assembly-1.4.1-hadoop2.5.2.jar:pyspark.zip:py4j-0.8.2.1-src.zip

It seems that PYSPARK_PYTHON does not take effect on the Spark workers. Can someone please help me solve this? Thanks~

guoqing0...@yahoo.com.hk
Table is modified by DataFrameWriter
Hi all, I found that the table structure was modified when using DataFrameWriter.jdbc to save the content of a DataFrame:

sqlContext.sql("select '2015-09-17',count(1) from test").write.jdbc(url, test, properties)

Table structure before saving:
app_key text
t_amount bigint(20)

After saving:
_c0 text
_c1 bigint(20)

Is there any way to just save the fields in order and not alter the table? Thanks!
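One possible direction (a hedged sketch of my own, not something confirmed in this thread): the _c0 and _c1 names are the auto-generated names Spark SQL gives to unaliased expressions, so aliasing each expression to the existing column name keeps the DataFrame schema in line with the target table. The url, properties and target table name below are placeholders taken from the question above.

  // Sketch only: url and properties come from the original question;
  // "target_table" is a hypothetical table name.
  val df = sqlContext.sql(
    "select '2015-09-17' as app_key, count(1) as t_amount from test")
  df.write.jdbc(url, "target_table", properties)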
Re: Re: Table is modified by DataFrameWriter
I tried SaveMode.Append and SaveMode.Overwrite, and the output table was modified in both cases. Are the _c0 and _c1 names automatically generated from the DataFrame schema? In my scenario I just want to flush the data from the DataFrame into the RDBMS when both sides have the same structure, but I found that the column names were changed.

From: Josh Rosen
Date: 2015-09-17 11:42
To: guoqing0...@yahoo.com.hk
CC: Ted Yu; user
Subject: Re: Re: Table is modified by DataFrameWriter
What are your JDBC properties configured to? Do you have overwrite mode enabled?

On Wed, Sep 16, 2015 at 7:39 PM, guoqing0...@yahoo.com.hk <guoqing0...@yahoo.com.hk> wrote:
Spark-1.4.1

From: Ted Yu
Date: 2015-09-17 10:29
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: Table is modified by DataFrameWriter
Can you tell us which release you were using? Thanks

On Sep 16, 2015, at 7:11 PM, "guoqing0...@yahoo.com.hk" <guoqing0...@yahoo.com.hk> wrote:
Hi all, I found that the table structure was modified when using DataFrameWriter.jdbc to save the content of a DataFrame:
sqlContext.sql("select '2015-09-17',count(1) from test").write.jdbc(url, test, properties)
Table structure before saving:
app_key text
t_amount bigint(20)
After saving:
_c0 text
_c1 bigint(20)
Is there any way to just save the fields in order and not alter the table? Thanks!
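For context, a minimal sketch (my own illustration under stated assumptions, not code from this thread) of how the save mode is set on the writer in Spark 1.4. With SaveMode.Overwrite the JDBC writer may drop and recreate the target table from the DataFrame schema, which could explain the generated column names ending up in the table; appending into the existing table avoids the recreate. The JDBC URL, credentials and table name below are placeholders.

  import java.util.Properties
  import org.apache.spark.sql.SaveMode

  // Placeholders, not values from this thread.
  val url = "jdbc:mysql://dbhost:3306/testdb"
  val props = new Properties()
  props.setProperty("user", "db_user")
  props.setProperty("password", "db_password")

  val df = sqlContext.sql(
    "select '2015-09-17' as app_key, count(1) as t_amount from test")

  // Append into the existing table rather than letting Overwrite recreate it.
  df.write.mode(SaveMode.Append).jdbc(url, "target_table", props)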
Re: Re: Table is modified by DataFrameWriter
Spark-1.4.1

From: Ted Yu
Date: 2015-09-17 10:29
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: Table is modified by DataFrameWriter
Can you tell us which release you were using? Thanks

On Sep 16, 2015, at 7:11 PM, "guoqing0...@yahoo.com.hk" <guoqing0...@yahoo.com.hk> wrote:
Hi all, I found that the table structure was modified when using DataFrameWriter.jdbc to save the content of a DataFrame:
sqlContext.sql("select '2015-09-17',count(1) from test").write.jdbc(url, test, properties)
Table structure before saving:
app_key text
t_amount bigint(20)
After saving:
_c0 text
_c1 bigint(20)
Is there any way to just save the fields in order and not alter the table? Thanks!
java.util.NoSuchElementException: key not found
Hi all, after upgrading Spark to 1.5, Streaming occasionally throws java.util.NoSuchElementException: key not found. Could the data itself be causing this error? Please help if anyone has hit a similar problem before. Thanks very much. The exception occurs when writing into the database.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 76, slave2): java.util.NoSuchElementException: key not found: ruixue.sys.session.request
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.compress(compressionSchemes.scala:258)
at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:110)
at org.apache.spark.sql.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:87)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:152)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

guoqing0...@yahoo.com.hk
Insert operation in Dataframe
Hi all, does DataFrame support an insert operation, like sqlContext.sql("insert into table1 xxx select xxx from table2")?

guoqing0...@yahoo.com.hk
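In case it helps, a minimal sketch (my assumption about the intent; the table names are placeholders and it presumes Spark 1.4+ with tables managed by a HiveContext) of two common ways an insert like this is expressed:

  import org.apache.spark.sql.hive.HiveContext

  val hiveContext = new HiveContext(sc)

  // Option 1: a HiveQL INSERT statement run through sql().
  hiveContext.sql("INSERT INTO TABLE table1 SELECT * FROM table2")

  // Option 2: write a query result into an existing table via the DataFrameWriter.
  hiveContext.sql("SELECT * FROM table2").write.insertInto("table1")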
streaming issue
Hi, I got an error when running Spark Streaming, as below.

java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener.onUnpersistRDD(EventLoggingListener.scala:175)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:50)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:53)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:36)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:76)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
Caused by: java.io.IOException: All datanodes 10.153.192.159:50010 are bad. Aborting...
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1137)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:933)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:487)

15/07/28 02:01:10 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$3.apply(EventLoggingListener.scala:144)
at scala.Option.foreach(Option.scala:236)

I have set the ulimit in /etc/security/limits.conf, but I still get the same exception. Can somebody please help me resolve this issue?

core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 264192
max locked memory (kbytes, -l) 32
max memory size (kbytes, -m) unlimited
open files (-n) 65535
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 10240
cpu time (seconds, -t) unlimited
max user processes (-u) 34816
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited

Thanks.
Spark build with Hive
Hi, can Spark 1.3.1 be built with Hive 1.2? According to the documentation, it seems Spark 1.3.1 can only be built against Hive 0.13 or 0.12:

# Apache Hadoop 2.4.X with Hive 13 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

# Apache Hadoop 2.4.X with Hive 12 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package

guoqing0...@yahoo.com.hk
Re: RE: Spark build with Hive
Thanks very much. Which versions will be supported in the upcoming 1.4? I hope it will support more versions.

guoqing0...@yahoo.com.hk

From: Cheng, Hao
Date: 2015-05-21 11:20
To: Ted Yu; guoqing0...@yahoo.com.hk
CC: user
Subject: RE: Spark build with Hive
Yes, ONLY 0.12.0 and 0.13.1 are supported currently. Hopefully we can support higher versions in the next 1 or 2 releases.

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Thursday, May 21, 2015 11:12 AM
To: guoqing0...@yahoo.com.hk
Cc: user
Subject: Re: Spark build with Hive
I am afraid even Hive 1.0 is not supported, let alone Hive 1.2
Cheers

On Wed, May 20, 2015 at 8:08 PM, guoqing0...@yahoo.com.hk <guoqing0...@yahoo.com.hk> wrote:
Hi, can Spark 1.3.1 be built with Hive 1.2? According to the documentation, it seems Spark 1.3.1 can only be built against Hive 0.13 or 0.12:
# Apache Hadoop 2.4.X with Hive 13 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package
# Apache Hadoop 2.4.X with Hive 12 support
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package

guoqing0...@yahoo.com.hk
Hive on Spark VS Spark SQL
Between Hive on Spark and Spark SQL, which is better? What are their key characteristics, and what are the advantages and disadvantages of each?

guoqing0...@yahoo.com.hk
Worker Core in Spark
Assume I have several machines with 8 cores each. Which is better: 8 workers with 1 core per worker, or 1 worker with 8 cores per worker?
Re: Re: implicit function in SparkStreaming
Thank you for your pointers, they are very helpful. In this scenario, how can I use the implicit def from the enclosing class?

From: Tathagata Das
Date: 2015-04-30 07:00
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: implicit function in SparkStreaming
I believe that the implicit def is pulling the enclosing class (in which the def is defined) into the closure, which is not serializable.

On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk <guoqing0...@yahoo.com.hk> wrote:
Hi guys, I'm puzzled why I can't use the implicit function in Spark Streaming; it causes "Task not serializable". Code snippet:

implicit final def str2KeyValue(s: String): (String, String) = {
  val message = s.split("\\|")
  if (message.length >= 2)
    (message(0), message(1))
  else if (message.length == 1)
    (message(0), "")
  else
    ("", "")
}

def filter(stream: DStream[String]): DStream[String] = {
  stream.filter(s => {
    (s._1 == "Action" && s._2 == "TRUE")
  })
}

Could you please give me some pointers? Thank you.

guoqing0...@yahoo.com.hk
Re: Re: implicit function in SparkStreaming
Thanks for your help, it works. I'm curious why the enclosing class cannot be serialized; does it need to extend java.io.Serializable? If an object is never serialized, how does it work inside the task? Is there any association with spark.closure.serializer?

guoqing0...@yahoo.com.hk

From: Tathagata Das
Date: 2015-04-30 09:30
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: Re: implicit function in SparkStreaming
Could you put the implicit def in an object? That should work, as objects are never serialized.

On Wed, Apr 29, 2015 at 6:28 PM, guoqing0...@yahoo.com.hk <guoqing0...@yahoo.com.hk> wrote:
Thank you for your pointers, they are very helpful. In this scenario, how can I use the implicit def from the enclosing class?

From: Tathagata Das
Date: 2015-04-30 07:00
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: implicit function in SparkStreaming
I believe that the implicit def is pulling the enclosing class (in which the def is defined) into the closure, which is not serializable.

On Wed, Apr 29, 2015 at 4:20 AM, guoqing0...@yahoo.com.hk <guoqing0...@yahoo.com.hk> wrote:
Hi guys, I'm puzzled why I can't use the implicit function in Spark Streaming; it causes "Task not serializable". Code snippet:

implicit final def str2KeyValue(s: String): (String, String) = {
  val message = s.split("\\|")
  if (message.length >= 2)
    (message(0), message(1))
  else if (message.length == 1)
    (message(0), "")
  else
    ("", "")
}

def filter(stream: DStream[String]): DStream[String] = {
  stream.filter(s => {
    (s._1 == "Action" && s._2 == "TRUE")
  })
}

Could you please give me some pointers? Thank you.

guoqing0...@yahoo.com.hk
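For readers landing on this thread, a minimal sketch (my own illustration of the suggestion above, not code taken from the thread; object and method names are hypothetical) of moving the implicit conversion into a standalone object so the enclosing class no longer has to be captured by the closure:

  import org.apache.spark.streaming.dstream.DStream

  object MessageConversions {
    // Lives in a standalone object, so using it does not drag the
    // enclosing class into the serialized closure.
    implicit def str2KeyValue(s: String): (String, String) = {
      val message = s.split("\\|")
      if (message.length >= 2) (message(0), message(1))
      else if (message.length == 1) (message(0), "")
      else ("", "")
    }
  }

  object StreamFilters {
    import MessageConversions._

    def filter(stream: DStream[String]): DStream[String] =
      stream.filter(s => s._1 == "Action" && s._2 == "TRUE")
  }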
Does Spark 1.3.1 support building with Scala 2.8?
Does Spark 1.3.1 support building with Scala 2.8? If it is built with Scala 2.10, can it be integrated with kafka_2.8.0-0.8.0? Thanks.
Re: Re: HiveContext setConf seems not stable
Hi all, my understanding of this problem is that SQLConf is overwritten by the Hive config during the initialization phase, when setConf(key: String, value: String) is called for the first time, as in the code snippets below; after that it behaves correctly. I'm not sure whether this is right; any pointers are welcome. Thanks.

@transient protected[hive] lazy val hiveconf: HiveConf = {
  setConf(sessionState.getConf.getAllProperties)
  sessionState.getConf
}

protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = synchronized {
  try {
    val cmd_trimmed: String = cmd.trim()
    val tokens: Array[String] = cmd_trimmed.split("\\s+")
    val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
    val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
    ...
}

protected[sql] def runSqlHive(sql: String): Seq[String] = {
  val maxResults = 10
  val results = runHive(sql, maxResults)
  // It is very confusing when you only get back some of the results...
  if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED")
  results
}

override def setConf(key: String, value: String): Unit = {
  super.setConf(key, value)
  runSqlHive(s"SET $key=$value")
}

From: madhu phatak
Date: 2015-04-23 02:17
To: Michael Armbrust
CC: Ophir Cohen; Hao Ren; user
Subject: Re: HiveContext setConf seems not stable
Hi, calling getConf doesn't solve the issue. Even many Hive-specific queries are broken. It seems like no Hive configurations are getting passed properly.
Regards, Madhukara Phatak
http://datamantra.io/

On Wed, Apr 22, 2015 at 2:19 AM, Michael Armbrust <mich...@databricks.com> wrote:
As a workaround, can you call getConf first before any setConf?

On Tue, Apr 21, 2015 at 1:58 AM, Ophir Cohen <oph...@gmail.com> wrote:
I think I encountered the same problem; I'm trying to turn on Hive compression. I have the following lines:

def initHiveContext(sc: SparkContext): HiveContext = {
  val hc: HiveContext = new HiveContext(sc)
  hc.setConf("hive.exec.compress.output", "true")
  hc.setConf("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
  hc.setConf("mapreduce.output.fileoutputformat.compress.type", "BLOCK")
  logger.info(hc.getConf("hive.exec.compress.output"))
  logger.info(hc.getConf("mapreduce.output.fileoutputformat.compress.codec"))
  logger.info(hc.getConf("mapreduce.output.fileoutputformat.compress.type"))
  hc
}

And the log for calling it twice:

15/04/21 08:37:39 INFO util.SchemaRDDUtils$: false
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: org.apache.hadoop.io.compress.SnappyCodec
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: true
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: org.apache.hadoop.io.compress.SnappyCodec
15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK

BTW it worked on 1.2.1...

On Thu, Apr 2, 2015 at 11:47 AM, Hao Ren <inv...@gmail.com> wrote:
Hi, Jira created: https://issues.apache.org/jira/browse/SPARK-6675
Thank you.

On Wed, Apr 1, 2015 at 7:50 PM, Michael Armbrust <mich...@databricks.com> wrote:
Can you open a JIRA please?

On Wed, Apr 1, 2015 at 9:38 AM, Hao Ren <inv...@gmail.com> wrote:
Hi, I find HiveContext.setConf does not work correctly.
Here are some code snippets showing the problem:

snippet 1:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Main extends App {
  val conf = new SparkConf()
    .setAppName("context-test")
    .setMaster("local[8]")
  val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)

  hc.setConf("spark.sql.shuffle.partitions", "10")
  hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")

  hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
  hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach println
}

Results:
(hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)
(spark.sql.shuffle.partitions,10)

snippet 2:

...
hc.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse_test")
hc.setConf("spark.sql.shuffle.partitions", "10")

hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach println
...

Results:
(hive.metastore.warehouse.dir,/user/hive/warehouse)
(spark.sql.shuffle.partitions,10)

You can see that I just permuted the two setConf calls, and that leads to two different Hive configurations. It seems that HiveContext can not set a new value on
Re: Re: Does Spark 1.3.1 support building with Scala 2.8?
Thank you very much for your suggestion.

Regards,

From: madhu phatak
Date: 2015-04-24 13:06
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: Does Spark 1.3.1 support building with Scala 2.8?
Hi, AFAIK it is only built with 2.10 and 2.11. You should integrate kafka_2.10.0-0.8.0 to make it work.
Regards, Madhukara Phatak
http://datamantra.io/

On Fri, Apr 24, 2015 at 9:22 AM, guoqing0...@yahoo.com.hk <guoqing0...@yahoo.com.hk> wrote:
Does Spark 1.3.1 support building with Scala 2.8? If it is built with Scala 2.10, can it be integrated with kafka_2.8.0-0.8.0? Thanks.
Re: Re: problem with spark thrift server
Thanks for your reply. I would like to use the Spark Thrift Server as a JDBC SQL interface, with the Spark application running on YARN, but when the Thrift Server crashed the application went to FINISHED and all the cached tables were lost.

Thrift Server start command:
start-thriftserver.sh --master yarn --executor-memory 20480m --executor-cores 2 --num-executors 20 --queue spark

My question is whether the Thrift Server has any other, more stable mode on YARN, such as an active/standby setup for the Thrift Server. I really appreciate any suggestions and ideas. Thanks.

From: Arush Kharbanda
Date: 2015-04-23 18:40
To: guoqing0...@yahoo.com.hk
CC: user
Subject: Re: problem with spark thrift server
Hi, what do you mean by "disable the driver"? What are you trying to achieve?
Thanks, Arush

On Thu, Apr 23, 2015 at 12:29 PM, guoqing0...@yahoo.com.hk <guoqing0...@yahoo.com.hk> wrote:
Hi, I have a question about the Spark Thrift Server. I deployed Spark on YARN and found that if the Spark driver goes down, the Spark application crashes on YARN. I appreciate any suggestions and ideas. Thank you!

--
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com