Null pointer exception when using com.databricks.spark.csv
Hi, I am using the Spark 1.6.0 build prebuilt for Hadoop 2.6.0 on my Windows machine. I was trying to use the Databricks CSV format to read a CSV file, with the command shown in the screenshot below. [image: Inline image 1] I got a NullPointerException (screenshot below). Any help would be greatly appreciated. [image: Inline image 2] -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
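Since the command itself was only attached as a screenshot, here is a minimal sketch of how such a read is usually issued with the spark-csv package (assuming the shell was started with --packages com.databricks:spark-csv_2.10:1.4.0 and a hypothetical file path):

  val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")       // treat the first line as a header
    .option("inferSchema", "true")  // let the package infer column types
    .load("C:/data/sample.csv")     // hypothetical path
  df.show()

As the reply below suggests, the NullPointerException is usually not specific to the CSV data source; on Windows it frequently comes from the Hadoop side of the setup (for example a missing winutils.exe / HADOOP_HOME) rather than from the read itself.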
Re: Null pointer exception when using com.databricks.spark.csv
Hi, I am able to load and extract the data; the problem only occurs when I use this Databricks library. Thanks, Selvam R

On Wed, Mar 30, 2016 at 9:33 AM, Hyukjin Kwon wrote: > Hi, > > I guess this is not a CSV-datasource specific problem. > > Does loading any file (eg. textFile()) work as well? > > I think this is related with this thread, > http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-example-scala-application-using-spark-submit-td10056.html > . > > > 2016-03-30 12:44 GMT+09:00 Selvam Raman : > >> Hi, >> >> i am using spark 1.6.0 prebuilt hadoop 2.6.0 version in my windows >> machine. >> >> i was trying to use databricks csv format to read csv file. i used the >> below command. >> >> [image: Inline image 1] >> >> I got null pointer exception. Any help would be greatly appreciated. >> >> [image: Inline image 2] >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> > > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
HiveContext in spark
I am not able to use the INSERT, UPDATE and DELETE commands through HiveContext. I am using Spark 1.6.1 and Hive 1.1.0. Please find the error below.

scala> hc.sql("delete from trans_detail where counter=1");
16/04/12 14:58:45 INFO ParseDriver: Parsing command: delete from trans_detail where counter=1
16/04/12 14:58:45 INFO ParseDriver: Parse Completed
16/04/12 14:58:45 INFO ParseDriver: Parsing command: delete from trans_detail where counter=1
16/04/12 14:58:45 INFO ParseDriver: Parse Completed
16/04/12 14:58:45 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:60409 in memory (size: 46.9 KB, free: 536.7 MB)
16/04/12 14:58:46 INFO ContextCleaner: Cleaned accumulator 3
16/04/12 14:58:46 INFO BlockManagerInfo: Removed broadcast_4_piece0 on localhost:60409 in memory (size: 3.6 KB, free: 536.7 MB)
org.apache.spark.sql.AnalysisException: Unsupported language features in query: delete from trans_detail where counter=1 TOK_DELETE_FROM 1, 0,11, 13 TOK_TABNAME 1, 5,5, 13 trans_detail 1, 5,5, 13 TOK_WHERE 1, 7,11, 39 = 1, 9,11, 39 TOK_TABLE_OR_COL 1, 9,9, 32 counter 1, 9,9, 32 1 1, 11,11, 40
scala.NotImplementedError: No parse rules for TOK_DELETE_FROM: TOK_DELETE_FROM 1, 0,11, 13 TOK_TABNAME 1, 5,5, 13 trans_detail 1, 5,5, 13 TOK_WHERE 1, 7,11, 39 = 1, 9,11, 39 TOK_TABLE_OR_COL 1, 9,9, 32 counter 1, 9,9, 32 1 1, 11,11, 40
org.apache.spark.sql.hive.HiveQl$.nodeToPlan(HiveQl.scala:1217)

-- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
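HiveQL DELETE/UPDATE are simply not implemented in the Spark 1.6 HiveContext parser, so a common workaround is to rewrite the table without the unwanted rows. A minimal sketch of that idea (assuming trans_detail is a Hive-backed table; this is not a true in-place delete):

  // keep only the rows that should survive the delete
  val remaining = hc.table("trans_detail").filter("counter <> 1")
  // write to a staging table first; a table cannot be overwritten while it is being read
  remaining.write.mode("overwrite").saveAsTable("trans_detail_cleaned")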
next on empty iterator though i used hasNext
I am reading data from a Kinesis stream (merging the shard values with a union stream) into Spark Streaming, and then running the following code to push the data to the DB.

splitCSV.foreachRDD(new VoidFunction2<JavaRDD<String[]>, Time>() {
  private static final long serialVersionUID = 1L;
  public void call(JavaRDD<String[]> rdd, Time time) throws Exception {
    JavaRDD<SFieldBean> varMapRDD = rdd.map(new Function<String[], SFieldBean>() {
      private static final long serialVersionUID = 1L;
      public SFieldBean call(String[] values) throws Exception {
        ...
      }
    });
    varMapRDD.foreachPartition(new VoidFunction<Iterator<SFieldBean>>() {
      private static final long serialVersionUID = 1L;
      MySQLConnectionHelper.getConnection("urlinfo");
      @Override
      public void call(Iterator<SFieldBean> iterValues) throws Exception {
        while(iterValues.hasNext()) {
          ...
        }
      }
    });
  }
});

Though I am using hasNext, it throws the following error:

Caused by: java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:319)
at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:288)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
... 3 more

-- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Release Announcement: XGBoost4J - Portable Distributed XGBoost in Spark, Flink and Dataflow
XGBoost4J could integrate with spark from 1.6 version. Currently I am using spark 1.5.2. Can I use XGBoost instead of XGBoost4j. Will both provides same result. Thanks, Selvam R +91-97877-87724 On Mar 15, 2016 9:23 PM, "Nan Zhu" wrote: > Dear Spark Users and Developers, > > We (Distributed (Deep) Machine Learning Community (http://dmlc.ml/)) are > happy to announce the release of XGBoost4J ( > http://dmlc.ml/2016/03/14/xgboost4j-portable-distributed-xgboost-in-spark-flink-and-dataflow.html), > a Portable Distributed XGBoost in Spark, Flink and Dataflow > > XGBoost is an optimized distributed gradient boosting library designed to > be highly *efficient*, *flexible* and *portable*.XGBoost provides a > parallel tree boosting (also known as GBDT, GBM) that solve many data > science problems in a fast and accurate way. It has been the winning > solution for many machine learning scenarios, ranging from Machine Learning > Challenges ( > https://github.com/dmlc/xgboost/tree/master/demo#machine-learning-challenge-winning-solutions) > to Industrial User Cases ( > https://github.com/dmlc/xgboost/tree/master/demo#usecases) > > *XGBoost4J* is a new package in XGBoost aiming to provide the clean > Scala/Java APIs and the seamless integration with the mainstream data > processing platform, like Apache Spark. With XGBoost4J, users can run > XGBoost as a stage of Spark job and build a unified pipeline from ETL to > Model training to data product service within Spark, instead of jumping > across two different systems, i.e. XGBoost and Spark. (Example: > https://github.com/dmlc/xgboost/blob/master/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/DistTrainWithSpark.scala > ) > > Today, we release the first version of XGBoost4J to bring more choices to > the Spark users who are seeking the solutions to build highly efficient > data analytic platform and enrich the Spark ecosystem. We will keep moving > forward to integrate with more features of Spark. Of course, you are more > than welcome to join us and contribute to the project! > > For more details of distributed XGBoost, you can refer to the > recently published paper: http://arxiv.org/abs/1603.02754 > > Best, > > -- > Nan Zhu > http://codingcat.me >
Windows Rstudio to Linux spakR
Hi, how can I connect to SparkR (which runs in a Linux environment) from RStudio on Windows? Please help me. -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Skew data
Hi, what is skewed data? I read that if the data is skewed during a join, the job takes a long time to finish (99 percent of the tasks finish in seconds while the remaining 1 percent take minutes to hours). How do I handle skewed data in Spark? Thanks, Selvam R +91-97877-87724
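A common way to handle a skewed join key is salting: spread the hot keys of the large side over several sub-keys and replicate the small side once per sub-key, so the heavy key is processed by several tasks instead of one. A rough sketch, assuming two hypothetical pair RDDs big and small keyed by the join key:

  import scala.util.Random

  val SALTS = 10
  // large side: append a random salt to every key
  val saltedBig = big.map { case (k, v) => ((k, Random.nextInt(SALTS)), v) }
  // small side: replicate each row once per salt value
  val saltedSmall = small.flatMap { case (k, v) => (0 until SALTS).map(s => ((k, s), v)) }
  // join on the salted key, then drop the salt again
  val joined = saltedBig.join(saltedSmall).map { case ((k, _), v) => (k, v) }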
Sqoop On Spark
Hi Team, how can I use Spark as the execution engine in Sqoop2? I see the patch (SQOOP-1532 <https://issues.apache.org/jira/browse/SQOOP-1532>) but it is still marked as in progress, so can we not use Sqoop on Spark yet? Please help me if you have any idea. -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
SparkSession for RDBMS
Hi All, I would like to read data from an RDBMS into Spark (2.0) using SparkSession. How can I decide the upper boundary, lower boundary and number of partitions? Is there any specific approach available? How does Sqoop2 decide the number of partitions and the upper and lower boundaries if we do not specify anything? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
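For the JDBC source Spark does not pick the bounds for you: you pass a numeric partitionColumn plus lowerBound, upperBound and numPartitions, and Spark turns the column range into that many WHERE-clause stripes (a quick SELECT MIN(col), MAX(col) against the source is the usual way to get the bounds). A minimal sketch with hypothetical connection and table names:

  val df = spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://dbhost:5432/mydb")  // hypothetical URL
    .option("dbtable", "public.orders")                   // hypothetical table
    .option("user", "dbuser")
    .option("password", "secret")
    .option("partitionColumn", "order_id")                // must be numeric
    .option("lowerBound", "1")
    .option("upperBound", "1000000")
    .option("numPartitions", "20")                        // 20 parallel JDBC reads
    .load()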
Get distinct column data from grouped data
Example:

sel1 test
sel1 test
sel1 ok
sel2 ok
sel2 test

Expected result:

sel1, [test,ok]
sel2, [test,ok]

How can I achieve the above result using a Spark DataFrame? Please suggest. -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Get distinct column data from grouped data
My friend suggested this approach:

val fil = sc.textFile("hdfs:///user/vijayc/data/test-spk.tx")
val res = fil.map(l => l.split(",")).map(l => (l(0), l(1))).groupByKey.map(rd => (rd._1, rd._2.toList.distinct))

Another useful function is *collect_set* on a DataFrame.

Thanks, Selvam R

On Tue, Aug 9, 2016 at 4:19 PM, Selvam Raman wrote: > Example: > > sel1 test > sel1 test > sel1 ok > sel2 ok > sel2 test > > > expected result: > > sel1, [test,ok] > sel2,[test,ok] > > How to achieve the above result using spark dataframe. > > please suggest me. > -- > Selvam Raman > "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
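For completeness, a minimal DataFrame version of the same idea, assuming a hypothetical DataFrame df with columns key and value and Spark 1.6+ where collect_set is available as a function:

  import org.apache.spark.sql.functions.collect_set

  val grouped = df.groupBy("key").agg(collect_set("value").as("values"))
  grouped.show()
  // sel1 -> [test, ok], sel2 -> [ok, test]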
Data frame Performance
Hi All, please suggest the best approach to achieve this result (and please comment on whether the existing logic is fine or not).

Input records:

ColA ColB ColC
1 2 56
1 2 46
1 3 45
1 5 34
1 5 90
2 1 89
2 5 45

Expected result:

ResA ResB
1 2:2|3:3|5:5
2 1:1|5:5

I followed the Spark steps below (Spark version 1.5.0):

def valsplit(elem :scala.collection.mutable.WrappedArray[String]) : String = {
  elem.map(e => e+":"+e).mkString("|")
}

sqlContext.udf.register("valudf",valsplit(_:scala.collection.mutable.WrappedArray[String]))

val x =sqlContext.sql("select site,valudf(collect_set(requests)) as test from sel group by site").first

-- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Data frame Performance
Hi Mich, The input and output are just for example and it s not exact column name. Colc not needed. The code which I shared is working fine but need to confirm, was it right approach and effect performance. Thanks, Selvam R +91-97877-87724 On Aug 16, 2016 5:18 PM, "Mich Talebzadeh" wrote: > Hi Selvan, > > is table called sel,? > > And are these assumptions correct? > > site -> ColA > requests -> ColB > > I don't think you are using ColC here? > > HTH > > > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > On 16 August 2016 at 12:06, Selvam Raman wrote: > >> Hi All, >> >> Please suggest me the best approach to achieve result. [ Please comment >> if the existing logic is fine or not] >> >> Input Record : >> >> ColA ColB ColC >> 1 2 56 >> 1 2 46 >> 1 3 45 >> 1 5 34 >> 1 5 90 >> 2 1 89 >> 2 5 45 >> >> Expected Result >> >> ResA ResB >> 12:2|3:3|5:5 >> 2 1:1|5:5 >> >> I followd the below Spark steps >> >> (Spark version - 1.5.0) >> >> def valsplit(elem :scala.collection.mutable.WrappedArray[String]) : >> String = >> { >> >> elem.map(e => e+":"+e).mkString("|") >> } >> >> sqlContext.udf.register("valudf",valsplit(_:scala.collection >> .mutable.WrappedArray[String])) >> >> >> val x =sqlContext.sql("select site,valudf(collect_set(requests)) as test >> from sel group by site").first >> >> >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> > >
Extract year from string format of date
Spark version: 1.5.0

Record: 01-Jan-16
Expected result: 2016

I used the code below, which was shared on the user group:

from_unixtime(unix_timestamp($"Creation Date","dd-MMM-yy"),"yyyy")

Is this the right approach, or is there another one?

NOTE: I tried the *year()* function but it gives only null values for the string; the same happens with *to_date()*. -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
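year() returns null here because it expects a date/timestamp, not the raw "dd-MMM-yy" string, so parsing first and extracting the year afterwards is the usual pattern. A small sketch of that variant, assuming a hypothetical DataFrame df with the "Creation Date" column:

  import org.apache.spark.sql.functions.{col, unix_timestamp, from_unixtime, year}

  val withYear = df.withColumn("creation_year",
    year(from_unixtime(unix_timestamp(col("Creation Date"), "dd-MMM-yy")).cast("date")))
  // "01-Jan-16" -> 2016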
Re: Windows operation orderBy desc
Hi all, I am using a window function with row_number to find the latest record; the Hive table is partitioned. When I run it, the stage runs 545 tasks. What is the reason for 545 tasks?

Thanks, Selvam R

On Mon, Aug 1, 2016 at 8:09 PM, Mich Talebzadeh wrote: > You need to get the position right > > > val wSpec = Window.partitionBy("col1").orderBy(desc("col2")) > > HTH > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > On 1 August 2016 at 14:56, Ashok Kumar > wrote: > >> Hi, >> >> in the following Window spec I want orderBy ("") to be displayed >> in descending order please >> >> val W = Window.partitionBy("col1").orderBy("col2") >> >> If I Do >> >> val W = Window.partitionBy("col1").orderBy("col2".desc) >> >> It throws error >> >> console>:26: error: value desc is not a member of String >> >> How can I achieve that? >> >> Thanking you >> > > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Insert non-null values from dataframe
Hi,

Dataframe:

colA colB colC colD colE
1 2 3 4 5
1 2 3 null null
1 null null null 5
null null 3 4 5

I want to insert this dataframe into a NoSQL database (Cassandra), where nulls would otherwise occupy the values, so I have to insert only the columns that have non-null values in each row.

Expected:

Record 1: (1,2,3,4,5)
Record 2: (1,2,3)
Record 3: (1,5)
Record 4: (3,4,5)

-- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Insert non-null values from dataframe
Thanks for the update. we are using 2.0 version. so planning to write own custom logic to remove the null values. Thanks, selvam R On Fri, Aug 26, 2016 at 9:08 PM, Russell Spitzer wrote: > Cassandra does not differentiate between null and empty, so when reading > from C* all empty values are reported as null. To avoid inserting nulls > (avoiding tombstones) see > > https://github.com/datastax/spark-cassandra-connector/ > blob/master/doc/5_saving.md#globally-treating-all-nulls-as-unset > > This will not prevent those columns from being read as null though, it > will only skip writing tombstones. > > On Thu, Aug 25, 2016, 1:23 PM Selvam Raman wrote: > >> Hi , >> >> Dataframe: >> colA colB colC colD colE >> 1 2 3 4 5 >> 1 2 3 null null >> 1 null null null 5 >> null null 3 4 5 >> >> I want to insert dataframe to nosql database, where null occupies >> values(Cassandra). so i have to insert the column which has non-null values >> in the row. >> >> Expected: >> >> Record 1: (1,2,3,4,5) >> Record 2:(1,2,3) >> Record 3:(1,5) >> Record 4:(3,4,5) >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Need a help in row repetation
I have my dataset as a DataFrame, using Spark 1.5.0.

cola,colb,colc,cold,cole,colf,colg,colh,coli -> columns in the row

Among these, the date columns are colc, colf, colh and coli.

Scenario 1: (colc = 2016, colf = 2016, colh = 2016, coli = 2016). If all the years are the same, no logic is needed; the record just stays as it is.

Scenario 2: (colc = 2016, colf = 2017, colh = 2016, coli = 2018) -> the unique years are 2016, 2017, 2018. If the years in the date fields differ, we need to repeat the record once per distinct year (i.e. the above row has three distinct years, so the same row is repeated twice more).

Please give me any suggestion in terms of the DataFrame API. -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
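One way to sketch this with the DataFrame API (Spark 1.5): collect the distinct years of the four date columns into an array with a UDF and then explode it, so a row with one distinct year stays single and a row with three distinct years comes out three times. The column names are the hypothetical ones from the question, and the date columns are assumed to already be date-typed and non-null:

  import org.apache.spark.sql.functions.{col, explode, udf, year}

  // gather the distinct years of the four date columns into one array per row
  val distinctYears = udf((c: Int, f: Int, h: Int, i: Int) => Seq(c, f, h, i).distinct)

  val repeated = df.withColumn("yr",
    explode(distinctYears(year(col("colc")), year(col("colf")),
                          year(col("colh")), year(col("coli")))))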
spark cassandra issue
Please help me to solve the issue.

spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.3.0 --conf spark.cassandra.connection.host=**

val df = sqlContext.read.
  | format("org.apache.spark.sql.cassandra").
  | options(Map( "table" -> "", "keyspace" -> "***")).
  | load()
java.util.NoSuchElementException: key not found: c_table
at scala.collection.MapLike$class.default(MapLike.scala:228)
at org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.default(ddl.scala:151)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.apply(ddl.scala:151)
at org.apache.spark.sql.cassandra.DefaultSource$.TableRefAndOptions(DefaultSource.scala:120)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)

-- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: spark cassandra issue
its very urgent. please help me guys. On Sun, Sep 4, 2016 at 8:05 PM, Selvam Raman wrote: > Please help me to solve the issue. > > spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.3.0 > --conf spark.cassandra.connection.host=** > > val df = sqlContext.read. > | format("org.apache.spark.sql.cassandra"). > | options(Map( "table" -> "", "keyspace" -> "***")). > | load() > java.util.NoSuchElementException: key not found: c_table > at scala.collection.MapLike$class.default(MapLike.scala:228) > at org.apache.spark.sql.execution.datasources. > CaseInsensitiveMap.default(ddl.scala:151) > at scala.collection.MapLike$class.apply(MapLike.scala:141) > at org.apache.spark.sql.execution.datasources. > CaseInsensitiveMap.apply(ddl.scala:151) > at org.apache.spark.sql.cassandra.DefaultSource$. > TableRefAndOptions(DefaultSource.scala:120) > at org.apache.spark.sql.cassandra.DefaultSource. > createRelation(DefaultSource.scala:56) > at org.apache.spark.sql.execution.datasources. > ResolvedDataSource$.apply(ResolvedDataSource.scala:125) > a > > -- > Selvam Raman > "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: spark cassandra issue
Hey Mich, I am using the same one right now. Thanks for the reply. import org.apache.spark.sql.cassandra._ import com.datastax.spark.connector._ //Loads implicit functions sc.cassandraTable("keyspace name", "table name") On Sun, Sep 4, 2016 at 8:48 PM, Mich Talebzadeh wrote: > Hi Selvan. > > I don't deal with Cassandra but have you tried other options as described > here > > https://github.com/datastax/spark-cassandra-connector/ > blob/master/doc/2_loading.md > > To get a Spark RDD that represents a Cassandra table, call the > cassandraTable method on the SparkContext object. > > import com.datastax.spark.connector._ //Loads implicit functions > sc.cassandraTable("keyspace name", "table name") > > > > HTH > > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > On 4 September 2016 at 15:52, Selvam Raman wrote: > >> its very urgent. please help me guys. >> >> On Sun, Sep 4, 2016 at 8:05 PM, Selvam Raman wrote: >> >>> Please help me to solve the issue. >>> >>> spark-shell --packages >>> com.datastax.spark:spark-cassandra-connector_2.10:1.3.0 >>> --conf spark.cassandra.connection.host=** >>> >>> val df = sqlContext.read. >>> | format("org.apache.spark.sql.cassandra"). >>> | options(Map( "table" -> "", "keyspace" -> "***")). >>> | load() >>> java.util.NoSuchElementException: key not found: c_table >>> at scala.collection.MapLike$class.default(MapLike.scala:228) >>> at org.apache.spark.sql.execution.datasources.CaseInsensitiveMa >>> p.default(ddl.scala:151) >>> at scala.collection.MapLike$class.apply(MapLike.scala:141) >>> at org.apache.spark.sql.execution.datasources.CaseInsensitiveMa >>> p.apply(ddl.scala:151) >>> at org.apache.spark.sql.cassandra.DefaultSource$.TableRefAndOpt >>> ions(DefaultSource.scala:120) >>> at org.apache.spark.sql.cassandra.DefaultSource.createRelation( >>> DefaultSource.scala:56) >>> at org.apache.spark.sql.execution.datasources.ResolvedDataSourc >>> e$.apply(ResolvedDataSource.scala:125) >>> a >>> >>> -- >>> Selvam Raman >>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >>> >> >> >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> > > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: spark cassandra issue
at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at org.apache.spark.sql.cassandra.DataTypeConverter$.< init>(DataTypeConverter.scala:32) at org.apache.spark.sql.cassandra.DataTypeConverter$.< clinit>(DataTypeConverter.scala) at org.apache.spark.sql.cassandra.CassandraSourceRelation$$ anonfun$schema$1$$anonfun$apply$1.apply(CassandraSourceRelation.scala:58) at org.apache.spark.sql.cassandra.CassandraSourceRelation$$ anonfun$schema$1$$anonfun$apply$1.apply(CassandraSourceRelation.scala:58) at scala.collection.TraversableLike$$anonfun$map$ 1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$ 1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike. scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map( TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.cassandra.CassandraSourceRelation$$ anonfun$schema$1.apply(CassandraSourceRelation.scala:58) at org.apache.spark.sql.cassandra.CassandraSourceRelation$$ anonfun$schema$1.apply(CassandraSourceRelation.scala:58) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.cassandra.CassandraSourceRelation.schema( CassandraSourceRelation.scala:58) at org.apache.spark.sql.execution.datasources. LogicalRelation.(LogicalRelation.scala:37) at org.apache.spark.sql.DataFrameReader.load( DataFrameReader.scala:120) at com.zebra.avp.oracle11i.OracleRepairData$.main( OracleRepairData.scala:298) at com.zebra.avp.oracle11i.OracleRepairData.main( OracleRepairData.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke( NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke( DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$ deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1( SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit$.submit( SparkSubmit.scala:205) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.types. PrimitiveType at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) On Sun, Sep 4, 2016 at 10:04 PM, Russell Spitzer wrote: > This would also be a better question for the SCC user list :) > https://groups.google.com/a/lists.datastax.com/forum/#! 
> forum/spark-connector-user > > On Sun, Sep 4, 2016 at 9:31 AM Russell Spitzer > wrote: > >> https://github.com/datastax/spark-cassandra-connector/ >> blob/v1.3.1/doc/14_data_frames.md >> In Spark 1.3 it was illegal to use "table" as a key in Spark SQL so in >> that version of Spark the connector needed to use the option "c_table" >> >> >> val df = sqlContext.read. >> | format("org.apache.spark.sql.cassandra"). >> | options(Map( "c_table" -> "", "keyspace" -> "***")). >> | load() >> >> >> On Sun, Sep 4, 2016 at 8:32 AM Mich Talebzadeh >> wrote: >> >>> and your Cassandra table is there etc? >>> >>> >>> >>> Dr Mich Talebzadeh >>> >>> >>> >>> LinkedIn * >>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>> >>> >>> >>> http://talebzadehmich.wordpress.com >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >&g
Cassandra timestamp to spark Date field
Hi All,

As per the DataStax documentation, the Cassandra timestamp type maps to Spark as Long, java.util.Date, java.sql.Date or org.joda.time.DateTime. Please help me with your input.

I have a Cassandra table with 30 fields; 3 of them are timestamps. I read the table using sc.cassandraTable [com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[9] at RDD at CassandraRDD.scala:15], then converted it to an RDD of Row:

*val* exis_repair_fact = sqlContext.createDataFrame(rddrepfact.map(r => org.apache.spark.sql.Row.fromSeq(r.columnValues)),schema)

In the schema I declared the timestamp field as *StructField*("shipped_datetime", *DateType*). When I try to show the result, it throws "java.util.Date can not convert to java.sql.Date". So how can I solve this issue? First I have converted cassandrascanrdd to -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
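Since the connector hands back java.util.Date while Spark's DateType expects java.sql.Date (and TimestampType expects java.sql.Timestamp), converting inside the row mapping is one way around it. A rough sketch reusing the names from the message, assuming shipped_datetime is declared as TimestampType (use java.sql.Date instead if DateType is kept):

  import java.sql.Timestamp

  val rowRdd = rddrepfact.map { r =>
    val converted = r.columnValues.map {
      case d: java.util.Date => new Timestamp(d.getTime)
      case other             => other
    }
    org.apache.spark.sql.Row.fromSeq(converted)
  }
  val exis_repair_fact = sqlContext.createDataFrame(rowRdd, schema)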
Spark Checkpoint for JDBC/ODBC
Hi, I need your input to take a decision. We have n databases (i.e. Oracle, MySQL, etc.). I want to read data from these sources, but how is fault tolerance maintained on the source side? If the source system goes down, how does Spark read the data? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Spark CSV skip lines
Hi, I am using spark-csv to read a CSV file. The issue is that the first n lines of my files contain some report text, followed by the actual data (header plus the rest of the rows). So how can I skip the first n lines in spark-csv? I do not have any specific comment character in the first byte. Please give me some ideas. -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Spark CSV skip lines
Hi, I had already seen these two options, but thanks anyway for the idea. I am using wholeTextFiles to read my data (because there are \n characters in the middle of it) and OpenCSV to parse it. In my data the first two lines are just a report. How can I eliminate them?

*How to eliminate the first two lines after reading from wholeTextFiles:*

val test = wholeTextFiles.flatMap{ case (_, txt) =>
  | val reader = new CSVReader(new StringReader(txt));
  | reader.readAll().map(data => Row(data(3),data(4),data(7),data(9),data(14)))}

The above code throws an ArrayIndexOutOfBounds exception for the empty line and the report lines.

On Sat, Sep 10, 2016 at 3:02 PM, Hyukjin Kwon wrote: > Hi Selvam, > > If your report is commented with any character (e.g. #), you can skip > these lines via comment option [1]. > > If you are using Spark 1.x, then you might be able to do this by manually > skipping from the RDD and then making this to DataFrame as below: > > I haven’t tested this but I think this should work. > > val rdd = sparkContext.textFile("...") > val filteredRdd = rdd.mapPartitionsWithIndex { (idx, iter) => > if (idx == 0) { > iter.drop(10) > } else { > iter > } > } > val df = new CsvParser().csvRdd(sqlContext, filteredRdd) > > If you are using Spark 2.0, then it seems there is no way to manually > modifying the source data because loading existing RDD or DataSet[String] > to DataFrame is not yet supported. > > There is an issue open[2]. I hope this is helpful. > > Thanks. > > [1] https://github.com/apache/spark/blob/27209252f09ff73c58e60c6df8aaba > 73b308088c/sql/core/src/main/scala/org/apache/spark/sql/ > DataFrameReader.scala#L369 > [2] https://issues.apache.org/jira/browse/SPARK-15463 > > > > > > On 10 Sep 2016 6:14 p.m., "Selvam Raman" wrote: > >> Hi, >> >> I am using spark csv to read csv file. The issue is my files first n >> lines contains some report and followed by actual data (header and rest of >> the data). >> >> So how can i skip first n lines in spark csv. I dont have any specific >> comment character in the first byte. >> >> Please give me some idea. >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
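One way to drop the two report lines is to strip them from each file's text before handing it to OpenCSV, and to guard against short or empty rows at the same time; a rough sketch along the lines of the snippet above (adjust the CSVReader import to the OpenCSV version in use):

  import java.io.StringReader
  import au.com.bytecode.opencsv.CSVReader
  import org.apache.spark.sql.Row
  import scala.collection.JavaConversions._

  val test = wholeTextFiles.flatMap { case (_, txt) =>
    val body = txt.split("\n", -1).drop(2).mkString("\n")   // drop the two report lines
    val reader = new CSVReader(new StringReader(body))
    reader.readAll()
      .filter(_.length > 14)                                // skip empty/short rows
      .map(data => Row(data(3), data(4), data(7), data(9), data(14)))
  }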
Spark S3
Hi, how does Spark read data from S3 and run parallel tasks? Assume I have an S3 bucket with 35 GB of Parquet files. How will the SparkSession read and process the data in parallel? How does it split the S3 data and assign it to each executor task? Please share your thoughts. Note: for an RDD we can look at partitions.size or length to check how many partitions a file produced, but how is this determined for an S3 bucket? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Spark S3
I mentioned parquet as input format. On Oct 10, 2016 11:06 PM, "ayan guha" wrote: > It really depends on the input format used. > On 11 Oct 2016 08:46, "Selvam Raman" wrote: > >> Hi, >> >> How spark reads data from s3 and runs parallel task. >> >> Assume I have a s3 bucket size of 35 GB( parquet file). >> >> How the sparksession will read the data and process the data parallel. >> How it splits the s3 data and assign to each executor task. >> >> Please share me your points. >> >> Note: >> if we have RDD , then we can look at the partitions.size or length to >> check how many partition for a file. But how this will be accomplished in >> terms of S3 bucket. >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> >
Spark-Sql 2.0 nullpointerException
ssPetDB.main(ProcessPetDB.java:46) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) at com.elsevier.datasearch.ExecuteSQL.executeQuery(ExecuteSQL.java:11) at com.elsevier.datasearch.ProcessPetDB$1.call(ProcessPetDB.java:53) at com.elsevier.datasearch.ProcessPetDB$1.call(ProcessPetDB.java:1) at org.apache.spark.sql.Dataset$$anonfun$foreach$2.apply(Dataset.scala:2118) at org.apache.spark.sql.Dataset$$anonfun$foreach$2.apply(Dataset.scala:2118) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/10/12 15:59:53 INFO SparkContext: Invoking stop() from shutdown hook Please let me know if i am missing anything. Thank you for the help. -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Spark-Sql 2.0 nullpointerException
What i am trying to achieve is Trigger query to get number(i.e.,1,2,3,...n) for every number i have to trigger another 3 queries. Thanks, selvam R On Wed, Oct 12, 2016 at 4:10 PM, Selvam Raman wrote: > Hi , > > I am reading parquet file and creating temp table. when i am trying to > execute query outside of foreach function it is working fine. > throws nullpointerexception within data frame.foreach function. > > code snippet: > > String CITATION_QUERY = "select c.citation_num, c.title, c.publisher from > test c"; > > Dataset citation_query = spark.sql(CITATION_QUERY); > > System.out.println("mistery:"+citation_query.count()); > > > // Iterator iterofresulDF = resultDF.toLocalIterator(); > > > resultDF.foreach(new ForeachFunction() > > { > > private static final long serialVersionUID = 1L; > > public void call(Row line) > > { > > Dataset row = spark.sql(CITATION_QUERY); > > System.out.println("mistery row count:"+row.count()); > > } > > }); > > > Error trace: > > 16/10/12 15:59:53 INFO CodecPool: Got brand-new decompressor [.snappy] > > 16/10/12 15:59:53 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID > 5) > > java.lang.NullPointerException > > at org.apache.spark.sql.SparkSession.sessionState$ > lzycompute(SparkSession.scala:112) > > at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110) > > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) > > at com.elsevier.datasearch.ExecuteSQL.executeQuery(ExecuteSQL.java:11) > > at com.elsevier.datasearch.ProcessPetDB$1.call(ProcessPetDB.java:53) > > at com.elsevier.datasearch.ProcessPetDB$1.call(ProcessPetDB.java:1) > > at org.apache.spark.sql.Dataset$$anonfun$foreach$2.apply( > Dataset.scala:2118) > > at org.apache.spark.sql.Dataset$$anonfun$foreach$2.apply( > Dataset.scala:2118) > > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > > at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$ > apply$27.apply(RDD.scala:894) > > at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$ > apply$27.apply(RDD.scala:894) > > at org.apache.spark.SparkContext$$anonfun$runJob$5.apply( > SparkContext.scala:1916) > > at org.apache.spark.SparkContext$$anonfun$runJob$5.apply( > SparkContext.scala:1916) > > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > > at org.apache.spark.scheduler.Task.run(Task.scala:86) > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > > at java.lang.Thread.run(Thread.java:745) > > > > > Driver stacktrace: > > at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$ > scheduler$DAGScheduler$$failJobAndIndependentStages( > DAGScheduler.scala:1454) > > at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( > DAGScheduler.scala:1442) > > at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( > DAGScheduler.scala:1441) > > at scala.collection.mutable.ResizableArray$class.foreach( > ResizableArray.scala:59) > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > > at org.apache.spark.scheduler.DAGScheduler.abortStage( > DAGScheduler.scala:1441) > > at org.apache.spark.scheduler.DAGScheduler$$anonfun$ > handleTaskSetFailed$1.apply(DAGScheduler.scala:811) > > at org.apache.spark.scheduler.DAGScheduler$$anonfun$ > 
handleTaskSetFailed$1.apply(DAGScheduler.scala:811) > > at scala.Option.foreach(Option.scala:257) > > at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed( > DAGScheduler.scala:811) > > at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop. > doOnReceive(DAGScheduler.scala:1667) > > at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop. > onReceive(DAGScheduler.scala:1622) > > at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop. > onReceive(DAGScheduler.scala:1611) > > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) > > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890) > > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903) > > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916) > > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930) > > at org.apache.spark.rdd.RDD$$anonfun$fore
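For reference, the root cause of this NullPointerException is that SparkSession (and therefore spark.sql) is only usable on the driver: inside foreach/foreachPartition the closure runs on executors, where the session's internal state is null. One driver-side sketch of the same "for every key, run further queries" idea, assuming the driving result is small and has an integer key column:

  // bring the (small) key set back to the driver and issue the follow-up queries there
  resultDF.select("citation_num").collect().foreach { row =>
    val n = row.getInt(0)
    val detail = spark.sql(s"select c.title, c.publisher from test c where c.citation_num = $n")
    println(s"citation $n -> ${detail.count()} rows")
  }

If the per-key queries are themselves joins or aggregations, expressing the whole thing as a join against the driving table usually scales better than many small jobs.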
PostgresSql queries vs spark sql
Hi, please share some ideas if you have worked on this before. How can I implement the Postgres CROSSTAB function in Spark?

Postgres examples:

Example 1:

SELECT mthreport.* FROM *crosstab*('SELECT i.item_name::text As row_name, to_char(if.action_date, ''mon'')::text As bucket, SUM(if.num_used)::integer As bucketvalue FROM inventory As i INNER JOIN inventory_flow As if ON i.item_id = if.item_id AND action_date BETWEEN date ''2007-01-01'' and date ''2007-12-31 23:59'' GROUP BY i.item_name, to_char(if.action_date, ''mon''), date_part(''month'', if.action_date) ORDER BY i.item_name', 'SELECT to_char(date ''2007-01-01'' + (n || '' month'')::interval, ''mon'') As short_mname FROM generate_series(0,11) n') As mthreport(item_name text, jan integer, feb integer, mar integer, apr integer, may integer, jun integer, jul integer, aug integer, sep integer, oct integer, nov integer, dec integer)

The output of the above crosstab looks as follows: [image: crosstab source_sql cat_sql example]

Example 2:

CREATE TABLE ct(id SERIAL, rowid TEXT, attribute TEXT, value TEXT);
INSERT INTO ct(rowid, attribute, value) VALUES('test1','att1','val1');
INSERT INTO ct(rowid, attribute, value) VALUES('test1','att2','val2');
INSERT INTO ct(rowid, attribute, value) VALUES('test1','att3','val3');
INSERT INTO ct(rowid, attribute, value) VALUES('test1','att4','val4');
INSERT INTO ct(rowid, attribute, value) VALUES('test2','att1','val5');
INSERT INTO ct(rowid, attribute, value) VALUES('test2','att2','val6');
INSERT INTO ct(rowid, attribute, value) VALUES('test2','att3','val7');
INSERT INTO ct(rowid, attribute, value) VALUES('test2','att4','val8');

SELECT *
FROM crosstab(
  'select rowid, attribute, value
   from ct
   where attribute = ''att2'' or attribute = ''att3''
   order by 1,2')
AS ct(row_name text, category_1 text, category_2 text, category_3 text);

 row_name | category_1 | category_2 | category_3
----------+------------+------------+------------
 test1    | val2       | val3       |
 test2    | val6       | val7       |

-- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Spark SQL parallelize
Hi, I have 40+ structured datasets stored in an S3 bucket as Parquet files, and I am going to use 20 of those tables in this use case. There is a main table that drives the whole flow; it contains about 1k records. For every record in the main table I have to process the rest of the tables (joins and group-bys that depend on main table fields). How can I parallelize this processing? What I have done so far is read the main table, create a toLocalIterator for the DataFrame and then do the rest of the processing, but this runs one record at a time. Please share your ideas. Thank you.
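Since the main table has only about 1k rows, one option is to collect its keys to the driver and fire the per-key work concurrently from there, for example with a Scala parallel collection, so several Spark jobs run at the same time instead of one after another through toLocalIterator. A rough sketch with hypothetical table, column and output names:

  val keys = spark.table("main_table").select("main_id").collect().map(_.getLong(0))
  keys.par.foreach { id =>
    // the per-key join/group-by, submitted from a driver-side thread
    val result = spark.sql(s"select d.* from detail_table d where d.main_id = $id")
    result.write.mode("overwrite").parquet(s"s3://bucket/out/main_id=$id")
  }

If the per-record work is only joins and aggregations, reshaping it as a single join against the main table usually scales better than 1k separate jobs.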
Re: PostgresSql queries vs spark sql
I found it. We can use pivot which is similar to cross tab In postgres. Thank you. On Oct 17, 2016 10:00 PM, "Selvam Raman" wrote: > Hi, > > Please share me some idea if you work on this earlier. > How can i develop postgres CROSSTAB function in spark. > > Postgres Example > > Example 1: > > SELECT mthreport.* > FROM > *crosstab*('SELECT i.item_name::text As row_name, > to_char(if.action_date, ''mon'')::text As bucket, > SUM(if.num_used)::integer As bucketvalue > FROM inventory As i INNER JOIN inventory_flow As if > ON i.item_id = if.item_id > AND action_date BETWEEN date ''2007-01-01'' and date ''2007-12-31 > 23:59'' > GROUP BY i.item_name, to_char(if.action_date, ''mon''), > date_part(''month'', if.action_date) > ORDER BY i.item_name', > 'SELECT to_char(date ''2007-01-01'' + (n || '' month'')::interval, > ''mon'') As short_mname > FROM generate_series(0,11) n') > As mthreport(item_name text, jan integer, feb integer, mar > integer, > apr integer, may integer, jun integer, jul integer, > aug integer, sep integer, oct integer, nov integer, > dec integer) > > The output of the above crosstab looks as follows: > [image: crosstab source_sql cat_sql example] > > Example 2: > > CREATE TABLE ct(id SERIAL, rowid TEXT, attribute TEXT, value TEXT); > INSERT INTO ct(rowid, attribute, value) VALUES('test1','att1','val1'); > INSERT INTO ct(rowid, attribute, value) VALUES('test1','att2','val2'); > INSERT INTO ct(rowid, attribute, value) VALUES('test1','att3','val3'); > INSERT INTO ct(rowid, attribute, value) VALUES('test1','att4','val4'); > INSERT INTO ct(rowid, attribute, value) VALUES('test2','att1','val5'); > INSERT INTO ct(rowid, attribute, value) VALUES('test2','att2','val6'); > INSERT INTO ct(rowid, attribute, value) VALUES('test2','att3','val7'); > INSERT INTO ct(rowid, attribute, value) VALUES('test2','att4','val8'); > > SELECT * > FROM crosstab( > 'select rowid, attribute, value >from ct >where attribute = ''att2'' or attribute = ''att3'' >order by 1,2') > AS ct(row_name text, category_1 text, category_2 text, category_3 text); > > row_name | category_1 | category_2 | category_3 > --+++ > test1| val2 | val3 | > test2| val6 | val7 | > > > -- > Selvam Raman > "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >
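For reference, a small sketch of the pivot equivalent of the second crosstab example above, assuming a DataFrame ct with columns rowid, attribute and value (pivot is available on groupBy from Spark 1.6; first is used as the aggregate because each cell holds a single value):

  import org.apache.spark.sql.functions.first

  val pivoted = ct
    .filter("attribute in ('att2','att3')")
    .groupBy("rowid")
    .pivot("attribute", Seq("att2", "att3"))
    .agg(first("value"))
  pivoted.show()
  // rowid | att2 | att3
  // test1 | val2 | val3
  // test2 | val6 | val7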
Spark Sql 2.0 throws null pointer exception
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:883) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:881) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:881) at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:218) at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45) at com.elsevier.datasearch.CitationTest.main(CitationTest.java:108) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.NullPointerException at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112) at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124) at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218) at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Spark Sql 2.0 throws null pointer exception
Why i could not able to access sparksession instance within foreachpartition(i have created sparksession instance within main fucntion). spark.sql("select 1").count or any sql queries which return within foreachpartition throws nullpointer exception. please give me some idea if you have faced the problem earlier. Thanks, Selvam R On Mon, Oct 24, 2016 at 10:23 AM, Selvam Raman wrote: > Hi All, > > Please help me. > > I have 10 (tables data) parquet file in s3. > > I am reading and storing as Dataset then registered as temp table. > > One table driving whole flow so i am doing below.(When i am triggering > query from > > > Code Base: > > SparkSession spark = SparkSession.builder().appName("Test").getOrCreate(); > > Dataset citationDF = spark.read().parquet("s3://...") > > ... > > ... > > citationDF.createOrReplaceTempView("citation"); > > ... > > > > cit_num.javaRDD().foreachPartition(new VoidFunction>() > > { > > /** > > * > > */ > > private static final long serialVersionUID = 1L; > > > @Override > > public void call(Iterator iter) > > { > > while (iter.hasNext()) > > { > > Row record=iter.next(); > > int citation_num=record.getInt(0); > > String ci_query="select queries ";//(i can execute this > query outside of foreach) > > System.out.println("citation num:"+citation_num+" count:"+spark > .sql(ci_query).count()); > > accum.add(1); > > System.out.println("accumulator count:"+accum); > > } > > } > > }); > Error: > > 16/10/24 09:08:12 WARN TaskSetManager: Lost task 1.0 in stage 30.0 (TID > 83, ip-10-95-36-172.dev): java.lang.NullPointerException > > at org.apache.spark.sql.SparkSession.sessionState$ > lzycompute(SparkSession.scala:112) > > at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110) > > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) > > at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124) > > at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1) > > at org.apache.spark.api.java.JavaRDDLike$$anonfun$ > foreachPartition$1.apply(JavaRDDLike.scala:218) > > at org.apache.spark.api.java.JavaRDDLike$$anonfun$ > foreachPartition$1.apply(JavaRDDLike.scala:218) > > at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$ > anonfun$apply$28.apply(RDD.scala:883) > > at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$ > anonfun$apply$28.apply(RDD.scala:883) > > at org.apache.spark.SparkContext$$anonfun$runJob$5.apply( > SparkContext.scala:1897) > > at org.apache.spark.SparkContext$$anonfun$runJob$5.apply( > SparkContext.scala:1897) > > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > > at org.apache.spark.scheduler.Task.run(Task.scala:85) > > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > > > 16/10/24 09:08:12 INFO YarnScheduler: Stage 30 was cancelled > > 16/10/24 09:08:12 INFO DAGScheduler: ResultStage 30 (foreachPartition at > CitationTest.java:108) failed in 0.421 s > > 16/10/24 09:08:12 INFO DAGScheduler: Job 23 failed: foreachPartition at > CitationTest.java:108, took 0.578050 s > > Exception in thread "main" org.apache.spark.SparkException: Job aborted > due to stage failure: Task 6 in stage 30.0 failed 4 times, most recent > failure: Lost task 6.3 in stage 30.0 (TID 99, ip-dev): > java.lang.NullPointerException > > at 
org.apache.spark.sql.SparkSession.sessionState$ > lzycompute(SparkSession.scala:112) > > at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110) > > at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) > > at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124) > > at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1) > > at org.apache.spark.api.java.JavaRDDLike$$anonfun$ > foreachPartition$1.apply(JavaRDDLike.scala:218) > > at org.apache.spark.api.java.JavaRDDLike$$anonfun$ > foreachPartition$1.apply(JavaRDDLike.scala:218) > > at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$ > anonfun$apply$28.apply(RDD.scala:883) > > at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$ > anonfun$apply$
Spark Sql - "broadcast-exchange-1" java.lang.OutOfMemoryError: Java heap space
Hi, I need help to figure out and solve a heap space problem.

I have a query which joins 15+ tables, and when I try to print the result (just 23 rows) it throws a heap space error.

I tried the following in standalone mode (my Mac has 8 cores and 15 GB RAM):

spark.conf().set("spark.sql.shuffle.partitions", 20);

./spark-submit --master spark://selva:7077 --executor-memory 2g --total-executor-cores 4 --class MemIssue --conf 'spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark' /Users/rs/Desktop/test.jar

This is my query (built as a Java string):

select concat(sf1.scode, ''-'', m.mcode, ''-'', rf.rnum) , sf1.scode , concat(p.lname,'', '',ci.pyear), at.atext Alias, m.mcode Method, mt.mcode, v.vname, nd.vmeas " +
" from result r " +
" join var v on v.vnum = r.vnum " +
" join numa nd on nd.rnum = r.num " +
" join feat fa on fa.fnum = r.fnum " +
" join samp sf1 on sf1.snum = fa.snum " +
" join spe sp on sf1.snum = sp.snum and sp.mnum not in (1,2)" +
" join act a on a.anum = fa.anum " +
" join met m on m.mnum = a.mnum " +
" join sampl sfa on sfa.snum = sf1.snum " +
" join ann at on at.anum = sfa.anum AND at.atypenum = 11 " +
" join data dr on r.rnum = dr.rnum " +
" join cit cd on dr.dnum = cd.dnum " +
" join cit on cd.cnum = ci.cnum " +
" join aut al on ci.cnum = al.cnum and al.aorder = 1 " +
" join per p on al.pnum = p.pnum " +
" left join rel rf on sf1.snum = rf.snum " +
" left join samp sf2 on rf.rnum = sf2.snum " +
" left join spe s on s.snum = sf1.snum " +
" left join mat mt on mt.mnum = s.mnum " +
" where sf1.sampling_feature_code = '1234test''" +
" order by 1,2

spark.sql(query).show

When I checked the whole-stage codegen plan, it first reads all the data from the tables. Why is it reading all the data and doing sort-merge joins for 3 or 4 of the tables? Why is it not applying any filter? Even though I have given the executors a large amount of memory, it still throws the same error.

When Spark SQL does the joins, how does it use memory and cores? Any guidelines would be greatly welcome. -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
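The "broadcast-exchange" threads build the hash tables for broadcast joins on the driver, so with 15+ joined tables several of them can be materialised there at once. Two knobs that are commonly tried are giving the driver more memory and lowering (or disabling) the automatic broadcast threshold so the planner falls back to sort-merge joins; the values below are only examples:

  // disable automatic broadcast joins, or set a smaller byte threshold
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

  // and/or give the driver more room at submit time, e.g.
  // ./spark-submit --master spark://selva:7077 --driver-memory 6g --executor-memory 2g --total-executor-cores 4 --class MemIssue /Users/rs/Desktop/test.jar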
How Spark determines Parquet partition size
Hi, can you please tell me how Parquet partitions the data while saving a DataFrame?

I have a DataFrame which contains 10 values like below:

+---------+
|field_num|
+---------+
|      139|
|      140|
|       40|
|       41|
|      148|
|      149|
|      151|
|      152|
|      153|
|      154|
+---------+

df.write.partitionBy("field_num").parquet("/Users/rs/parti/")

This saves the output as directories like field_num=140, ..., field_num=154. But when I try the command below it gives 5:

scala> spark.read.parquet("file:///Users/rs/parti").rdd.partitions.length
res4: Int = 5

So how does Parquet partition the data in Spark? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
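On the read side the partition count is not tied to the 10 partitionBy directories: in Spark 2.x the files are packed into splits of at most spark.sql.files.maxPartitionBytes (128 MB by default, plus a small per-file open cost), so ten tiny Parquet files can collapse into a handful of read partitions, which is why 5 comes back here. A small sketch of inspecting and influencing this (the values are only illustrative):

  // ask for smaller splits so more read partitions are created
  spark.conf.set("spark.sql.files.maxPartitionBytes", (4 * 1024 * 1024).toString)

  val parti = spark.read.parquet("file:///Users/rs/parti")
  println(parti.rdd.partitions.length)
  // or simply repartition after reading
  println(parti.repartition(10).rdd.partitions.length)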
Re: Dataframe broadcast join hint not working
Hi, which version of Spark are you using? Tables smaller than 10 MB are automatically converted to broadcast joins in Spark.

Thanks, Selvam R

On Sat, Nov 26, 2016 at 6:51 PM, Swapnil Shinde wrote: > Hello > I am trying a broadcast join on dataframes but it is still doing > SortMergeJoin. I even try setting spark.sql.autoBroadcastJoinThreshold > higher but still no luck. > > Related piece of code- > val c = a.join(braodcast(b), "id") > > On a side note, if I do SizeEstimator.estimate(b) and it is really > high(460956584 bytes) compared to data it contains. b has just 85 rows and > around 4964 bytes. > Help is very much appreciated!! > > Thanks > Swapnil > > > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
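A quick way to check whether the hint is actually applied, sketched against the snippet in the quoted mail (note the spelling: broadcast, imported from org.apache.spark.sql.functions):

  import org.apache.spark.sql.functions.broadcast

  val c = a.join(broadcast(b), "id")
  c.explain()   // the plan should show BroadcastHashJoin instead of SortMergeJoin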
Java Collections.emptyList inserted as null object in cassandra
Field type in Cassandra: List. I am trying to insert Collections.emptyList() from Spark into a Cassandra list field. In Cassandra it is stored as a null object. How can I avoid null values here? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Spark Job not exited and shows running
Hi, I have submitted a Spark job in yarn-client mode. The executors and cores were dynamically allocated. The job has 20 partitions, so 5 containers, each with 4 cores, were allocated. It has processed almost all the records but it never exits, and in the application master container I am seeing the error message below.

INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
WARN yarn.YarnAllocator: Expected to find pending requests, but found none.

The same job, run for only 1000 records, finished successfully. Can anyone help me sort out this issue? Spark version: 2.0 (AWS EMR). -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Spark Job not exited and shows running
Hi, I have run the job in cluster mode as well; the job still does not end. After some time the container does nothing but still shows as running.

In my code, every record is inserted into both Solr and Cassandra. When I ran it with only the Solr part, the job completed successfully. I have not yet tested the Cassandra part on its own; I will check and update. Has anyone faced this issue before? I added sparkSession.stop after the foreachPartition ends.

My code overview:
- SparkSession reads a parquet file (20 partitions, roughly 90k records)
- foreachPartition: for every record, do some computation, insert into Cassandra (I am using the insert command) and index into Solr
- stop the SparkSession
- exit the code

Thanks, Selvam R

On Thu, Dec 1, 2016 at 7:03 AM, Daniel van der Ende < daniel.vandere...@gmail.com> wrote: > Hi, > > I've seen this a few times too. Usually it indicates that your driver > doesn't have enough resources to process the result. Sometimes increasing > driver memory is enough (yarn memory overhead can also help). Is there any > specific reason for you to run in client mode and not in cluster mode? > Having run into this a number of times (and wanting to spare the resources > of our submitting machines) we have now switched to use yarn cluster mode > by default. This seems to resolve the problem. > > Hope this helps, > > Daniel > > On 29 Nov 2016 11:20 p.m., "Selvam Raman" wrote: > >> Hi, >> >> I have submitted spark job in yarn client mode. The executor and cores >> were dynamically allocated. In the job i have 20 partitions, so 5 container >> each with 4 core has been submitted. It almost processed all the records >> but it never exit the job and in the application master container i am >> seeing the below error message. >> >> INFO yarn.YarnAllocator: Canceling requests for 0 executor containers >> WARN yarn.YarnAllocator: Expected to find pending requests, but found none. >> >> >> >> The same job i ran it for only 1000 records which successfully finished. >> >> >> Can anyone help me to sort out this issue. >> >> Spark version:2.0( AWS EMR). >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Spark Batch checkpoint
Hi, is there any provision for checkpointing in a Spark batch job? I have a huge dataset; it takes more than 3 hours to process all the data, and I currently have 100 partitions. If the job fails after two hours, say after it has processed 70 partitions, should I restart the Spark job from the beginning, or is there some checkpoint provision? What I am expecting from a checkpoint is to restart from partition 71 and run to the end. Please give me your suggestions. -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Spark Batch checkpoint
I am using java. I will try and let u know. On Dec 15, 2016 8:45 PM, "Irving Duran" wrote: > Not sure what programming language you are using, but in python you can do > "sc.setCheckpointDir('~/apps/spark-2.0.1-bin-hadoop2.7/checkpoint/')". > This will store checkpoints on that directory that I called checkpoint. > > > Thank You, > > Irving Duran > > On Thu, Dec 15, 2016 at 10:33 AM, Selvam Raman wrote: > >> Hi, >> >> is there any provision in spark batch for checkpoint. >> >> I am having huge data, it takes more than 3 hours to process all data. I >> am currently having 100 partitions. >> >> if the job fails after two hours, lets say it has processed 70 partition. >> should i start spark job from the beginning or is there way for checkpoint >> provision. >> >> Checkpoint,what i am expecting is start from 71 partition to till end. >> >> Please give me your suggestions. >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> > >
Re: Spark Batch checkpoint
Hi, Acutally my requiremnt is read the parquet file which is 100 partition. Then i use foreachpartition to read the data and process it. My sample code public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("checkpoint verification").getOrCreate(); sparkSession.implicits(); sparkSession.sparkContext().setCheckpointDir("Checkpoint/Dec16"); Dataset sampleData=sparkSession.read().parquet("filepath"); sampleData.foreachPartition(new ForeachPartitionFunction(){ /** * */ private static final long serialVersionUID = 1L; @Override public void call(Iterator row) throws Exception { while(row.hasNext()) { //Process data and insert into No-Sql DB } } }); } } Now where can i apply rdd.checkpoint(). Thanks, selvam On Thu, Dec 15, 2016 at 10:44 PM, Selvam Raman wrote: > I am using java. I will try and let u know. > On Dec 15, 2016 8:45 PM, "Irving Duran" wrote: > >> Not sure what programming language you are using, but in python you can >> do "sc.setCheckpointDir('~/apps/spark-2.0.1-bin-hadoop2.7/checkpoint/')". >> This will store checkpoints on that directory that I called checkpoint. >> >> >> Thank You, >> >> Irving Duran >> >> On Thu, Dec 15, 2016 at 10:33 AM, Selvam Raman wrote: >> >>> Hi, >>> >>> is there any provision in spark batch for checkpoint. >>> >>> I am having huge data, it takes more than 3 hours to process all data. I >>> am currently having 100 partitions. >>> >>> if the job fails after two hours, lets say it has processed 70 >>> partition. should i start spark job from the beginning or is there way for >>> checkpoint provision. >>> >>> Checkpoint,what i am expecting is start from 71 partition to till end. >>> >>> Please give me your suggestions. >>> >>> -- >>> Selvam Raman >>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >>> >> >> -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
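A minimal PySpark sketch of where the checkpoint call goes: mark the RDD after setting the checkpoint directory and force it with an action before the real work. One caveat, as far as I know: RDD checkpointing truncates lineage and helps recovery within a running application, but it does not let a new run of a failed batch job resume from partition 71; that would need your own bookkeeping (for example, processing the input in batches and recording completed batches externally). process_partition below is a hypothetical stand-in for the per-row logic in the Java code above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpoint verification").getOrCreate()
spark.sparkContext.setCheckpointDir("Checkpoint/Dec16")

rdd = spark.read.parquet("filepath").rdd   # same hypothetical path as in the Java snippet
rdd.checkpoint()                           # mark for checkpointing; materialized by the next action
rdd.count()                                # force the checkpoint to be written
rdd.foreachPartition(process_partition)    # hypothetical: iterate rows and insert into the NoSQL DB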
Spark dump in slave Node EMR
Hi, how can I take a heap dump on an EMR slave node for analysis? I have one master and two slaves. If I run the jps command on the master I can see SparkSubmit with its pid, but I cannot see anything on the slave nodes. How can I take a heap dump for a Spark job? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Spark dump in slave Node EMR
If i want to take specifically for the task number which got failed. is it possible to take heap dump. "16/12/16 12:25:54 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 20.0 GB of 19.8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 16/12/16 12:25:54 ERROR YarnClusterScheduler: Lost executor 1 on ip-.dev: Container killed by YARN for exceeding memory limits. 20.0 GB of 19.8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 16/12/16 12:25:55 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 9, ip.dev): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 20.0 GB of 19.8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 16/12/16 12:25:55 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster. 16/12/16 12:25:55 INFO BlockManagerMaster: Removal of executor 1 requested 16/12/16 12:25:55 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asked to remove non-existent executor 1 " thanks, selvam R On Fri, Dec 16, 2016 at 12:30 PM, Selvam Raman wrote: > Hi, > > how can i take heap dump in EMR slave node to analyze. > > I have one master and two slave. > > if i enter jps command in Master, i could see sparksubmit with pid. > > But i could not see anything in slave node. > > how can i take heap dump for spark job. > > -- > Selvam Raman > "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Reading xls and xlsx files
Hi, Is there a way to read xls and xlsx files using Spark? Is there any Hadoop InputFormat available for reading xls and xlsx files that could be used in Spark? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
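If a distributed read is not required, one simple option is to read the workbook with pandas on the driver and hand it to Spark. A sketch, assuming pandas plus an Excel engine (xlrd or openpyxl) is installed on the driver and the files fit in driver memory; the path is hypothetical. There is also a third-party spark-excel data source package, but I have not verified its coordinates here.

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read-xlsx").getOrCreate()

pdf = pd.read_excel("/path/to/file.xlsx")   # reads the first sheet by default
df = spark.createDataFrame(pdf)             # distribute the rows as a Spark DataFrame
df.printSchema()
df.show(5)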
Writing into parquet throws Array out of bounds exception
Hi, When i am trying to write dataset to parquet or to show(1,fase), my job throws array out of bounce exception. 16/12/21 12:38:50 WARN TaskSetManager: Lost task 7.0 in stage 36.0 (TID 81, ip-10-95-36-69.dev): java.lang.ArrayIndexOutOfBoundsException: 63 at org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:156) at org.apache.spark.unsafe.types.UTF8String.indexOf(UTF8String.java:565) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/12/21 12:38:50 INFO TaskSetManager: Starting task 7.1 in stage 36.0 (TID 106, ip-10-95-36-70.dev, partition 7, RACK_LOCAL, 6020 bytes) 16/12/21 12:38:50 INFO YarnSchedulerBackend$YarnDriverEndpoint: Launching task 106 on executor id: 4 hostname: ip-10-95-36-70.dev. 16/12/21 12:38:50 WARN TaskSetManager: Lost task 4.0 in stage 36.0 (TID 78, ip-10-95-36-70.dev): java.lang.ArrayIndexOutOfBoundsException: 62 at org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:156) at org.apache.spark.unsafe.types.UTF8String.indexOf(UTF8String.java:565) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) In my dataset there is one column which is longblob, if i convert to unbase64. I face this problem. i could able to write to parquet without conversion. So is there some limit for bytes per line?. Please give me your suggestion. -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
is it possible to read .mdb file in spark
-- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
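One possible route, as an untested sketch: expose the Access .mdb file through the open-source UCanAccess JDBC driver and read it with Spark's JDBC data source. The UCanAccess jars (and their dependencies) need to be on the classpath (e.g. via --jars), the table name below is hypothetical, and the driver class and URL format are quoted from memory, so please verify them against the UCanAccess documentation.

df = (spark.read.format("jdbc")
      .option("url", "jdbc:ucanaccess:///path/to/database.mdb")      # hypothetical local path
      .option("dbtable", "my_table")                                 # hypothetical table name
      .option("driver", "net.ucanaccess.jdbc.UcanaccessDriver")
      .load())
df.show(5)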
how to read object field within json file
Hi,

{
  "id": "test1",
  "source": {
    "F1": { "id": "4970", "eId": "F1", "description": "test1" },
    "F2": { "id": "5070", "eId": "F2", "description": "test2" },
    "F3": { "id": "5170", "eId": "F3", "description": "test3" },
    "F4": {},
    etc..
    "F999": {}
  }
}

I have bzip-compressed JSON files in the above format. Some JSON rows contain two objects within "source" (like F1 and F2), some contain five (F1, F2, F3, F4, F5), etc., so the final schema contains the combination of all objects seen in the "source" field. Every row can contain any number of objects, but only some of them hold valid records. How can I retrieve the value of "description" inside the "source" field? source.F1.description returns the result, but how can I get all description values for every row (something like "source.*.description")? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: how to read object field within json file
Thank you Armbust. On Fri, Mar 24, 2017 at 7:02 PM, Michael Armbrust wrote: > I'm not sure you can parse this as an Array, but you can hint to the > parser that you would like to treat source as a map instead of as a > struct. This is a good strategy when you have dynamic columns in your data. > > Here is an example of the schema you can use to parse this JSON and also > how to use explode to turn it into separate rows > <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/679071429109042/2840265927289860/latest.html>. > This blog post has more on working with semi-structured data in Spark > <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html> > . > > On Thu, Mar 23, 2017 at 2:49 PM, Yong Zhang wrote: > >> That's why your "source" should be defined as an Array[Struct] type >> (which makes sense in this case, it has an undetermined length , so you >> can explode it and get the description easily. >> >> Now you need write your own UDF, maybe can do what you want. >> >> Yong >> >> -- >> *From:* Selvam Raman >> *Sent:* Thursday, March 23, 2017 5:03 PM >> *To:* user >> *Subject:* how to read object field within json file >> >> Hi, >> >> { >> "id": "test1", >> "source": { >> "F1": { >> "id": "4970", >> "eId": "F1", >> "description": "test1", >> }, >> "F2": { >> "id": "5070", >> "eId": "F2", >> "description": "test2", >> }, >> "F3": { >> "id": "5170", >> "eId": "F3", >> "description": "test3", >> }, >> "F4":{} >> etc.. >> "F999":{} >> } >> >> I am having bzip json files like above format. >> some json row contains two objects within source(like F1 and F2), >> sometime five(F1,F2,F3,F4,F5),etc. So the final schema will contains >> combination of all objects for the source field. >> >> Now, every row will contain n number of objects but only some contains >> valid records. >> how can i retreive the value of "description" in "source" field. >> >> source.F1.description - returns the result but how can i get all >> description result for every row..(something like this >> "source.*.description"). >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> > > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
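A PySpark sketch of the map-schema approach suggested above (field names are taken from the sample JSON earlier in this thread; the path is hypothetical):

from pyspark.sql.types import StructType, StructField, StringType, MapType
from pyspark.sql.functions import explode, col

value_type = StructType([
    StructField("id", StringType()),
    StructField("eId", StringType()),
    StructField("description", StringType()),
])
schema = StructType([
    StructField("id", StringType()),
    StructField("source", MapType(StringType(), value_type)),
])

df = spark.read.schema(schema).json("/path/to/files.json.bz2")
# exploding a map column yields one row per entry, with "key" and "value" columns
exploded = df.select(col("id"), explode(col("source")))
exploded.select("id", "key", "value.description").show(truncate=False)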
Convert Dataframe to Dataset in pyspark
In Scala, val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String] what is the equivalent code in pyspark? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
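There is no typed Dataset API in Python; a DataFrame is already a Dataset of Rows, so the closest PySpark equivalent (a small sketch) is to read the text file as a DataFrame with a single string column named "value", or drop to an RDD of strings if needed:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read-text").getOrCreate()

df = spark.read.text("/home/spark/1.6/lines")     # DataFrame[value: string]
lines = df.rdd.map(lambda row: row.value)          # plain RDD of Python strings
print(lines.take(3))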
Update DF record with delta data in spark
Hi,

Table 1 (old file):
name   number   salary
Test1  1        1
Test2  2        1

Table 2 (delta file):
name   number   salary
Test1  1        4
Test3  3        2

I do not have a timestamp field in this table. The composite key is the name and number fields.

Expected result:
name   number   salary
Test1  1        4
Test2  2        1
Test3  3        2

Current approach:
1) Delete rows in table1 where table1.composite key = table2.composite key.
2) Union table1 and table2 to get the updated result.

Is this the right approach? Is there any other way to achieve it? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
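A sketch of that approach in PySpark, using a left_anti join for step 1 (column and DataFrame names are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("delta-merge").getOrCreate()

df_old = spark.createDataFrame([("Test1", 1, 1), ("Test2", 2, 1)], ["name", "number", "salary"])
df_delta = spark.createDataFrame([("Test1", 1, 4), ("Test3", 3, 2)], ["name", "number", "salary"])

# keep only old rows whose composite key (name, number) is absent from the delta file ...
kept = df_old.join(df_delta, ["name", "number"], "left_anti")

# ... then append every delta row; select() keeps column order aligned for union()
result = kept.select("name", "number", "salary").union(df_delta.select("name", "number", "salary"))
result.orderBy("number").show()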
Pyspark - pickle.PicklingError: Can't pickle
I ran the below code in my Standalone mode. Python version 2.7.6. Spacy 1.7+ version. Spark 2.0.1 version. I'm new pie to pyspark. please help me to understand the below two versions of code. why first version run fine whereas second throws pickle.PicklingError: Can't pickle . at 0x107e39110>. (i was doubting that Second approach failure because it could not serialize the object and sent it to worker). *1) Run-Success:* *(SpacyExample-Module)* import spacy nlp = spacy.load('en_default') def spacyChunks(content): doc = nlp(content) mp=[] for chunk in doc.noun_chunks: phrase = content[chunk.start_char: chunk.end_char] mp.append(phrase) #print(mp) return mp if __name__ == '__main__': pass *Main-Module:* spark = SparkSession.builder.appName("readgzip" ).config(conf=conf).getOrCreate() gzfile = spark.read.schema(schema).json("") ... ... textresult.rdd.map(lambda x:x[0]).\ flatMap(lambda data: SpacyExample.spacyChunks(data)).saveAsTextFile("") *2) Run-Failure:* *MainModule:* nlp= spacy.load('en_default') def spacyChunks(content): doc = nlp(content) mp=[] for chunk in doc.noun_chunks: phrase = content[chunk.start_char: chunk.end_char] mp.append(phrase) #print(mp) return mp if __name__ == '__main__' create spraksession,read file, file.rdd.map(..).flatmap(lambdat data:spacyChunks(data).saveAsTextFile() Stack Trace: File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 649, in save_dict self._batch_setitems(obj.iteritems()) File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 681, in _batch_setitems save(v) File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 331, in save self.save_reduce(obj=obj, *rv) File "/Users/rs/Downloads/spark-2.0.1-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 535, in save_reduce save(args) File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 562, in save_tuple save(element) File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save f(self, obj) # Call unbound method with explicit self File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 649, in save_dict self._batch_setitems(obj.iteritems()) File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 681, in _batch_setitems save(v) File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 317, in save self.save_global(obj, rv) File "/Users/rs/Downloads/spark-2.0.1-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 390, in save_global raise pickle.PicklingError("Can't pickle %r" % obj) pickle.PicklingError: Can't pickle . at 0x107e39110> -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
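As far as I can tell, the difference is that in the second version nlp is a module-level global in the main script, so cloudpickle tries to serialize the loaded spacy model along with the lambda; in the first version only a reference to the SpacyExample module is pickled and spacy.load runs again on each executor at import time. A minimal sketch of the usual workaround when everything has to live in the main module: load spacy lazily inside a function that runs on the executors (here via mapPartitions), so the driver never pickles the model. The RDD and column names follow the first (working) version above.

_nlp = None  # loaded lazily in each executor process, never pickled on the driver

def get_nlp():
    global _nlp
    if _nlp is None:
        import spacy
        _nlp = spacy.load('en_default')
    return _nlp

def spacy_chunks_partition(contents):
    nlp = get_nlp()                      # one load per executor, reused for the whole partition
    for content in contents:
        doc = nlp(content)
        for chunk in doc.noun_chunks:
            yield content[chunk.start_char:chunk.end_char]

# textresult.rdd.map(lambda x: x[0]).mapPartitions(spacy_chunks_partition).saveAsTextFile("...")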
Spark Mlib - java.lang.OutOfMemoryError: Java heap space
Hi, I have 1 master and 4 slave nodes. The input data size is 14 GB. Slave node config: 32 GB RAM, 16 cores. I am trying to train a word-embedding model using Spark and it is going out of memory. To train 14 GB of data, how much memory do I need? I have given 20 GB per executor, but the log below shows 11.8 GB free out of the 20 GB:

BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-.-.-.dev:35035 (size: 4.6 KB, free: 11.8 GB)

This is the code:

if __name__ == "__main__":
    sc = SparkContext(appName="Word2VecExample")  # SparkContext
    # $example on$
    inp = sc.textFile("s3://word2vec/data/word2vec_word_data.txt/").map(lambda row: row.split(" "))
    word2vec = Word2Vec()
    model = word2vec.fit(inp)
    model.save(sc, "s3://pysparkml/word2vecresult2/")
    sc.stop()

Spark-submit command:

spark-submit --master yarn --conf 'spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/tmp -XX:+UseG1GC -XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark' --num-executors 4 --executor-cores 2 --executor-memory 20g Word2VecExample.py

-- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
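If I understand the implementation correctly, Word2Vec memory use grows with vocabulary size times vector size on both the driver and the executors, so shrinking the vocabulary and the vectors usually helps more than only adding executor memory. A sketch of the knobs that typically matter (the values are placeholders, not recommendations):

from pyspark import SparkContext
from pyspark.mllib.feature import Word2Vec

sc = SparkContext(appName="Word2VecExample")

inp = (sc.textFile("s3://word2vec/data/word2vec_word_data.txt/")
         .map(lambda row: row.split(" "))
         .repartition(200)          # hypothetical count: more, smaller tasks
         .cache())

word2vec = (Word2Vec()
            .setVectorSize(100)     # smaller vectors -> smaller model
            .setMinCount(5)         # drop rare words to shrink the vocabulary
            .setNumPartitions(8))   # spread the fit across more partitions

model = word2vec.fit(inp)
model.save(sc, "s3://pysparkml/word2vecresult2/")
sc.stop()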
Re: Spark Mlib - java.lang.OutOfMemoryError: Java heap space
This is where job going out of memory 17/04/24 10:09:22 INFO TaskSetManager: Finished task 122.0 in stage 1.0 (TID 356) in 4260 ms on ip-...-45.dev (124/234) 17/04/24 10:09:26 INFO BlockManagerInfo: Removed taskresult_361 on ip-10...-185.dev:36974 in memory (size: 5.2 MB, free: 8.5 GB) 17/04/24 10:09:26 INFO BlockManagerInfo: Removed taskresult_362 on ip-...-45.dev:40963 in memory (size: 5.2 MB, free: 8.9 GB) 17/04/24 10:09:26 INFO TaskSetManager: Finished task 125.0 in stage 1.0 (TID 359) in 4383 ms on ip-...-45.dev (125/234) # # java.lang.OutOfMemoryError: Java heap space # -XX:OnOutOfMemoryError="kill -9 %p" # Executing /bin/sh -c "kill -9 15090"... Killed Node-45.dev contains 8.9GB free while it throws out of memory. Can anyone please help me to understand the issue? On Mon, Apr 24, 2017 at 11:22 AM, Selvam Raman wrote: > Hi, > > I have 1 master and 4 slave node. Input data size is 14GB. > Slave Node config : 32GB Ram,16 core > > > I am trying to train word embedding model using spark. It is going out of > memory. To train 14GB of data how much memory do i require?. > > > I have givem 20gb per executor but below shows it is using 11.8GB out of > 20 GB. > BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-.-.-.dev:35035 > (size: 4.6 KB, free: 11.8 GB) > > > This is the code > if __name__ == "__main__": > sc = SparkContext(appName="Word2VecExample") # SparkContext > > # $example on$ > inp = sc.textFile("s3://word2vec/data/word2vec_word_data.txt/").map(lambda > row: row.split(" ")) > > word2vec = Word2Vec() > model = word2vec.fit(inp) > > model.save(sc, "s3://pysparkml/word2vecresult2/") > sc.stop() > > > Spark-submit Command: > spark-submit --master yarn --conf > 'spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError > -XX:HeapDumpPath=/mnt/tmp -XX:+UseG1GC -XX:+UseG1GC -XX:+PrintFlagsFinal > -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails > -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy > -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark' --num-executors 4 > --executor-cores 2 --executor-memory 20g Word2VecExample.py > > > -- > Selvam Raman > "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
how to create List in pyspark
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

How can I achieve the same DataFrame while reading from a source?

doc = spark.read.text("/Users/rs/Desktop/nohup.out")

How can I create an array-type "sentences" column from doc (a DataFrame)? The approach below creates more than one column:

rdd.map(lambda rdd: rdd[0]).map(lambda row: row.split(" "))

-- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
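A sketch of one way to get a single array-of-strings column directly from the DataFrame, using the built-in split function on the "value" column produced by spark.read.text:

from pyspark.sql.functions import split, col

doc = spark.read.text("/Users/rs/Desktop/nohup.out")
sentences = doc.select(split(col("value"), " ").alias("text"))   # one column of type array<string>
sentences.printSchema()
sentences.show(3, truncate=False)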
convert ps to jpg file
Hi, is there any good open-source tool to convert PS files to JPG? I am running a Spark job in which I use ImageMagick/GraphicsMagick with Ghostscript to convert/resize images. IM/GM takes a lot of memory/mapped memory/disk to convert even a KB-sized image file, and it takes a lot of time. Because of this I frequently get an OOM and a disk-full issue. Could you please share your thoughts? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
how to get Cache size from storage
Hi All, I have 100 GB of data (for my use case) and I am caching it with MEMORY_AND_DISK. Is there any log available to find how much data is stored in memory and how much on disk, for a running or a completed application? I can see the cache in the UI under the Storage tab, so it should be available even after the job; where can I get those details? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
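While the application is still running, the monitoring REST API exposed by the driver UI reports per-RDD storage, including how much sits in memory versus on disk. A rough sketch below; the host, port and field names are quoted from memory, so please verify them against the Spark monitoring documentation. For finished applications the same API is served by the history server when event logging is enabled, though storage details may not always be retained there.

import requests

base = "http://driver-host:4040/api/v1"                      # hypothetical driver host
app_id = requests.get(base + "/applications").json()[0]["id"]
for rdd in requests.get("%s/applications/%s/storage/rdd" % (base, app_id)).json():
    print(rdd["name"], "memoryUsed:", rdd["memoryUsed"], "diskUsed:", rdd["diskUsed"])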
Re: How to convert Array of Json rows into Dataset of specific columns in Spark 2.2.0?
I just followed Hien Luu approach val empExplode = empInfoStrDF.select(explode(from_json('emp_info_str, empInfoSchema)).as("emp_info_withexplode")) empExplode.show(false) +---+ |emp_info_withexplode | +---+ |[foo,[CA,USA],WrappedArray([english,2016])]| |[bar,[OH,USA],WrappedArray([math,2017])] | +---+ empExplode.select($"emp_info_withexplode.name").show(false) ++ |name| ++ |foo | |bar | ++ empExplode.select($"emp_info_withexplode.address.state").show(false) +-+ |state| +-+ |CA | |OH | +-+ empExplode.select($"emp_info_withexplode.docs.subject").show(false) +-+ |subject | +-+ |[english]| |[math] | +-+ @Kant kodali, is that helpful for you? if not please let me know what changes are you expecting in this? On Sun, Jan 7, 2018 at 12:16 AM, Jules Damji wrote: > Here’s are couple tutorial that shows how to extract Structured nested > data > > https://databricks.com/blog/2017/06/27/4-sql-high-order- > lambda-functions-examine-complex-structured-data-databricks.html > > https://databricks.com/blog/2017/06/13/five-spark-sql- > utility-functions-extract-explore-complex-data-types.html > > Sent from my iPhone > Pardon the dumb thumb typos :) > > On Jan 6, 2018, at 11:42 AM, Hien Luu wrote: > > Hi Kant, > > I am not sure whether you had come up with a solution yet, but the > following > works for me (in Scala) > > val emp_info = """ > [ >{"name": "foo", "address": {"state": "CA", "country": "USA"}, > "docs":[{"subject": "english", "year": 2016}]}, >{"name": "bar", "address": {"state": "OH", "country": "USA"}, > "docs":[{"subject": "math", "year": 2017}]} > ]""" > > import org.apache.spark.sql.types._ > > val addressSchema = new StructType().add("state", > StringType).add("country", > StringType) > val docsSchema = ArrayType(new StructType().add("subject", > StringType).add("year", IntegerType)) > val employeeSchema = new StructType().add("name", > StringType).add("address", > addressSchema).add("docs", docsSchema) > > val empInfoSchema = ArrayType(employeeSchema) > > empInfoSchema.json > > val empInfoStrDF = Seq((emp_info)).toDF("emp_info_str") > empInfoStrDF.printSchema > empInfoStrDF.show(false) > > val empInfoDF = empInfoStrDF.select(from_json('emp_info_str, > empInfoSchema).as("emp_info")) > empInfoDF.printSchema > > empInfoDF.select(struct("*")).show(false) > > empInfoDF.select("emp_info.name", "emp_info.address", > "emp_info.docs").show(false) > > empInfoDF.select(explode('emp_info.getItem("name"))).show > > > > > -- > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Pyspark UDF/map fucntion throws pickling exception
e(v) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 521, in save self.save_reduce(obj=obj, *rv) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 600, in save_reduce save(state) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 821, in save_dict self._batch_setitems(obj.items()) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 847, in _batch_setitems save(v) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 521, in save self.save_reduce(obj=obj, *rv) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 582, in save_reduce save(args) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 751, in save_tuple save(element) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 368, in save_builtin_function return self.save_function(obj) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 247, in save_function if islambda(obj) or obj.__code__.co_filename == '' or themodule is None: AttributeError: 'builtin_function_or_method' object has no attribute '__code__' please help me. -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Pyspark UDF/map fucntion throws pickling exception
ework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 821, in save_dict self._batch_setitems(obj.items()) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 852, in _batch_setitems save(v) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 521, in save self.save_reduce(obj=obj, *rv) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 600, in save_reduce save(state) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 821, in save_dict self._batch_setitems(obj.items()) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 847, in _batch_setitems save(v) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 521, in save self.save_reduce(obj=obj, *rv) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 582, in save_reduce save(args) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 751, in save_tuple save(element) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 368, in save_builtin_function return self.save_function(obj) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 247, in save_function if islambda(obj) or obj.__code__.co_filename == '' or themodule is None: AttributeError: 'builtin_function_or_method' object has no attribute '__code__' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/Users/rs/PycharmProjects/SparkDemo/com/elsevier/vtw/ExtractDescription.py", line 30, in #description.rdd.flatMap(lambda row: getPhrases(row.desc)).foreach(f) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py", line 782, in foreach self.mapPartitions(processPartition).count() # Force evaluation File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py", line 1041, in count return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py", line 1032, in sum return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py", line 906, in fold vals = self.mapPartitions(func).collect() File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py", line 809, in collect port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py", line 2455, in _jrdd self._jrdd_deserializer, 
profiler) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py", line 2388, in _wrap_function pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD pickled_command = ser.dumps(command) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/serializers.py", line 464, in dumps return cloudpickle.dumps(obj, 2) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 704, in dumps cp.dump(obj) File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 162, in dump raise pickle.PicklingError(msg) _pickle.PicklingError: Could not serialize object: AttributeError: 'builtin_function_or_method' object has n
pyspark+spacy throwing pickling exception
import spacy nlp = spacy.load('en') def getPhrases(content): phrases = [] doc = nlp(str(content)) for chunks in doc.noun_chunks: phrases.append(chunks.text) return phrases the above function will retrieve the noun phrases from the content and return list of phrases. def f(x) : print(x) description = xmlData.filter(col("dcterms:description").isNotNull()).select(col("dcterms:description").alias("desc")) description.rdd.flatMap(lambda row: getPhrases(row.desc)).foreach(f) when i am trying to access getphrases i am getting below exception -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: pyspark+spacy throwing pickling exception
Hi , i solved the issue when i extract the method into another class. Failure: Class extract.py - contains the whole implementation. Because of this single class driver trying to serialize spacy(english) object and sending to executor. There i am facing pickling exception. Success: Class extract.py - it referring getPhrase method of spacyutils Class spacytuils.py Now, spacy initialized in executor, there is no need of serialization. Please let me know my understanding is correct. On Thu, Feb 15, 2018 at 12:14 PM, Holden Karau wrote: > So you left out the exception. On one hand I’m also not sure how well > spacy serializes, so to debug this I would start off by moving the nlp = > inside of my function and see if it still fails. > > On Thu, Feb 15, 2018 at 9:08 PM Selvam Raman wrote: > >> import spacy >> >> nlp = spacy.load('en') >> >> >> >> def getPhrases(content): >> phrases = [] >> doc = nlp(str(content)) >> for chunks in doc.noun_chunks: >> phrases.append(chunks.text) >> return phrases >> >> the above function will retrieve the noun phrases from the content and >> return list of phrases. >> >> >> def f(x) : print(x) >> >> >> description = >> xmlData.filter(col("dcterms:description").isNotNull()).select(col("dcterms:description").alias("desc")) >> >> description.rdd.flatMap(lambda row: getPhrases(row.desc)).foreach(f) >> >> when i am trying to access getphrases i am getting below exception >> >> >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> > -- > Twitter: https://twitter.com/holdenkarau > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Spark-Solr -- unresolved dependencies
Hi, Spark version - EMR 2.0.0

spark-shell --packages com.lucidworks.spark:spark-solr:3.0.1

When I tried the above command, I got the error below:

:: UNRESOLVED DEPENDENCIES ::
:: org.restlet.jee#org.restlet;2.3.0: not found
:: org.restlet.jee#org.restlet.ext.servlet;2.3.0: not found
:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS

Exception in thread "main" java.lang.RuntimeException: [unresolved dependency: org.restlet.jee#org.restlet;2.3.0: not found, unresolved dependency: org.restlet.jee#org.restlet.ext.servlet;2.3.0: not found]
at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1066)
at org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:294)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:158)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

-- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Spark EMR executor-core vs Vcores
Hi,

spark version - 2.0.0
spark distribution - EMR 5.0.0

Spark Cluster - one master, 5 slaves
Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage

Cluster Metrics:
Apps Submitted: 16, Apps Pending: 0, Apps Running: 1, Apps Completed: 15
Containers Running: 5
Memory Used: 88.88 GB, Memory Total: 90.50 GB, Memory Reserved: 22 GB
VCores Used: 5, VCores Total: 79, VCores Reserved: 1
Active Nodes: 5, Decommissioning Nodes: 0, Decommissioned Nodes: 0, Lost Nodes: 5, Unhealthy Nodes: 0, Rebooted Nodes: 0

I have submitted the job with the below configuration:
--num-executors 5 --executor-cores 10 --executor-memory 20g

spark.task.cpus - 1 by default

My understanding is that there will be 5 executors, each able to run 10 tasks at a time, with the tasks sharing the total memory of 20g. Here I can see only 5 vcores used, which would mean 1 executor instance uses 20g + 10% overhead RAM (22gb), 10 cores (number of threads), and 1 vcore (cpu).

Please correct me if my understanding is wrong.

How can I utilize the number of vcores in EMR effectively? Will vcores boost performance? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Spark EMR executor-core vs Vcores
Master Node details: lscpu Architecture: x86_64 CPU op-mode(s):32-bit, 64-bit Byte Order:Little Endian CPU(s):4 On-line CPU(s) list: 0-3 Thread(s) per core:4 Core(s) per socket:1 Socket(s): 1 NUMA node(s): 1 Vendor ID: GenuineIntel CPU family:6 Model: 62 Model name:Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Stepping: 4 CPU MHz: 2494.066 BogoMIPS: 4988.13 Hypervisor vendor: Xen Virtualization type: full L1d cache: 32K L1i cache: 32K L2 cache: 256K L3 cache: 25600K NUMA node0 CPU(s): 0-3 Slave Node Details: Architecture: x86_64 CPU op-mode(s):32-bit, 64-bit Byte Order:Little Endian CPU(s):8 On-line CPU(s) list: 0-7 Thread(s) per core:8 Core(s) per socket:1 Socket(s): 1 NUMA node(s): 1 Vendor ID: GenuineIntel CPU family:6 Model: 62 Model name:Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Stepping: 4 CPU MHz: 2500.054 BogoMIPS: 5000.10 Hypervisor vendor: Xen Virtualization type: full L1d cache: 32K L1i cache: 32K L2 cache: 256K L3 cache: 25600K NUMA node0 CPU(s): 0-7 On Mon, Feb 26, 2018 at 10:20 AM, Selvam Raman wrote: > Hi, > > spark version - 2.0.0 > spark distribution - EMR 5.0.0 > > Spark Cluster - one master, 5 slaves > > Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage > Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage > > > Cluster Metrics > Apps SubmittedApps PendingApps RunningApps CompletedContainers RunningMemory > UsedMemory TotalMemory ReservedVCores UsedVCores TotalVCores ReservedActive > NodesDecommissioning NodesDecommissioned NodesLost NodesUnhealthy > NodesRebooted > Nodes > 16 0 1 15 5 88.88 GB 90.50 GB 22 GB 5 79 1 5 > <http://localhost:8088/cluster/nodes> 0 > <http://localhost:8088/cluster/nodes/decommissioning> 0 > <http://localhost:8088/cluster/nodes/decommissioned> 5 > <http://localhost:8088/cluster/nodes/lost> 0 > <http://localhost:8088/cluster/nodes/unhealthy> 0 > <http://localhost:8088/cluster/nodes/rebooted> > I have submitted job with below configuration > --num-executors 5 --executor-cores 10 --executor-memory 20g > > > > spark.task.cpus - be default 1 > > > My understanding is there will be 5 executore each can run 10 task at a > time and task can share total memory of 20g. Here, i could see only 5 > vcores used which means 1 executor instance use 20g+10%overhead ram(22gb), > 10 core(number of threads), 1 Vcore(cpu). > > please correct me if my understand is wrong. > > how can i utilize number of vcore in EMR effectively. Will Vcore boost > performance? > > > -- > Selvam Raman > "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Spark EMR executor-core vs Vcores
Hi Fawze, Yes, it is true that i am running in yarn mode, 5 containers represents 4executor and 1 master. But i am not expecting this details as i already aware of this. What i want to know is relationship between Vcores(Emr yarn) vs executor-core(Spark). >From my slave configuration i understand that only 8 thread available in my slave machine which means 8 thread run at a time at max. Thread(s) per core:8 Core(s) per socket:1 Socket(s): 1 so i don't think so it is valid to give executore-core-10 in my spark-submission. On Mon, Feb 26, 2018 at 10:54 AM, Fawze Abujaber wrote: > It's recommended to sue executor-cores of 5. > > Each executor here will utilize 20 GB which mean the spark job will > utilize 50 cpu cores and 100GB memory. > > You can not run more than 4 executors because your cluster doesn't have > enough memory. > > Use see 5 executor because 4 for the job and one for the application > master. > > serr the used menory and the total memory. > > On Mon, Feb 26, 2018 at 12:20 PM, Selvam Raman wrote: > >> Hi, >> >> spark version - 2.0.0 >> spark distribution - EMR 5.0.0 >> >> Spark Cluster - one master, 5 slaves >> >> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage >> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage >> >> >> Cluster Metrics >> Apps SubmittedApps PendingApps RunningApps CompletedContainers RunningMemory >> UsedMemory TotalMemory ReservedVCores UsedVCores TotalVCores ReservedActive >> NodesDecommissioning NodesDecommissioned NodesLost NodesUnhealthy >> NodesRebooted >> Nodes >> 16 0 1 15 5 88.88 GB 90.50 GB 22 GB 5 79 1 5 >> <http://localhost:8088/cluster/nodes> 0 >> <http://localhost:8088/cluster/nodes/decommissioning> 0 >> <http://localhost:8088/cluster/nodes/decommissioned> 5 >> <http://localhost:8088/cluster/nodes/lost> 0 >> <http://localhost:8088/cluster/nodes/unhealthy> 0 >> <http://localhost:8088/cluster/nodes/rebooted> >> I have submitted job with below configuration >> --num-executors 5 --executor-cores 10 --executor-memory 20g >> >> >> >> spark.task.cpus - be default 1 >> >> >> My understanding is there will be 5 executore each can run 10 task at a >> time and task can share total memory of 20g. Here, i could see only 5 >> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb), >> 10 core(number of threads), 1 Vcore(cpu). >> >> please correct me if my understand is wrong. >> >> how can i utilize number of vcore in EMR effectively. Will Vcore boost >> performance? >> >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> > > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Re: Spark EMR executor-core vs Vcores
Thanks. That’s make sense. I want to know one more think , available vcore per machine is 16 but threads per node 8. Am I missing to relate here. What I m thinking now is number of vote = number of threads. On Mon, 26 Feb 2018 at 18:45, Vadim Semenov wrote: > All used cores aren't getting reported correctly in EMR, and YARN itself > has no control over it, so whatever you put in `spark.executor.cores` will > be used, > but in the ResourceManager you will only see 1 vcore used per nodemanager. > > On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman wrote: > >> Hi, >> >> spark version - 2.0.0 >> spark distribution - EMR 5.0.0 >> >> Spark Cluster - one master, 5 slaves >> >> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage >> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage >> >> >> Cluster Metrics >> Apps SubmittedApps PendingApps RunningApps CompletedContainers RunningMemory >> UsedMemory TotalMemory ReservedVCores UsedVCores TotalVCores ReservedActive >> NodesDecommissioning NodesDecommissioned NodesLost NodesUnhealthy >> NodesRebooted >> Nodes >> 16 0 1 15 5 88.88 GB 90.50 GB 22 GB 5 79 1 5 >> <http://localhost:8088/cluster/nodes> 0 >> <http://localhost:8088/cluster/nodes/decommissioning> 0 >> <http://localhost:8088/cluster/nodes/decommissioned> 5 >> <http://localhost:8088/cluster/nodes/lost> 0 >> <http://localhost:8088/cluster/nodes/unhealthy> 0 >> <http://localhost:8088/cluster/nodes/rebooted> >> I have submitted job with below configuration >> --num-executors 5 --executor-cores 10 --executor-memory 20g >> >> >> >> spark.task.cpus - be default 1 >> >> >> My understanding is there will be 5 executore each can run 10 task at a >> time and task can share total memory of 20g. Here, i could see only 5 >> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb), >> 10 core(number of threads), 1 Vcore(cpu). >> >> please correct me if my understand is wrong. >> >> how can i utilize number of vcore in EMR effectively. Will Vcore boost >> performance? >> >> >> -- >> Selvam Raman >> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து" >> > > -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
Spark Higher order function
Dear All, I read about higher-order functions in the Databricks blog: https://docs.databricks.com/spark/latest/spark-sql/higher-order-functions-lambda-functions.html Is higher-order function support available in our (open-source) Spark? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
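As far as I know, the higher-order SQL functions shown on that page (transform, filter, exists, aggregate over array columns) became available in open-source Spark starting with version 2.4. A small PySpark sketch, assuming Spark 2.4 or later:

df = spark.createDataFrame([(1, [1, 2, 3])], ["key", "values"])
df.createOrReplaceTempView("nested")
# transform applies the lambda to every element of the array column
spark.sql("SELECT key, transform(values, x -> x + 1) AS values_plus_one FROM nested").show()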
Sequence file to Image in spark
Hi All, I am trying to convert sequence file to image in spark. i found that when i was reading bytearrayinputstream from bytes it throws serialization exception. Any insight will be helpful. scala> sc.sequenceFile[NullWritable,BytesWritable]("D:/seqImage").map(x => {ImageIO.write(ImageIO.read(newByteArrayInputStream(x._2.copyBytes())),"png",new File("D:/ima"))}).collect 2018-04-28 15:45:52 ERROR Executor:91 - Exception in task 0.0 in stage 8.0 (TID 14) java.lang.IllegalArgumentException: image == null! at javax.imageio.ImageTypeSpecifier.createFromRenderedImage(Unknown Sour ce) at javax.imageio.ImageIO.getWriter(Unknown Source) at javax.imageio.ImageIO.write(Unknown Source) at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:31) at $line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:31) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala: 59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala: 104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala: 48) at scala.collection.TraversableOnce$class.to (TraversableOnce.scala:310) at scala.collection.AbstractIterator.to(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala :302) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala: 289) at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.sca la:939) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.sca la:939) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.sc ala:2067) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.sc ala:2067) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) 2018-04-28 15:45:52 WARN TaskSetManager:66 - Lost task 0.0 in stage 8.0 (TID 14 , localhost, executor driver): java.lang.IllegalArgumentException: image == null ! at javax.imageio.ImageTypeSpecifier.createFromRenderedImage(Unknown Sour ce) -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
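A rough PySpark/Pillow sketch of the same conversion, assuming the sequence file values deserialize to byte arrays (PySpark does this for BytesWritable, as far as I know) and that the bytes really are an encoded image. In the Scala run above, ImageIO.read returning null usually means no registered reader recognised the byte stream, which is worth checking before blaming serialization. The output path pattern is hypothetical, and writing to a local path like this only makes sense in local mode.

import io
from PIL import Image

def write_image(pair):
    raw, idx = pair
    img = Image.open(io.BytesIO(bytes(raw)))       # raises if the bytes are not a decodable image
    img.save("D:/ima/img_%d.png" % idx, "PNG")

rdd = sc.sequenceFile("D:/seqImage")               # (key, value) pairs; values as bytearray
rdd.values().zipWithIndex().foreach(write_image)   # write one PNG per record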
how to decide broadcast join data size
Hi, I could not find a useful formula or documentation to help me decide the broadcast join data size based on the cluster size. Please let me know whether there is a rule of thumb for this. For example: cluster size - 20 nodes, 32 GB and 8 cores per node; executor-memory = 8 GB, executor-cores = 4. Memory: 8 GB, with roughly 40% set aside for internal use, leaving about 4.8 GB for actual computation and storage. Assuming I have not persisted anything, I could utilize that 4.8 GB per executor. Is it possible for me to use a 400 MB file for a broadcast join? -- Selvam Raman "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
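A back-of-the-envelope check in plain Python for the executor sizing above, using the Spark 2.x defaults as an assumption (spark.memory.fraction = 0.6 of the heap shared by execution and storage). The point is that the broadcast table is held once per executor, not once per task, but its deserialized size in memory can be several times the on-disk file size, so the 400 MB figure should be treated as a lower bound:

executor_mem_gb = 8.0
unified_gb = executor_mem_gb * 0.6      # ~4.8 GB shared by execution and storage
per_task_gb = unified_gb / 4            # 4 cores -> very roughly ~1.2 GB per concurrent task
broadcast_gb = 0.4                      # the 400 MB table, held once per executor
print(unified_gb, per_task_gb, unified_gb - broadcast_gb)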