Null pointer exception when using com.databricks.spark.csv

2016-03-29 Thread Selvam Raman
Hi,

I am using Spark 1.6.0 (pre-built for Hadoop 2.6.0) on my Windows machine.

I was trying to use the Databricks CSV format to read a CSV file. I used the
command below.

[image: Inline image 1]
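
For reference, a typical spark-csv load in spark-shell looks roughly like this
(a minimal sketch only; the exact command and options are in the screenshot and
not visible here, and the path is hypothetical):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")        // assumption: the file has a header row
  .option("inferSchema", "true")
  .load("D:/data/sample.csv")      // hypothetical local path on Windows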

I got a null pointer exception. Any help would be greatly appreciated.

[image: Inline image 2]

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Null pointer exception when using com.databricks.spark.csv

2016-03-29 Thread Selvam Raman
Hi,

I am able to load and extract the data; the problem only occurs when I use
this Databricks library.

thanks,
selvam R

On Wed, Mar 30, 2016 at 9:33 AM, Hyukjin Kwon  wrote:

> Hi,
>
> I guess this is not a CSV-datasource specific problem.
>
> Does loading any file (eg. textFile()) work as well?
>
> I think this is related with this thread,
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-example-scala-application-using-spark-submit-td10056.html
> .
>
>
> 2016-03-30 12:44 GMT+09:00 Selvam Raman :
>
>> Hi,
>>
>> i am using spark 1.6.0 prebuilt hadoop 2.6.0 version in my windows
>> machine.
>>
>> i was trying to use databricks csv format to read csv file. i used the
>> below command.
>>
>> [image: Inline image 1]
>>
>> I got null pointer exception. Any help would be greatly appreciated.
>>
>> [image: Inline image 2]
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


HiveContext in spark

2016-04-12 Thread Selvam Raman
I am not able to use the INSERT, UPDATE and DELETE commands in HiveContext.

I am using Spark 1.6.1 and Hive 1.1.0.

Please find the error below.



​scala> hc.sql("delete from  trans_detail where counter=1");
16/04/12 14:58:45 INFO ParseDriver: Parsing command: delete from
 trans_detail where counter=1
16/04/12 14:58:45 INFO ParseDriver: Parse Completed
16/04/12 14:58:45 INFO ParseDriver: Parsing command: delete from
 trans_detail where counter=1
16/04/12 14:58:45 INFO ParseDriver: Parse Completed
16/04/12 14:58:45 INFO BlockManagerInfo: Removed broadcast_2_piece0 on
localhost:60409 in memory (size: 46.9 KB, free: 536.7 MB)
16/04/12 14:58:46 INFO ContextCleaner: Cleaned accumulator 3
16/04/12 14:58:46 INFO BlockManagerInfo: Removed broadcast_4_piece0 on
localhost:60409 in memory (size: 3.6 KB, free: 536.7 MB)
org.apache.spark.sql.AnalysisException:
Unsupported language features in query: delete from  trans_detail where
counter=1
TOK_DELETE_FROM 1, 0,11, 13
  TOK_TABNAME 1, 5,5, 13
trans_detail 1, 5,5, 13
  TOK_WHERE 1, 7,11, 39
= 1, 9,11, 39
  TOK_TABLE_OR_COL 1, 9,9, 32
counter 1, 9,9, 32
  1 1, 11,11, 40

scala.NotImplementedError: No parse rules for TOK_DELETE_FROM:
 TOK_DELETE_FROM 1, 0,11, 13
  TOK_TABNAME 1, 5,5, 13
trans_detail 1, 5,5, 13
  TOK_WHERE 1, 7,11, 39
= 1, 9,11, 39
  TOK_TABLE_OR_COL 1, 9,9, 32
counter 1, 9,9, 32
  1 1, 11,11, 40

org.apache.spark.sql.hive.HiveQl$.nodeToPlan(HiveQl.scala:1217)
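
(For context, the trace above shows that HiveQl in Spark 1.6 has no parse rules
for TOK_DELETE_FROM, so DELETE/UPDATE cannot be run through hc.sql. A minimal
sketch of the usual workaround, rewriting the data without the unwanted rows
into a separate table; the target table name is hypothetical:)

// Keep only the rows we want and write them out as a new table.
val remaining = hc.table("trans_detail").filter("counter <> 1")
remaining.write.mode("overwrite").saveAsTable("trans_detail_cleaned")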
​



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


next on empty iterator though i used hasNext

2016-04-25 Thread Selvam Raman
I am reading data from a Kinesis stream (merging the shard values with a union
stream) into Spark Streaming, then running the following code to push the data
to a DB.
​

splitCSV.foreachRDD(new VoidFunction2<JavaRDD<String[]>, Time>()
{

private static final long serialVersionUID = 1L;

public void call(JavaRDD<String[]> rdd, Time time) throws Exception
{
JavaRDD<SFieldBean> varMapRDD = rdd.map(new Function<String[], SFieldBean>()
{
private static final long serialVersionUID = 1L;

public SFieldBean call(String[] values) throws Exception
{
.
}
});

varMapRDD.foreachPartition(new VoidFunction<Iterator<SFieldBean>>()
{
private static final long serialVersionUID = 1L;

@Override
public void call(Iterator<SFieldBean> iterValues) throws Exception
{
MySQLConnectionHelper.getConnection("urlinfo");
while(iterValues.hasNext())
{

}
}
});
}
});
Though I am using hasNext, it throws the following error:
​
Caused by: java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at
scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at
scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:319)
at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:288)
at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
... 3 more
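
(For reference, the trace points at the next() call inside the loop at
KinesisToSpark.java:319; a common cause of "next on empty iterator" is calling
next() more than once per successful hasNext() check. A minimal sketch of the
safe consumption pattern, in Scala for brevity:)

// Exactly one next() per successful hasNext(); an extra next() in the same
// pass is what fails once the partition's data runs out.
def consume[T](it: java.util.Iterator[T])(process: T => Unit): Unit = {
  while (it.hasNext) {
    process(it.next())
  }
}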


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Release Announcement: XGBoost4J - Portable Distributed XGBoost in Spark, Flink and Dataflow

2016-05-25 Thread Selvam Raman
XGBoost4J integrates with Spark from version 1.6 onwards.

Currently I am using Spark 1.5.2. Can I use XGBoost instead of XGBoost4J?

Will both give the same results?

Thanks,
Selvam R
+91-97877-87724
On Mar 15, 2016 9:23 PM, "Nan Zhu"  wrote:

> Dear Spark Users and Developers,
>
> We (Distributed (Deep) Machine Learning Community (http://dmlc.ml/)) are
> happy to announce the release of XGBoost4J (
> http://dmlc.ml/2016/03/14/xgboost4j-portable-distributed-xgboost-in-spark-flink-and-dataflow.html),
> a Portable Distributed XGBoost in Spark, Flink and Dataflow
>
> XGBoost is an optimized distributed gradient boosting library designed to
> be highly *efficient*, *flexible* and *portable*.XGBoost provides a
> parallel tree boosting (also known as GBDT, GBM) that solve many data
> science problems in a fast and accurate way. It has been the winning
> solution for many machine learning scenarios, ranging from Machine Learning
> Challenges (
> https://github.com/dmlc/xgboost/tree/master/demo#machine-learning-challenge-winning-solutions)
> to Industrial User Cases (
> https://github.com/dmlc/xgboost/tree/master/demo#usecases)
>
> *XGBoost4J* is a new package in XGBoost aiming to provide the clean
> Scala/Java APIs and the seamless integration with the mainstream data
> processing platform, like Apache Spark. With XGBoost4J, users can run
> XGBoost as a stage of Spark job and build a unified pipeline from ETL to
> Model training to data product service within Spark, instead of jumping
> across two different systems, i.e. XGBoost and Spark. (Example:
> https://github.com/dmlc/xgboost/blob/master/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/spark/DistTrainWithSpark.scala
> )
>
> Today, we release the first version of XGBoost4J to bring more choices to
> the Spark users who are seeking the solutions to build highly efficient
> data analytic platform and enrich the Spark ecosystem. We will keep moving
> forward to integrate with more features of Spark. Of course, you are more
> than welcome to join us and contribute to the project!
>
> For more details of distributed XGBoost, you can refer to the
> recently published paper: http://arxiv.org/abs/1603.02754
>
> Best,
>
> --
> Nan Zhu
> http://codingcat.me
>


Windows RStudio to Linux SparkR

2016-06-01 Thread Selvam Raman
Hi ,

How do I connect to SparkR (which is available in a Linux environment) from
RStudio (in a Windows environment)?

Please help me.

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Skew data

2016-06-16 Thread Selvam Raman
Hi,

What is skewed data?

I read that if the data is skewed, a join can take a long time to finish (99
percent of the tasks finish in seconds while the remaining 1 percent take
minutes to hours).

How do I handle skewed data in Spark?
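
(For reference, one commonly suggested way to handle a skewed join key is to
salt it; a minimal sketch with hypothetical DataFrames big and small joined on
a column "id":)

import org.apache.spark.sql.functions._

val numSalts = 10  // assumption: spread each hot key across 10 sub-keys

// Add a random salt to the large, skewed side.
val bigSalted = big.withColumn("salt", (rand() * numSalts).cast("int"))

// Replicate the small side once per salt value so every salted key can match.
val smallSalted = small.select(
  col("*"),
  explode(array((0 until numSalts).map(lit): _*)).as("salt"))

// Join on the original key plus the salt, then drop the helper column.
val joined = bigSalted.join(smallSalted, Seq("id", "salt")).drop("salt")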

Thanks,
Selvam R
+91-97877-87724


Sqoop On Spark

2016-08-01 Thread Selvam Raman
 Hi Team,

How can I use Spark as the execution engine in Sqoop2? I see the patch
(SQOOP-1532 <https://issues.apache.org/jira/browse/SQOOP-1532>) but it is
still in progress.

So can we not use Sqoop on Spark?

Please help me if you have any ideas.

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


SparkSession for RDBMS

2016-08-03 Thread Selvam Raman
Hi All,

I would like to read data from an RDBMS into Spark (2.0) using SparkSession.
How can I decide the upper bound, lower bound and number of partitions?
Is there any specific approach available?

How does Sqoop2 decide the number of partitions and the upper and lower bounds
if we do not specify anything?
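
(For reference, a minimal sketch of a partitioned JDBC read in Spark 2.0, with
the bounds taken from a min/max probe on the partition column; the URL,
credentials, table and column names are hypothetical:)

import java.util.Properties

val url = "jdbc:mysql://dbhost:3306/mydb"
val props = new Properties()
props.setProperty("user", "dbuser")
props.setProperty("password", "dbpass")

// Probe the source for the bounds of a numeric partition column.
val bounds = spark.read
  .jdbc(url, "(select min(id) lo, max(id) hi from my_table) b", props)
  .collect()(0)

// Partitioned read: Spark issues numPartitions queries, each covering a slice
// of [lowerBound, upperBound] on the partition column.
val df = spark.read.jdbc(
  url, "my_table", "id",
  bounds.getAs[Number](0).longValue(),   // lowerBound
  bounds.getAs[Number](1).longValue(),   // upperBound
  20,                                    // numPartitions: tune to cluster/row count
  props)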

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Get distinct column data from grouped data

2016-08-09 Thread Selvam Raman
Example:

sel1 test
sel1 test
sel1 ok
sel2 ok
sel2 test


expected result:

sel1, [test,ok]
sel2,[test,ok]

How can I achieve the above result using a Spark DataFrame?

Please advise.
-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Get distinct column data from grouped data

2016-08-09 Thread Selvam Raman
My friend suggested this approach:

val fil = sc.textFile("hdfs:///user/vijayc/data/test-spk.tx")

val res =fil.map(l => l.split(",")).map(l =>( l(0),l(1))).groupByKey.map(rd
=>(rd._1,rd._2.toList.distinct))


another useful function is *collect_set* in dataframe.
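
(A minimal sketch of the collect_set route, reusing fil from the snippet above
and assuming Spark 1.6+ for the collect_set function:)

import org.apache.spark.sql.functions.collect_set
import sqlContext.implicits._

val df = fil.map(_.split(",")).map(a => (a(0), a(1))).toDF("key", "value")
df.groupBy("key").agg(collect_set("value").as("values")).show()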


Thanks,

selvam R

On Tue, Aug 9, 2016 at 4:19 PM, Selvam Raman  wrote:

> Example:
>
> sel1 test
> sel1 test
> sel1 ok
> sel2 ok
> sel2 test
>
>
> expected result:
>
> sel1, [test,ok]
> sel2,[test,ok]
>
> How to achieve the above result using spark dataframe.
>
> please suggest me.
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Data frame Performance

2016-08-16 Thread Selvam Raman
Hi All,

Please suggest the best approach to achieve this result. [Please comment on
whether the existing logic is fine or not.]

Input Record :

ColA ColB ColC
1 2 56
1 2 46
1 3 45
1 5 34
1 5 90
2 1 89
2 5 45
​
Expected Result

ResA ResB
1    2:2|3:3|5:5
2    1:1|5:5

I followed the Spark steps below

(Spark version - 1.5.0)

def valsplit(elem :scala.collection.mutable.WrappedArray[String]) : String
=
{

elem.map(e => e+":"+e).mkString("|")
}

sqlContext.udf.register("valudf",valsplit(_:scala.collection.mutable.WrappedArray[String]))


val x =sqlContext.sql("select site,valudf(collect_set(requests)) as test
from sel group by site").first



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Data frame Performance

2016-08-16 Thread Selvam Raman
Hi Mich,

The input and output are just examples; those are not the exact column names.
ColC is not needed.

The code I shared is working fine, but I need to confirm whether it is the
right approach and how it affects performance.

Thanks,
Selvam R
+91-97877-87724
On Aug 16, 2016 5:18 PM, "Mich Talebzadeh" 
wrote:

> Hi Selvan,
>
> is table called sel,?
>
> And are these assumptions correct?
>
> site -> ColA
> requests -> ColB
>
> I don't think you are using ColC here?
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 16 August 2016 at 12:06, Selvam Raman  wrote:
>
>> Hi All,
>>
>> Please suggest me the best approach to achieve result. [ Please comment
>> if the existing logic is fine or not]
>>
>> Input Record :
>>
>> ColA ColB ColC
>> 1 2 56
>> 1 2 46
>> 1 3 45
>> 1 5 34
>> 1 5 90
>> 2 1 89
>> 2 5 45
>> ​
>> Expected Result
>>
>> ResA ResB
>> 12:2|3:3|5:5
>> 2   1:1|5:5
>>
>> I followd the below Spark steps
>>
>> (Spark version - 1.5.0)
>>
>> def valsplit(elem :scala.collection.mutable.WrappedArray[String]) :
>> String =
>> {
>>
>> elem.map(e => e+":"+e).mkString("|")
>> }
>>
>> sqlContext.udf.register("valudf",valsplit(_:scala.collection
>> .mutable.WrappedArray[String]))
>>
>>
>> val x =sqlContext.sql("select site,valudf(collect_set(requests)) as test
>> from sel group by site").first
>>
>>
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
>


Extract year from string format of date

2016-08-17 Thread Selvam Raman
Spark Version : 1.5.0


Record:
01-Jan-16

Expected Result:
2016

I used the code below, which was shared on the user group.

from_unixtime(unix_timestamp($"Creation Date","dd-MMM-yy"),""))

Is this the right approach, or is there a better one?

NOTE:
I tried the *year()* function but it gives only null values for this string;
the same goes for the *to_date()* function.
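
(For reference, a sketch of a variant that parses the string first and then
takes the year as an integer; df here stands for your DataFrame:)

import org.apache.spark.sql.functions._

val withYear = df.withColumn("year",
  year(from_unixtime(unix_timestamp(col("Creation Date"), "dd-MMM-yy"))))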
-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Windows operation orderBy desc

2016-08-24 Thread Selvam Raman
Hi all,

I am using a window function with row_number to find the latest record;
the Hive table is partitioned.

When I run the query it launches 545 tasks.

What is the reason for 545 tasks?

Thanks,
selvam R

On Mon, Aug 1, 2016 at 8:09 PM, Mich Talebzadeh 
wrote:

> You need to get the position right
>
>
> val wSpec = Window.partitionBy("col1").orderBy(desc("col2"))
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 1 August 2016 at 14:56, Ashok Kumar 
> wrote:
>
>> Hi,
>>
>> in the following Window spec I want orderBy ("") to be displayed
>> in descending order please
>>
>> val W = Window.partitionBy("col1").orderBy("col2")
>>
>> If I Do
>>
>> val W = Window.partitionBy("col1").orderBy("col2".desc)
>>
>> It throws error
>>
>> console>:26: error: value desc is not a member of String
>>
>> How can I achieve that?
>>
>> Thanking you
>>
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Insert non-null values from dataframe

2016-08-25 Thread Selvam Raman
Hi ,

Dataframe:
colA colB colC colD colE
1 2 3 4 5
1 2 3 null null
1 null null  null 5
null null  3 4 5

I want to insert this DataFrame into a NoSQL database (Cassandra), where nulls
would otherwise occupy those values, so I have to insert only the columns that
have non-null values in each row.

Expected:

Record 1: (1,2,3,4,5)
Record 2:(1,2,3)
Record 3:(1,5)
Record 4:(3,4,5)

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Insert non-null values from dataframe

2016-08-28 Thread Selvam Raman
Thanks for the update. We are using version 2.0.

So I am planning to write my own custom logic to remove the null values.
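
(For what it's worth, a minimal sketch of one way to do that: build, per row, a
map of only the non-null column/value pairs before writing; df stands for the
DataFrame above:)

val cols = df.columns
val nonNullPerRow = df.rdd.map { row =>
  cols.zipWithIndex.collect {
    case (name, i) if !row.isNullAt(i) => name -> row.get(i)   // keep non-null columns only
  }.toMap
}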

Thanks,
selvam R

On Fri, Aug 26, 2016 at 9:08 PM, Russell Spitzer 
wrote:

> Cassandra does not differentiate between null and empty, so when reading
> from C* all empty values are reported as null. To avoid inserting nulls
> (avoiding tombstones) see
>
> https://github.com/datastax/spark-cassandra-connector/
> blob/master/doc/5_saving.md#globally-treating-all-nulls-as-unset
>
> This will not prevent those columns from being read as null though, it
> will only skip writing tombstones.
>
> On Thu, Aug 25, 2016, 1:23 PM Selvam Raman  wrote:
>
>> Hi ,
>>
>> Dataframe:
>> colA colB colC colD colE
>> 1 2 3 4 5
>> 1 2 3 null null
>> 1 null null  null 5
>> null null  3 4 5
>>
>> I want to insert dataframe to nosql database, where null occupies
>> values(Cassandra). so i have to insert the column which has non-null values
>> in the row.
>>
>> Expected:
>>
>> Record 1: (1,2,3,4,5)
>> Record 2:(1,2,3)
>> Record 3:(1,5)
>> Record 4:(3,4,5)
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Need help with row repetition

2016-09-03 Thread Selvam Raman
I have my dataset as a DataFrame (Spark 1.5.0).


cola,colb,colc,cold,cole,colf,colg,colh,coli -> columns in a row

Of the above columns, the date fields are (colc, colf, colh, coli).

Scenario 1: (colc=2016, colf=2016, colh=2016, coli=2016)
If all the years are the same, no logic is needed; the record remains as-is.


Scenario 2: (colc=2016, colf=2017, colh=2016, coli=2018) -> unique values are
2016, 2017, 2018
If the years in the date fields differ, the record needs to be repeated once
per distinct year (i.e. the above row has three distinct years, so the same
row should appear three times in the output).

Please give me any suggestions in terms of DataFrames.
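
(A minimal sketch of one way to do this: collect the distinct years of the four
date columns into an array per row with a UDF, then explode so the row is
emitted once per distinct year; df stands for the DataFrame above:)

import org.apache.spark.sql.functions._

val distinctYears = udf((a: Int, b: Int, c: Int, d: Int) => Seq(a, b, c, d).distinct)

val repeated = df.select(
  col("*"),
  explode(distinctYears(
    year(col("colc")), year(col("colf")), year(col("colh")), year(col("coli")))).as("yr"))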



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


spark cassandra issue

2016-09-04 Thread Selvam Raman
Please help me to solve the issue.

spark-shell --packages
com.datastax.spark:spark-cassandra-connector_2.10:1.3.0 --conf
spark.cassandra.connection.host=**

val df = sqlContext.read.
 | format("org.apache.spark.sql.cassandra").
 | options(Map( "table" -> "", "keyspace" -> "***")).
 | load()
java.util.NoSuchElementException: key not found: c_table
at scala.collection.MapLike$class.default(MapLike.scala:228)
at
org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.default(ddl.scala:151)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at
org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.apply(ddl.scala:151)
at
org.apache.spark.sql.cassandra.DefaultSource$.TableRefAndOptions(DefaultSource.scala:120)
at
org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56)
at
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
a

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: spark cassandra issue

2016-09-04 Thread Selvam Raman
It's very urgent. Please help me, guys.

On Sun, Sep 4, 2016 at 8:05 PM, Selvam Raman  wrote:

> Please help me to solve the issue.
>
> spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.3.0
> --conf spark.cassandra.connection.host=**
>
> val df = sqlContext.read.
>  | format("org.apache.spark.sql.cassandra").
>  | options(Map( "table" -> "", "keyspace" -> "***")).
>  | load()
> java.util.NoSuchElementException: key not found: c_table
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at org.apache.spark.sql.execution.datasources.
> CaseInsensitiveMap.default(ddl.scala:151)
> at scala.collection.MapLike$class.apply(MapLike.scala:141)
> at org.apache.spark.sql.execution.datasources.
> CaseInsensitiveMap.apply(ddl.scala:151)
> at org.apache.spark.sql.cassandra.DefaultSource$.
> TableRefAndOptions(DefaultSource.scala:120)
> at org.apache.spark.sql.cassandra.DefaultSource.
> createRelation(DefaultSource.scala:56)
> at org.apache.spark.sql.execution.datasources.
> ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
> a
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: spark cassandra issue

2016-09-04 Thread Selvam Raman
Hey Mich,

I am using the same one right now. Thanks for the reply.
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector._ //Loads implicit functions
sc.cassandraTable("keyspace name", "table name")


On Sun, Sep 4, 2016 at 8:48 PM, Mich Talebzadeh 
wrote:

> Hi Selvan.
>
> I don't deal with Cassandra but have you tried other options as described
> here
>
> https://github.com/datastax/spark-cassandra-connector/
> blob/master/doc/2_loading.md
>
> To get a Spark RDD that represents a Cassandra table, call the
> cassandraTable method on the SparkContext object.
>
> import com.datastax.spark.connector._ //Loads implicit functions
> sc.cassandraTable("keyspace name", "table name")
>
>
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 4 September 2016 at 15:52, Selvam Raman  wrote:
>
>> its very urgent. please help me guys.
>>
>> On Sun, Sep 4, 2016 at 8:05 PM, Selvam Raman  wrote:
>>
>>> Please help me to solve the issue.
>>>
>>> spark-shell --packages 
>>> com.datastax.spark:spark-cassandra-connector_2.10:1.3.0
>>> --conf spark.cassandra.connection.host=**
>>>
>>> val df = sqlContext.read.
>>>  | format("org.apache.spark.sql.cassandra").
>>>  | options(Map( "table" -> "", "keyspace" -> "***")).
>>>  | load()
>>> java.util.NoSuchElementException: key not found: c_table
>>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>>> at org.apache.spark.sql.execution.datasources.CaseInsensitiveMa
>>> p.default(ddl.scala:151)
>>> at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>> at org.apache.spark.sql.execution.datasources.CaseInsensitiveMa
>>> p.apply(ddl.scala:151)
>>> at org.apache.spark.sql.cassandra.DefaultSource$.TableRefAndOpt
>>> ions(DefaultSource.scala:120)
>>> at org.apache.spark.sql.cassandra.DefaultSource.createRelation(
>>> DefaultSource.scala:56)
>>> at org.apache.spark.sql.execution.datasources.ResolvedDataSourc
>>> e$.apply(ResolvedDataSource.scala:125)
>>> a
>>>
>>> --
>>> Selvam Raman
>>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>>
>>
>>
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: spark cassandra issue

2016-09-04 Thread Selvam Raman
  at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at org.apache.spark.sql.cassandra.DataTypeConverter$.<
init>(DataTypeConverter.scala:32)
at org.apache.spark.sql.cassandra.DataTypeConverter$.<
clinit>(DataTypeConverter.scala)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$$
anonfun$schema$1$$anonfun$apply$1.apply(CassandraSourceRelation.scala:58)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$$
anonfun$schema$1$$anonfun$apply$1.apply(CassandraSourceRelation.scala:58)
at scala.collection.TraversableLike$$anonfun$map$
1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$
1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.
scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(
TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$$
anonfun$schema$1.apply(CassandraSourceRelation.scala:58)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$$
anonfun$schema$1.apply(CassandraSourceRelation.scala:58)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.cassandra.CassandraSourceRelation.schema(
CassandraSourceRelation.scala:58)
at org.apache.spark.sql.execution.datasources.
LogicalRelation.(LogicalRelation.scala:37)
at org.apache.spark.sql.DataFrameReader.load(
DataFrameReader.scala:120)
at com.zebra.avp.oracle11i.OracleRepairData$.main(
OracleRepairData.scala:298)
at com.zebra.avp.oracle11i.OracleRepairData.main(
OracleRepairData.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(
SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(
SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.types.
PrimitiveType
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

On Sun, Sep 4, 2016 at 10:04 PM, Russell Spitzer 
wrote:

> This would also be a better question for the SCC user list :)
> https://groups.google.com/a/lists.datastax.com/forum/#!
> forum/spark-connector-user
>
> On Sun, Sep 4, 2016 at 9:31 AM Russell Spitzer 
> wrote:
>
>> https://github.com/datastax/spark-cassandra-connector/
>> blob/v1.3.1/doc/14_data_frames.md
>> In Spark 1.3 it was illegal to use "table" as a key in Spark SQL so in
>> that version of Spark the connector needed to use the option "c_table"
>>
>>
>> val df = sqlContext.read.
>>  | format("org.apache.spark.sql.cassandra").
>>  | options(Map( "c_table" -> "", "keyspace" -> "***")).
>>  | load()
>>
>>
>> On Sun, Sep 4, 2016 at 8:32 AM Mich Talebzadeh 
>> wrote:
>>
>>> and your Cassandra table is there etc?
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>&g

Cassandra timestamp to spark Date field

2016-09-05 Thread Selvam Raman
Hi All,

As per the DataStax documentation, the Cassandra timestamp type maps to the
Spark types Long, java.util.Date, java.sql.Date and org.joda.time.DateTime.

Please help me with your input.

I have a Cassandra table with 30 fields; 3 of them are timestamps.

I read the Cassandra table using sc.cassandraTable
[com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow]
= CassandraTableScanRDD[9] at RDD at CassandraRDD.scala:15]

Then I converted it to an RDD of Rows:

val exis_repair_fact = sqlContext.createDataFrame(rddrepfact.map(r =>
org.apache.spark.sql.Row.fromSeq(r.columnValues)),schema)

In the schema fields I declared the timestamp column as

StructField("shipped_datetime", DateType),


When I try to show the result, it throws "java.util.Date cannot be cast to
java.sql.Date".


So how can I solve this issue?


First I have converted cassandrascanrdd to
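
(For reference, a sketch of one way around the mismatch: convert the
connector's java.util.Date values before building each Row, and declare the
matching schema field as TimestampType, or as DateType with java.sql.Date if
only the date part matters; rddrepfact and schema are the ones from above:)

import java.sql.Timestamp
import org.apache.spark.sql.Row

val rows = rddrepfact.map { r =>
  Row.fromSeq(r.columnValues.map {
    case d: java.util.Date => new Timestamp(d.getTime)   // java.util.Date -> java.sql.Timestamp
    case other             => other
  })
}
val exis_repair_fact = sqlContext.createDataFrame(rows, schema)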



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark Checkpoint for JDBC/ODBC

2016-09-06 Thread Selvam Raman
Hi,

I need your input to make a decision.

We have n databases (e.g. Oracle, MySQL, etc.). I want to read data from these
sources, but how is fault tolerance maintained on the source side?

If a source system goes down, how does Spark read the data?

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark CSV skip lines

2016-09-10 Thread Selvam Raman
Hi,

I am using spark-csv to read a CSV file. The issue is that the first n lines
of my files contain a report, followed by the actual data (a header and the
rest of the data).

So how can I skip the first n lines in spark-csv? I don't have any specific
comment character in the first byte.

Please give me some ideas.

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark CSV skip lines

2016-09-10 Thread Selvam Raman
Hi,

I had already seen these two options, but thanks anyway for the idea.

I am using wholeTextFiles to read my data (because there are \n characters in
the middle of it) and OpenCSV to parse it. In my data the first two lines are
just a report. How can I eliminate them?

*How can I eliminate the first two lines after reading with wholeTextFiles?*

val test = wholeTextFiles.flatMap{ case (_, txt) =>
 | val reader = new CSVReader(new StringReader(txt));
 | reader.readAll().map(data =>
Row(data(3),data(4),data(7),data(9),data(14)))}

The above code throws an ArrayIndexOutOfBoundsException for the empty line and
the report line.
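
(A sketch of one way to drop the report lines after parsing: skip the first two
parsed records and guard against short or empty rows; same imports as the
snippet above, plus JavaConverters:)

import scala.collection.JavaConverters._

val test = wholeTextFiles.flatMap { case (_, txt) =>
  val reader = new CSVReader(new StringReader(txt))
  reader.readAll().asScala
    .drop(2)                      // skip the two report lines
    .filter(_.length > 14)        // ignore empty/short rows instead of failing
    .map(data => Row(data(3), data(4), data(7), data(9), data(14)))
}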


On Sat, Sep 10, 2016 at 3:02 PM, Hyukjin Kwon  wrote:

> Hi Selvam,
>
> If your report is commented with any character (e.g. #), you can skip
> these lines via comment option [1].
>
> If you are using Spark 1.x, then you might be able to do this by manually
> skipping from the RDD and then making this to DataFrame as below:
>
> I haven’t tested this but I think this should work.
>
> val rdd = sparkContext.textFile("...")
> val filteredRdd = rdd.mapPartitionsWithIndex { (idx, iter) =>
>   if (idx == 0) {
> iter.drop(10)
>   } else {
> iter
>   }
> }
> val df = new CsvParser().csvRdd(sqlContext, filteredRdd)
>
> If you are using Spark 2.0, then it seems there is no way to manually
> modifying the source data because loading existing RDD or DataSet[String]
> to DataFrame is not yet supported.
>
> There is an issue open[2]. I hope this is helpful.
>
> Thanks.
>
> [1] https://github.com/apache/spark/blob/27209252f09ff73c58e60c6df8aaba
> 73b308088c/sql/core/src/main/scala/org/apache/spark/sql/
> DataFrameReader.scala#L369
> [2] https://issues.apache.org/jira/browse/SPARK-15463
>
>
> ​
>
>
> On 10 Sep 2016 6:14 p.m., "Selvam Raman"  wrote:
>
>> Hi,
>>
>> I am using spark csv to read csv file. The issue is my files first n
>> lines contains some report and followed by actual data (header and rest of
>> the data).
>>
>> So how can i skip first n lines in spark csv. I dont have any specific
>> comment character in the first byte.
>>
>> Please give me some idea.
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark S3

2016-10-10 Thread Selvam Raman
Hi,

How does Spark read data from S3 and run parallel tasks?

Assume I have an S3 bucket with 35 GB of Parquet files.

How will the SparkSession read and process the data in parallel? How does it
split the S3 data and assign it to each executor task?

Please share your thoughts.

Note:
If we have an RDD, we can look at partitions.size or length to check how many
partitions a file has. But how is this accomplished for an S3 bucket?
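
(For reference, the same check works on a DataFrame read from S3; bucket and
path here are hypothetical:)

val df = spark.read.parquet("s3://my-bucket/path/")
println(df.rdd.getNumPartitions)   // partition count derived from the Parquet file splits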

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark S3

2016-10-10 Thread Selvam Raman
I mentioned parquet as input format.
On Oct 10, 2016 11:06 PM, "ayan guha"  wrote:

> It really depends on the input format used.
> On 11 Oct 2016 08:46, "Selvam Raman"  wrote:
>
>> Hi,
>>
>> How spark reads data from s3 and runs parallel task.
>>
>> Assume I have a s3 bucket size of 35 GB( parquet file).
>>
>> How the sparksession will read the data and process the data parallel.
>> How it splits the s3 data and assign to each executor task.
>>
>> ​Please share me your points.
>>
>> Note:
>> if we have RDD , then we can look at the partitions.size or length to
>> check how many partition for a file. But how this will be accomplished in
>> terms of S3 bucket.​
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>


Spark-Sql 2.0 nullpointerException

2016-10-12 Thread Selvam Raman
ssPetDB.main(ProcessPetDB.java:46)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)

at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)

at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: java.lang.NullPointerException

at
org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)

at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)

at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)

at com.elsevier.datasearch.ExecuteSQL.executeQuery(ExecuteSQL.java:11)

at com.elsevier.datasearch.ProcessPetDB$1.call(ProcessPetDB.java:53)

at com.elsevier.datasearch.ProcessPetDB$1.call(ProcessPetDB.java:1)

at org.apache.spark.sql.Dataset$$anonfun$foreach$2.apply(Dataset.scala:2118)

at org.apache.spark.sql.Dataset$$anonfun$foreach$2.apply(Dataset.scala:2118)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

at
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)

at
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:894)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)

at org.apache.spark.scheduler.Task.run(Task.scala:86)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

16/10/12 15:59:53 INFO SparkContext: Invoking stop() from shutdown hook
​
Please let me know if I am missing anything. Thank you for the help.

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark-Sql 2.0 nullpointerException

2016-10-12 Thread Selvam Raman
What I am trying to achieve is:

trigger a query to get a number (i.e. 1, 2, 3, ..., n);
for every number I have to trigger another 3 queries.

Thanks,
selvam R

On Wed, Oct 12, 2016 at 4:10 PM, Selvam Raman  wrote:

> Hi ,
>
> I am reading  parquet file and creating temp table. when i am trying to
> execute query outside of foreach function it is working fine.
> throws nullpointerexception within data frame.foreach function.
>
> code snippet:
>
> String CITATION_QUERY = "select c.citation_num, c.title, c.publisher from
> test c";
>
> Dataset citation_query = spark.sql(CITATION_QUERY);
>
> System.out.println("mistery:"+citation_query.count());
>
>
> // Iterator iterofresulDF = resultDF.toLocalIterator();
>
>
> resultDF.foreach(new ForeachFunction()
>
> {
>
> private static final long serialVersionUID = 1L;
>
> public void call(Row line)
>
> {
>
> Dataset row = spark.sql(CITATION_QUERY);
>
> System.out.println("mistery row count:"+row.count());
>
> }
>
> });
>
>
> ​Error trace:
>
> 16/10/12 15:59:53 INFO CodecPool: Got brand-new decompressor [.snappy]
>
> 16/10/12 15:59:53 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID
> 5)
>
> java.lang.NullPointerException
>
> at org.apache.spark.sql.SparkSession.sessionState$
> lzycompute(SparkSession.scala:112)
>
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
>
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
>
> at com.elsevier.datasearch.ExecuteSQL.executeQuery(ExecuteSQL.java:11)
>
> at com.elsevier.datasearch.ProcessPetDB$1.call(ProcessPetDB.java:53)
>
> at com.elsevier.datasearch.ProcessPetDB$1.call(ProcessPetDB.java:1)
>
> at org.apache.spark.sql.Dataset$$anonfun$foreach$2.apply(
> Dataset.scala:2118)
>
> at org.apache.spark.sql.Dataset$$anonfun$foreach$2.apply(
> Dataset.scala:2118)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$
> apply$27.apply(RDD.scala:894)
>
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$
> apply$27.apply(RDD.scala:894)
>
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
> SparkContext.scala:1916)
>
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
> SparkContext.scala:1916)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> Driver stacktrace:
>
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1454)
>
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1442)
>
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1441)
>
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>
> at org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1441)
>
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
>
> at scala.Option.foreach(Option.scala:257)
>
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:811)
>
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> doOnReceive(DAGScheduler.scala:1667)
>
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1622)
>
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1611)
>
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
>
> at org.apache.spark.rdd.RDD$$anonfun$fore

PostgresSql queries vs spark sql

2016-10-17 Thread Selvam Raman
Hi,

Please share some ideas if you have worked on this before.
How can I implement the Postgres CROSSTAB function in Spark?

Postgres Example

Example 1:

SELECT mthreport.*
FROM
*crosstab*('SELECT i.item_name::text As row_name,
to_char(if.action_date, ''mon'')::text As bucket,
SUM(if.num_used)::integer As bucketvalue
FROM inventory As i INNER JOIN inventory_flow As if
ON i.item_id = if.item_id
  AND action_date BETWEEN date ''2007-01-01'' and date ''2007-12-31 
23:59''
GROUP BY i.item_name, to_char(if.action_date, ''mon''),
date_part(''month'', if.action_date)
ORDER BY i.item_name',
'SELECT to_char(date ''2007-01-01'' + (n || '' month'')::interval,
''mon'') As short_mname
FROM generate_series(0,11) n')
As mthreport(item_name text, jan integer, feb integer, mar 
integer,
apr integer, may integer, jun integer, jul integer,
aug integer, sep integer, oct integer, nov integer,
dec integer)

The output of the above crosstab looks as follows:
[image: crosstab source_sql cat_sql example]

Example 2:

CREATE TABLE ct(id SERIAL, rowid TEXT, attribute TEXT, value TEXT);
INSERT INTO ct(rowid, attribute, value) VALUES('test1','att1','val1');
INSERT INTO ct(rowid, attribute, value) VALUES('test1','att2','val2');
INSERT INTO ct(rowid, attribute, value) VALUES('test1','att3','val3');
INSERT INTO ct(rowid, attribute, value) VALUES('test1','att4','val4');
INSERT INTO ct(rowid, attribute, value) VALUES('test2','att1','val5');
INSERT INTO ct(rowid, attribute, value) VALUES('test2','att2','val6');
INSERT INTO ct(rowid, attribute, value) VALUES('test2','att3','val7');
INSERT INTO ct(rowid, attribute, value) VALUES('test2','att4','val8');

SELECT *
FROM crosstab(
  'select rowid, attribute, value
   from ct
   where attribute = ''att2'' or attribute = ''att3''
   order by 1,2')
AS ct(row_name text, category_1 text, category_2 text, category_3 text);

 row_name | category_1 | category_2 | category_3
--+++
 test1| val2   | val3   |
 test2| val6   | val7   |


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark SQL parallelize

2016-10-20 Thread Selvam Raman
Hi,

I have 40+ structured datasets stored in an S3 bucket as Parquet files.

I am going to use 20 of those tables in this use case.

There is a main table which drives the whole flow. The main table contains 1k
records.

My use case is: for every record in the main table, process the rest of the
tables (the joins and group-bys depend on main-table fields).

How can I parallelize this processing?

What I did was read the main table, create a toLocalIterator for the DataFrame
and then do the rest of the processing.
This runs one record at a time.

Please share your ideas.

Thank you.


Re: PostgresSql queries vs spark sql

2016-10-23 Thread Selvam Raman
I found it. We can use pivot, which is similar to crosstab in Postgres.
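
(A minimal sketch using the second Postgres example above, pivoting the
attribute column of a DataFrame ct(rowid, attribute, value):)

import org.apache.spark.sql.functions.first

ct.groupBy("rowid")
  .pivot("attribute", Seq("att2", "att3"))   // listing the values avoids an extra pass
  .agg(first("value"))
  .show()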

Thank you.
On Oct 17, 2016 10:00 PM, "Selvam Raman"  wrote:

> Hi,
>
> Please share me some idea if you work on this earlier.
> How can i develop postgres CROSSTAB function in spark.
>
> Postgres Example
>
> Example 1:
>
> SELECT mthreport.*
>   FROM
>   *crosstab*('SELECT i.item_name::text As row_name, 
> to_char(if.action_date, ''mon'')::text As bucket,
>   SUM(if.num_used)::integer As bucketvalue
>   FROM inventory As i INNER JOIN inventory_flow As if
>   ON i.item_id = if.item_id
> AND action_date BETWEEN date ''2007-01-01'' and date ''2007-12-31 
> 23:59''
>   GROUP BY i.item_name, to_char(if.action_date, ''mon''), 
> date_part(''month'', if.action_date)
>   ORDER BY i.item_name',
>   'SELECT to_char(date ''2007-01-01'' + (n || '' month'')::interval, 
> ''mon'') As short_mname
>   FROM generate_series(0,11) n')
>   As mthreport(item_name text, jan integer, feb integer, mar 
> integer,
>   apr integer, may integer, jun integer, jul integer,
>   aug integer, sep integer, oct integer, nov integer,
>   dec integer)
>
> The output of the above crosstab looks as follows:
> [image: crosstab source_sql cat_sql example]
>
> Example 2:
>
> CREATE TABLE ct(id SERIAL, rowid TEXT, attribute TEXT, value TEXT);
> INSERT INTO ct(rowid, attribute, value) VALUES('test1','att1','val1');
> INSERT INTO ct(rowid, attribute, value) VALUES('test1','att2','val2');
> INSERT INTO ct(rowid, attribute, value) VALUES('test1','att3','val3');
> INSERT INTO ct(rowid, attribute, value) VALUES('test1','att4','val4');
> INSERT INTO ct(rowid, attribute, value) VALUES('test2','att1','val5');
> INSERT INTO ct(rowid, attribute, value) VALUES('test2','att2','val6');
> INSERT INTO ct(rowid, attribute, value) VALUES('test2','att3','val7');
> INSERT INTO ct(rowid, attribute, value) VALUES('test2','att4','val8');
>
> SELECT *
> FROM crosstab(
>   'select rowid, attribute, value
>from ct
>where attribute = ''att2'' or attribute = ''att3''
>order by 1,2')
> AS ct(row_name text, category_1 text, category_2 text, category_3 text);
>
>  row_name | category_1 | category_2 | category_3
> --+++
>  test1| val2   | val3   |
>  test2| val6   | val7   |
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Spark Sql 2.0 throws null pointer exception

2016-10-24 Thread Selvam Raman
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)

at scala.Option.foreach(Option.scala:257)

at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)

at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:883)

at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:881)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)

at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:881)

at
org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:218)

at
org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)

at com.elsevier.datasearch.CitationTest.main(CitationTest.java:108)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)

at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)

at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: java.lang.NullPointerException

at
org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:112)

at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)

at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)

at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124)

at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1)

at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)

at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)

at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)

at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)

at org.apache.spark.scheduler.Task.run(Task.scala:85)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)​

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark Sql 2.0 throws null pointer exception

2016-10-24 Thread Selvam Raman
Why am I not able to access the SparkSession instance within
foreachPartition (I created the SparkSession instance in the main function)?
spark.sql("select 1").count, or any SQL query executed within
foreachPartition, throws a NullPointerException.
Please give me some ideas if you have faced this problem before.
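
(For what it's worth, the SparkSession lives only on the driver, so it cannot
be used inside foreachPartition closures that run on executors. A sketch of the
driver-side alternative, in Scala for brevity; citNum and the query are
stand-ins for the code above:)

import scala.collection.JavaConverters._

citNum.toLocalIterator().asScala.foreach { row =>
  val citationNum = row.getInt(0)
  val cnt = spark.sql(
    s"select * from citation where citation_num = $citationNum").count()  // runs on the driver
  println(s"citation num: $citationNum count: $cnt")
}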

Thanks,
Selvam R​

On Mon, Oct 24, 2016 at 10:23 AM, Selvam Raman  wrote:

> Hi All,
>
> Please help me.
>
> I have 10 (tables data) parquet file in s3.
>
> I am reading and storing as Dataset then registered as temp table.
>
> One table driving whole flow so i am doing below.(When i am triggering
> query from
>
>
> Code Base:
>
> SparkSession spark = SparkSession.builder().appName("Test").getOrCreate();
>
> Dataset citationDF = spark.read().parquet("s3://...")
>
> ...
>
> ...
>
> citationDF.createOrReplaceTempView("citation");
>
> ...
>
> 
>
> cit_num.javaRDD().foreachPartition(new VoidFunction>()
>
> {
>
>   /**
>
> *
>
> */
>
> private static final long serialVersionUID = 1L;
>
>
> @Override
>
>   public void call(Iterator iter)
>
>   {
>
> while (iter.hasNext())
>
> {
>
>   Row record=iter.next();
>
>   int citation_num=record.getInt(0);
>
>   String ci_query="select queries ";//(i can execute this
> query outside of foreach)
>
>   System.out.println("citation num:"+citation_num+" count:"+spark
> .sql(ci_query).count());
>
>   accum.add(1);
>
>   System.out.println("accumulator count:"+accum);
>
> }
>
>   }
>
> });
> ​Error:
>
> 16/10/24 09:08:12 WARN TaskSetManager: Lost task 1.0 in stage 30.0 (TID
> 83, ip-10-95-36-172.dev): java.lang.NullPointerException
>
> at org.apache.spark.sql.SparkSession.sessionState$
> lzycompute(SparkSession.scala:112)
>
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
>
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
>
> at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124)
>
> at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1)
>
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$
> foreachPartition$1.apply(JavaRDDLike.scala:218)
>
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$
> foreachPartition$1.apply(JavaRDDLike.scala:218)
>
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$
> anonfun$apply$28.apply(RDD.scala:883)
>
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$
> anonfun$apply$28.apply(RDD.scala:883)
>
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
> SparkContext.scala:1897)
>
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(
> SparkContext.scala:1897)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> 16/10/24 09:08:12 INFO YarnScheduler: Stage 30 was cancelled
>
> 16/10/24 09:08:12 INFO DAGScheduler: ResultStage 30 (foreachPartition at
> CitationTest.java:108) failed in 0.421 s
>
> 16/10/24 09:08:12 INFO DAGScheduler: Job 23 failed: foreachPartition at
> CitationTest.java:108, took 0.578050 s
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 6 in stage 30.0 failed 4 times, most recent
> failure: Lost task 6.3 in stage 30.0 (TID 99, ip-dev):
> java.lang.NullPointerException
>
> at org.apache.spark.sql.SparkSession.sessionState$
> lzycompute(SparkSession.scala:112)
>
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:110)
>
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
>
> at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:124)
>
> at com.elsevier.datasearch.CitationTest$1.call(CitationTest.java:1)
>
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$
> foreachPartition$1.apply(JavaRDDLike.scala:218)
>
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$
> foreachPartition$1.apply(JavaRDDLike.scala:218)
>
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$
> anonfun$apply$28.apply(RDD.scala:883)
>
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$
> anonfun$apply$

Spark Sql - "broadcast-exchange-1" java.lang.OutOfMemoryError: Java heap space

2016-10-25 Thread Selvam Raman
Hi,

I need help figuring out and solving a heap space problem.

I have a query which joins 15+ tables, and when I try to print out the
result (just 23 rows) it throws a heap space error.

I tried the following command in standalone mode
(my Mac has 8 cores and 15 GB of RAM):

spark.conf().set("spark.sql.shuffle.partitions", 20);

./spark-submit --master spark://selva:7077 --executor-memory 2g
--total-executor-cores 4 --class MemIssue --conf
'spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseG1GC
-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
-XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark'
/Users/rs/Desktop/test.jar

This is my below query:

select concat(sf1.scode, ''-'', m.mcode, ''-'', rf.rnum) , sf1.scode ,
concat(p.lname,'', '',ci.pyear), at.atext Alias, m.mcode Method, mt.mcode,
v.vname, nd.vmeas " +

" from  result r " +

"  join  var v on v.vnum = r.vnum " +

"  join  numa nd on nd.rnum = r.num " +

"  join  feat  fa on fa.fnum = r.fnum " +

"  join  samp  sf1 on sf1.snum = fa.snum " +

"  join  spe  sp on sf1.snum = sp.snum and sp.mnum not in
(1,2)" +

"  join  act  a on a.anum = fa.anum " +

"  join  met  m on m.mnum = a.mnum " +

"  join  sampl  sfa on sfa.snum = sf1.snum " +

"  join  ann  at on at.anum = sfa.anum AND at.atypenum = 11 " +

"  join  data  dr on r.rnum = dr.rnum " +

"  join  cit  cd on dr.dnum = cd.dnum " +

"  join  cit  on cd.cnum = ci.cnum " +

"  join  aut  al on ci.cnum = al.cnum and al.aorder = 1 " +

"  join  per  p on al.pnum = p.pnum " +

"  left join  rel  rf on sf1.snum = rf.snum " +

"  left join  samp sf2 on rf.rnum = sf2.snum " +

"  left join  spe  s on s.snum = sf1.snum " +

"  left join  mat  mt on mt.mnum = s.mnum " +

" where sf1.sampling_feature_code = '1234test''" +

" order by 1,2


spark.sql(query).show


When I checked the whole-stage code, it first reads all the data from the
tables. Why is it reading all the data and doing a sort-merge join for 3 or 4
of the tables? Why is it not pushing the filter down?


Although I have given the executors a lot of memory, it still throws the same
error. When Spark SQL performs the joins, how does it use memory and cores?

Any guidelines would be greatly welcome.
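
(For reference, the "broadcast-exchange" thread in the error suggests the
planner is trying to broadcast one join side that does not fit in memory; one
common mitigation is to disable automatic broadcast joins so the planner falls
back to sort-merge joins, a sketch:)

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")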
-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


How Spark determines Parquet partition size

2016-11-08 Thread Selvam Raman
Hi,

Can you please tell me how Parquet partitions the data when saving a
DataFrame?

I have a DataFrame which contains 10 values, as below:

+---------+
|field_num|
+---------+
|      139|
|      140|
|       40|
|       41|
|      148|
|      149|
|      151|
|      152|
|      153|
|      154|
+---------+


df.write.partitionBy("field_num").parquet("/Users/rs/parti/")

This saves the files into directories like (field_num=140, ..., field_num=154).


When I try the command below, it gives 5:

scala> spark.read.parquet("file:///Users/rs/parti").rdd.partitions.length

res4: Int = 5


So how does Spark decide the partitioning of the Parquet data here?


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Dataframe broadcast join hint not working

2016-11-26 Thread Selvam Raman
Hi,

Which version of Spark are you using?

Tables smaller than 10 MB are automatically converted to broadcast joins in
Spark (the default spark.sql.autoBroadcastJoinThreshold).
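
(For reference, a minimal sketch of raising the threshold, which is in bytes,
and of the explicit hint; a, b and "id" are from the snippet below, and spark
here is a 2.x SparkSession:)

import org.apache.spark.sql.functions.broadcast

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

val c = a.join(broadcast(b), "id")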

\Thanks,
selvam R

On Sat, Nov 26, 2016 at 6:51 PM, Swapnil Shinde 
wrote:

> Hello
> I am trying a broadcast join on dataframes but it is still doing
> SortMergeJoin. I even try setting spark.sql.autoBroadcastJoinThreshold
> higher but still no luck.
>
> Related piece of code-
>   val c = a.join(braodcast(b), "id")
>
> On a side note, if I do SizeEstimator.estimate(b) and it is really
> high(460956584 bytes) compared to data it contains. b has just 85 rows and
> around 4964 bytes.
> Help is very much appreciated!!
>
> Thanks
> Swapnil
>
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Java Collections.emptyList inserted as null object in cassandra

2016-11-29 Thread Selvam Raman
Field type in Cassandra: List

I am trying to insert Collections.emptyList() from Spark into a Cassandra
list field, but in Cassandra it is stored as a null object.

How can I avoid the null values here?

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark Job not exited and shows running

2016-11-29 Thread Selvam Raman
Hi,

I have submitted a Spark job in yarn client mode. The executors and cores were
dynamically allocated. The job has 20 partitions, so 5 containers each with 4
cores were allocated. It has almost processed all the records, but the job never
exits, and in the application master container I am seeing the message below.

 INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.



The same job, run on only 1,000 records, finished successfully.

Can anyone help me to sort out this issue.

Spark version:2.0( AWS EMR).

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark Job not exited and shows running

2016-12-01 Thread Selvam Raman
Hi,

I have run the job in cluster mode as well. The job does not end; after some
time the containers do nothing, but the application still shows as running.

In my code, every record is inserted into Solr and into Cassandra. When I ran it
for Solr only, the job completed successfully. I have not yet tested the
Cassandra part on its own; I will check and update.

Has anyone faced this issue before?

I added sparkSession.stop() after foreachPartition ends.


My code overview:

SparkSession
read parquet file(20 partition- roughly 90k records)
foreachpartition
  every record does some computation
  insert into cassandra(  i am using insert command )
 index into solr

stop the sparksession
exit the code.




Thanks,
selvam R

On Thu, Dec 1, 2016 at 7:03 AM, Daniel van der Ende <
daniel.vandere...@gmail.com> wrote:

> Hi,
>
> I've seen this a few times too. Usually it indicates that your driver
> doesn't have enough resources to process the result. Sometimes increasing
> driver memory is enough (yarn memory overhead can also help). Is there any
> specific reason for you to run in client mode and not in cluster mode?
> Having run into this a number of times (and wanting to spare the resources
> of our submitting machines) we have now switched to use yarn cluster mode
> by default. This seems to resolve the problem.
>
> Hope this helps,
>
> Daniel
>
> On 29 Nov 2016 11:20 p.m., "Selvam Raman"  wrote:
>
>> Hi,
>>
>> I have submitted spark job in yarn client mode. The executor and cores
>> were dynamically allocated. In the job i have 20 partitions, so 5 container
>> each with 4 core has been submitted. It almost processed all the records
>> but it never exit the job and in the application master container i am
>> seeing the below error message.
>>
>>  INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
>>  WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
>>
>>
>>
>> ​The same job i ran it for only 1000 records which successfully finished.
>> ​
>>
>> Can anyone help me to sort out this issue.
>>
>> Spark version:2.0( AWS EMR).
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark Batch checkpoint

2016-12-15 Thread Selvam Raman
Hi,

Is there any provision for checkpointing in a Spark batch job?

I have a huge amount of data; it takes more than 3 hours to process all of it. I
currently have 100 partitions.

If the job fails after two hours, let's say it has processed 70 partitions.
Should I start the Spark job from the beginning, or is there some checkpoint
provision?

What I expect from a checkpoint is to restart from partition 71 and run to the end.

Please give me your suggestions.

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark Batch checkpoint

2016-12-15 Thread Selvam Raman
I am using java. I will try and let u know.
On Dec 15, 2016 8:45 PM, "Irving Duran"  wrote:

> Not sure what programming language you are using, but in python you can do
> "sc.setCheckpointDir('~/apps/spark-2.0.1-bin-hadoop2.7/checkpoint/')".
> This will store checkpoints on that directory that I called checkpoint.
>
>
> Thank You,
>
> Irving Duran
>
> On Thu, Dec 15, 2016 at 10:33 AM, Selvam Raman  wrote:
>
>> Hi,
>>
>> is there any provision in spark batch for checkpoint.
>>
>> I am having huge data, it takes more than 3 hours to process all data. I
>> am currently having 100 partitions.
>>
>> if the job fails after two hours, lets say it has processed 70 partition.
>> should i start spark job from the beginning or is there way for checkpoint
>> provision.
>>
>> Checkpoint,what i am expecting is start from 71 partition to till end.
>>
>> Please give me your suggestions.
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
>


Re: Spark Batch checkpoint

2016-12-16 Thread Selvam Raman
Hi,

Actually my requirement is to read a parquet file which has 100 partitions.
Then I use foreachPartition to read the data and process it.

My sample code

public static void main(String[] args) {

    SparkSession sparkSession = SparkSession.builder()
            .appName("checkpoint verification").getOrCreate();

    sparkSession.implicits();

    // Checkpoint files will be written under this directory.
    sparkSession.sparkContext().setCheckpointDir("Checkpoint/Dec16");

    Dataset<Row> sampleData = sparkSession.read().parquet("filepath");

    sampleData.foreachPartition(new ForeachPartitionFunction<Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void call(Iterator<Row> row) throws Exception {
            while (row.hasNext()) {
                Row record = row.next();
                // Process the record and insert it into the NoSQL DB.
            }
        }
    });
}
}



Now, where can I apply rdd.checkpoint()?
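
For reference, a minimal PySpark sketch of where a checkpoint would sit (a
sketch only: checkpointing materializes the RDD and truncates its lineage, so
lost tasks are cheaper to recompute, but it does not let a failed application
resume from partition 71; a rerun still starts from the beginning unless
processed partitions are tracked separately):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpoint verification").getOrCreate()
spark.sparkContext.setCheckpointDir("Checkpoint/Dec16")

rdd = spark.read.parquet("filepath").rdd
rdd.checkpoint()    # must be called before the first action on this RDD
rdd.count()         # the first action computes the RDD and writes the checkpoint

def handle_partition(rows):
    for row in rows:
        pass            # process the row and insert it into the NoSQL store

rdd.foreachPartition(handle_partition)
spark.stop()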



Thanks,

selvam



On Thu, Dec 15, 2016 at 10:44 PM, Selvam Raman  wrote:

> I am using java. I will try and let u know.
> On Dec 15, 2016 8:45 PM, "Irving Duran"  wrote:
>
>> Not sure what programming language you are using, but in python you can
>> do "sc.setCheckpointDir('~/apps/spark-2.0.1-bin-hadoop2.7/checkpoint/')".
>> This will store checkpoints on that directory that I called checkpoint.
>>
>>
>> Thank You,
>>
>> Irving Duran
>>
>> On Thu, Dec 15, 2016 at 10:33 AM, Selvam Raman  wrote:
>>
>>> Hi,
>>>
>>> is there any provision in spark batch for checkpoint.
>>>
>>> I am having huge data, it takes more than 3 hours to process all data. I
>>> am currently having 100 partitions.
>>>
>>> if the job fails after two hours, lets say it has processed 70
>>> partition. should i start spark job from the beginning or is there way for
>>> checkpoint provision.
>>>
>>> Checkpoint,what i am expecting is start from 71 partition to till end.
>>>
>>> Please give me your suggestions.
>>>
>>> --
>>> Selvam Raman
>>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>>
>>
>>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark dump in slave Node EMR

2016-12-16 Thread Selvam Raman
Hi,

How can I take a heap dump on an EMR slave node to analyze?

I have one master and two slave.

If I run the jps command on the master, I can see SparkSubmit with its pid.

But I cannot see anything on the slave nodes.

How can I take a heap dump for the Spark job?

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark dump in slave Node EMR

2016-12-16 Thread Selvam Raman
If I want to take it specifically for the task number that failed, is it
possible to take a heap dump?


"16/12/16 12:25:54 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
Container killed by YARN for exceeding memory limits. 20.0 GB of 19.8 GB
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

16/12/16 12:25:54 ERROR YarnClusterScheduler: Lost executor 1 on
ip-.dev: Container killed by YARN for exceeding memory limits. 20.0 GB
of 19.8 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
16/12/16 12:25:55 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID
9, ip.dev): ExecutorLostFailure (executor 1 exited caused by one of
the running tasks) Reason: Container killed by YARN for exceeding
memory limits. 20.0 GB of 19.8 GB physical memory used. Consider
boosting spark.yarn.executor.memoryOverhead.
16/12/16 12:25:55 INFO BlockManagerMasterEndpoint: Trying to remove
executor 1 from BlockManagerMaster.
16/12/16 12:25:55 INFO BlockManagerMaster: Removal of executor 1 requested
16/12/16 12:25:55 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asked
to remove non-existent executor 1

"

thanks,
selvam R

On Fri, Dec 16, 2016 at 12:30 PM, Selvam Raman  wrote:

> Hi,
>
> how can i take heap dump in EMR slave node to analyze.
>
> I have one master and two slave.
>
> if i enter jps command in Master, i could see sparksubmit with pid.
>
> But i could not see anything in slave node.
>
> how can i take heap dump for spark job.
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Reading xls and xlsx files

2016-12-19 Thread Selvam Raman
Hi,

Is there a way to read xls and xlsx files using Spark?

Is there any Hadoop InputFormat available to read xls and xlsx files which could
be used in Spark?
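
One possible approach, only a sketch, is to parse the workbooks with pandas and
hand the result to Spark; it assumes pandas plus the xlrd/openpyxl readers are
installed and that each workbook fits in memory (the path below is
hypothetical). There are also third-party Excel data source packages for Spark
(for example the crealytics spark-excel package), which may scale better for
many files.

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read one sheet of one workbook on the driver, then hand it to Spark.
pdf = pd.read_excel("/path/to/report.xlsx", sheet_name=0)
df = spark.createDataFrame(pdf)
df.show()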

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Writing into parquet throws Array out of bounds exception

2016-12-21 Thread Selvam Raman
Hi,

When I try to write the dataset to parquet, or call show(1, false), my job
throws an ArrayIndexOutOfBoundsException.

16/12/21 12:38:50 WARN TaskSetManager: Lost task 7.0 in stage 36.0 (TID 81,
ip-10-95-36-69.dev): java.lang.ArrayIndexOutOfBoundsException: 63

at
org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:156)

at org.apache.spark.unsafe.types.UTF8String.indexOf(UTF8String.java:565)

at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)

at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)

at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)

at org.apache.spark.scheduler.Task.run(Task.scala:85)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)


16/12/21 12:38:50 INFO TaskSetManager: Starting task 7.1 in stage 36.0 (TID
106, ip-10-95-36-70.dev, partition 7, RACK_LOCAL, 6020 bytes)

16/12/21 12:38:50 INFO YarnSchedulerBackend$YarnDriverEndpoint: Launching
task 106 on executor id: 4 hostname: ip-10-95-36-70.dev.

16/12/21 12:38:50 WARN TaskSetManager: Lost task 4.0 in stage 36.0 (TID 78,
ip-10-95-36-70.dev): java.lang.ArrayIndexOutOfBoundsException: 62

at
org.apache.spark.unsafe.types.UTF8String.numBytesForFirstByte(UTF8String.java:156)

at org.apache.spark.unsafe.types.UTF8String.indexOf(UTF8String.java:565)

at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)

at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)

at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)

at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:147)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)

at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)

at org.apache.spark.scheduler.Task.run(Task.scala:85)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)


In my dataset there is one column which is a longblob; if I convert it with
unbase64, I hit this problem. I am able to write to parquet without the
conversion.


So is there some limit on the number of bytes per line? Please give me your
suggestions.

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


is it possible to read .mdb file in spark

2017-01-25 Thread Selvam Raman
-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


how to read object field within json file

2017-03-23 Thread Selvam Raman
Hi,

{
"id": "test1",
"source": {
"F1": {
  "id": "4970",
  "eId": "F1",
  "description": "test1",
},
"F2": {
  "id": "5070",
  "eId": "F2",
  "description": "test2",
},
"F3": {
  "id": "5170",
  "eId": "F3",
  "description": "test3",
},
"F4":{}
  etc..
  "F999":{}
}

I have bzip-compressed JSON files in the above format. Some JSON rows contain
two objects within "source" (like F1 and F2), some contain five (F1..F5), etc.
So the final schema contains the combination of all objects seen for the
"source" field.

Every row can contain any number of these objects, but only some contain valid
records.
How can I retrieve the value of "description" inside the "source" field?

source.F1.description returns a result, but how can I get all the description
values for every row (something like "source.*.description")?

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: how to read object field within json file

2017-03-25 Thread Selvam Raman
Thank you Armbrust.
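
A minimal PySpark sketch of the map-type idea (assuming every F* entry shares
the id/eId/description struct shown in the question, and a hypothetical input
path):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.sql.types import MapType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

entry = StructType([
    StructField("id", StringType()),
    StructField("eId", StringType()),
    StructField("description", StringType()),
])
schema = StructType([
    StructField("id", StringType()),
    StructField("source", MapType(StringType(), entry)),  # F1..F999 become map keys
])

df = spark.read.schema(schema).json("/path/to/bzip/json")
# One output row per F* entry, carrying its description.
df.select(col("id"), explode(col("source"))) \
  .select("id", "key", "value.description") \
  .show(truncate=False)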

On Fri, Mar 24, 2017 at 7:02 PM, Michael Armbrust 
wrote:

> I'm not sure you can parse this as an Array, but you can hint to the
> parser that you would like to treat source as a map instead of as a
> struct.  This is a good strategy when you have dynamic columns in your data.
>
> Here is an example of the schema you can use to parse this JSON and also
> how to use explode to turn it into separate rows
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/679071429109042/2840265927289860/latest.html>.
> This blog post has more on working with semi-structured data in Spark
> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
> .
>
> On Thu, Mar 23, 2017 at 2:49 PM, Yong Zhang  wrote:
>
>> That's why your "source" should be defined as an Array[Struct] type
>> (which makes sense in this case, it has an undetermined length  , so you
>> can explode it and get the description easily.
>>
>> Now you need write your own UDF, maybe can do what you want.
>>
>> Yong
>>
>> --
>> *From:* Selvam Raman 
>> *Sent:* Thursday, March 23, 2017 5:03 PM
>> *To:* user
>> *Subject:* how to read object field within json file
>>
>> Hi,
>>
>> {
>> "id": "test1",
>> "source": {
>> "F1": {
>>   "id": "4970",
>>   "eId": "F1",
>>   "description": "test1",
>> },
>> "F2": {
>>   "id": "5070",
>>   "eId": "F2",
>>   "description": "test2",
>> },
>> "F3": {
>>   "id": "5170",
>>   "eId": "F3",
>>   "description": "test3",
>> },
>> "F4":{}
>>   etc..
>>   "F999":{}
>> }
>>
>> I am having bzip json files like above format.
>> some json row contains two objects within source(like F1 and F2),
>> sometime five(F1,F2,F3,F4,F5),etc. So the final schema will contains
>> combination of all objects for the source field.
>>
>> Now, every row will contain n number of objects but only some contains
>> valid records.
>> how can i retreive the value of "description" in "source" field.
>>
>> source.F1.description - returns the result but how can i get all
>> description result for every row..(something like this
>> "source.*.description").
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Convert Dataframe to Dataset in pyspark

2017-04-01 Thread Selvam Raman
In Scala,
val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]

what is the equivalent code in pyspark?
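
PySpark has no typed Dataset API, so the closest equivalents are a DataFrame of
strings or an RDD of Python strings; a minimal sketch (assuming Spark 2.x, where
read.text produces a single "value" column):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.text("/home/spark/1.6/lines")    # DataFrame with one "value" column
lines = df.rdd.map(lambda row: row.value)        # plain RDD of Python strings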

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Update DF record with delta data in spark

2017-04-02 Thread Selvam Raman
Hi,

Table 1 (old file):

name    number  salary
Test1   1       1
Test2   2       1

Table 2 (delta file):

name    number  salary
Test1   1       4
Test3   3       2


I do not have a date-stamp field in these tables. The composite key is the name
and number fields.

Expected result:

name    number  salary
Test1   1       4
Test2   2       1
Test3   3       2


Current approach:

1) Delete the rows in table 1 whose composite key matches a composite key in
table 2.
2) Union table 1 and table 2 to get the updated result.


Is this the right approach? Is there any other way to achieve it?
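
That approach is essentially an anti-join plus a union. A minimal PySpark sketch
(assuming Spark 2.0+, which has the "left_anti" join type, and the column names
from the tables above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

old = spark.createDataFrame([("Test1", 1, 1), ("Test2", 2, 1)],
                            ["name", "number", "salary"])
delta = spark.createDataFrame([("Test1", 1, 4), ("Test3", 3, 2)],
                              ["name", "number", "salary"])

keys = ["name", "number"]
# Keep the old rows whose composite key is absent from the delta, then add the delta.
merged = old.join(delta, keys, "left_anti").union(delta)
merged.orderBy("number").show()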

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Pyspark - pickle.PicklingError: Can't pickle

2017-04-03 Thread Selvam Raman
I ran the below code in my Standalone mode. Python version 2.7.6. Spacy
1.7+ version. Spark 2.0.1 version.

I'm new to PySpark. Please help me understand the two versions of code below.

Why does the first version run fine, whereas the second throws
pickle.PicklingError: Can't pickle <function <lambda> at 0x107e39110>?

(I suspect the second approach fails because the object could not be serialized
and sent to the workers.)

*1) Run-Success:*

*(SpacyExample-Module)*

import spacy

nlp = spacy.load('en_default')

def spacyChunks(content):

doc = nlp(content)

mp=[]

for chunk in doc.noun_chunks:

phrase = content[chunk.start_char: chunk.end_char]

mp.append(phrase)

#print(mp)

return mp



if __name__ == '__main__':

pass


*Main-Module:*

spark = SparkSession.builder.appName("readgzip"
).config(conf=conf).getOrCreate()

gzfile = spark.read.schema(schema).json("")

...

...

textresult.rdd.map(lambda x:x[0]).\

flatMap(lambda data: SpacyExample.spacyChunks(data)).saveAsTextFile("")




*2) Run-Failure:*

*MainModule:*

nlp= spacy.load('en_default')

def spacyChunks(content):

doc = nlp(content)

mp=[]

for chunk in doc.noun_chunks:

phrase = content[chunk.start_char: chunk.end_char]

mp.append(phrase)

#print(mp)

return mp


if __name__ == '__main__':

    # create the SparkSession, read the file, then:
    file.rdd.map(lambda x: x[0]).flatMap(lambda data: spacyChunks(data)).saveAsTextFile("")


Stack Trace:

  File
"/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
line 286, in save

f(self, obj) # Call unbound method with explicit self

  File
"/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
line 649, in save_dict

self._batch_setitems(obj.iteritems())

  File
"/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
line 681, in _batch_setitems

save(v)

  File
"/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
line 331, in save

self.save_reduce(obj=obj, *rv)

  File
"/Users/rs/Downloads/spark-2.0.1-bin-hadoop2.7/python/pyspark/cloudpickle.py",
line 535, in save_reduce

save(args)

  File
"/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
line 286, in save

f(self, obj) # Call unbound method with explicit self

  File
"/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
line 562, in save_tuple

save(element)

  File
"/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
line 286, in save

f(self, obj) # Call unbound method with explicit self

  File
"/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
line 649, in save_dict

self._batch_setitems(obj.iteritems())

  File
"/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
line 681, in _batch_setitems

save(v)

  File
"/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py",
line 317, in save

self.save_global(obj, rv)

  File
"/Users/rs/Downloads/spark-2.0.1-bin-hadoop2.7/python/pyspark/cloudpickle.py",
line 390, in save_global

raise pickle.PicklingError("Can't pickle %r" % obj)

pickle.PicklingError: Can't pickle <function <lambda> at 0x107e39110>

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark Mlib - java.lang.OutOfMemoryError: Java heap space

2017-04-24 Thread Selvam Raman
Hi,

I have 1 master and 4 slave node. Input data size is 14GB.
Slave Node config : 32GB Ram,16 core


I am trying to train a word-embedding model using Spark. It is going out of
memory. To train 14 GB of data, how much memory do I require?


I have given 20 GB per executor, but the line below reports only 11.8 GB of it:
BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-.-.-.dev:35035
(size: 4.6 KB, free: 11.8 GB)


This is the code
if __name__ == "__main__":
sc = SparkContext(appName="Word2VecExample")  # SparkContext

# $example on$
inp =
sc.textFile("s3://word2vec/data/word2vec_word_data.txt/").map(lambda row:
row.split(" "))

word2vec = Word2Vec()
model = word2vec.fit(inp)

model.save(sc, "s3://pysparkml/word2vecresult2/")
sc.stop()


Spark-submit Command:
spark-submit --master yarn --conf
'spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/mnt/tmp -XX:+UseG1GC -XX:+UseG1GC -XX:+PrintFlagsFinal
-XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
-XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark' --num-executors 4
--executor-cores 2 --executor-memory 20g Word2VecExample.py


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark Mlib - java.lang.OutOfMemoryError: Java heap space

2017-04-24 Thread Selvam Raman
This is where the job goes out of memory:

17/04/24 10:09:22 INFO TaskSetManager: Finished task 122.0 in stage 1.0
(TID 356) in 4260 ms on ip-...-45.dev (124/234)
17/04/24 10:09:26 INFO BlockManagerInfo: Removed taskresult_361 on
ip-10...-185.dev:36974 in memory (size: 5.2 MB, free: 8.5 GB)
17/04/24 10:09:26 INFO BlockManagerInfo: Removed taskresult_362 on
ip-...-45.dev:40963 in memory (size: 5.2 MB, free: 8.9 GB)
17/04/24 10:09:26 INFO TaskSetManager: Finished task 125.0 in stage 1.0
(TID 359) in 4383 ms on ip-...-45.dev (125/234)
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 15090"...
Killed

The node ...-45.dev reports 8.9 GB free, yet the job throws out of memory. Can
anyone please help me understand the issue?

On Mon, Apr 24, 2017 at 11:22 AM, Selvam Raman  wrote:

> Hi,
>
> I have 1 master and 4 slave node. Input data size is 14GB.
> Slave Node config : 32GB Ram,16 core
>
>
> I am trying to train word embedding model using spark. It is going out of
> memory. To train 14GB of data how much memory do i require?.
>
>
> I have givem 20gb per executor but below shows it is using 11.8GB out of
> 20 GB.
> BlockManagerInfo: Added broadcast_1_piece0 in memory on ip-.-.-.dev:35035
> (size: 4.6 KB, free: 11.8 GB)
>
>
> This is the code
> if __name__ == "__main__":
> sc = SparkContext(appName="Word2VecExample")  # SparkContext
>
> # $example on$
> inp = sc.textFile("s3://word2vec/data/word2vec_word_data.txt/").map(lambda
> row: row.split(" "))
>
> word2vec = Word2Vec()
> model = word2vec.fit(inp)
>
> model.save(sc, "s3://pysparkml/word2vecresult2/")
> sc.stop()
>
>
> Spark-submit Command:
> spark-submit --master yarn --conf 
> 'spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError
> -XX:HeapDumpPath=/mnt/tmp -XX:+UseG1GC -XX:+UseG1GC -XX:+PrintFlagsFinal
> -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
> -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark' --num-executors 4
> --executor-cores 2 --executor-memory 20g Word2VecExample.py
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


how to create List in pyspark

2017-04-24 Thread Selvam Raman
documentDF = spark.createDataFrame([

("Hi I heard about Spark".split(" "), ),

("I wish Java could use case classes".split(" "), ),

("Logistic regression models are neat".split(" "), )

], ["text"])


How can I achieve the same DataFrame while reading from a source?

doc = spark.read.text("/Users/rs/Desktop/nohup.out")

How can I create an array-type "sentences" column from doc (the DataFrame above)?


The below one creates more than one column.

rdd.map(lambda rdd: rdd[0]).map(lambda row:row.split(" "))
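
A minimal sketch that keeps everything in a single array column by using the
built-in split function (assuming Spark 2.x, where read.text produces a "value"
column):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split

spark = SparkSession.builder.getOrCreate()

doc = spark.read.text("/Users/rs/Desktop/nohup.out")
# One row per line, with the line tokenised into an array<string> column.
documentDF = doc.select(split(col("value"), " ").alias("text"))
documentDF.printSchema()    # text: array of strings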

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


convert ps to jpg file

2017-05-26 Thread Selvam Raman
Hi,

Is there any good open-source tool to convert PS files to JPG?

I am running a Spark job within which I use ImageMagick/GraphicsMagick with
Ghostscript to convert/resize images.

IM/GM takes a lot of memory, mapped memory and disk to convert even KB-sized
image files, and it takes a lot of time. Because of this I frequently hit YARN
OOM and disk-full issues.

Could you please share your thoughts?

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


how to get Cache size from storage

2017-09-04 Thread Selvam Raman
Hi All,

I have 100 GB of data (for this use case), and I am caching it with MEMORY_AND_DISK.

Is there any log available to find how much data is stored in memory and on disk
for a running or completed application?

I can see the cache in the UI under the Storage tab. Is that information
available even after the job finishes, and where can I get those details?
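
While the application is running, the monitoring REST API exposes the same
numbers as the Storage tab; RDD storage details are generally not retained for
completed applications. A small sketch, assuming the driver UI is reachable on
port 4040 and "driver-host" is a placeholder:

import requests

base = "http://driver-host:4040/api/v1"
app_id = requests.get(base + "/applications").json()[0]["id"]
for rdd in requests.get("%s/applications/%s/storage/rdd" % (base, app_id)).json():
    print(rdd["name"], rdd["memoryUsed"], rdd["diskUsed"])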

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: How to convert Array of Json rows into Dataset of specific columns in Spark 2.2.0?

2018-01-10 Thread Selvam Raman
I just followed Hien Luu's approach:

val empExplode = empInfoStrDF.select(explode(from_json('emp_info_str,
  empInfoSchema)).as("emp_info_withexplode"))


empExplode.show(false)

+---+
|emp_info_withexplode   |
+---+
|[foo,[CA,USA],WrappedArray([english,2016])]|
|[bar,[OH,USA],WrappedArray([math,2017])]   |
+---+

empExplode.select($"emp_info_withexplode.name").show(false)


++
|name|
++
|foo |
|bar |
++

empExplode.select($"emp_info_withexplode.address.state").show(false)

+-+
|state|
+-+
|CA   |
|OH   |
+-+

empExplode.select($"emp_info_withexplode.docs.subject").show(false)

+-+
|subject  |
+-+
|[english]|
|[math]   |
+-+


@Kant Kodali, is that helpful for you? If not, please let me know what changes
you are expecting.




On Sun, Jan 7, 2018 at 12:16 AM, Jules Damji  wrote:

> Here’s are couple tutorial that shows how to extract Structured nested
> data
>
> https://databricks.com/blog/2017/06/27/4-sql-high-order-
> lambda-functions-examine-complex-structured-data-databricks.html
>
> https://databricks.com/blog/2017/06/13/five-spark-sql-
> utility-functions-extract-explore-complex-data-types.html
>
> Sent from my iPhone
> Pardon the dumb thumb typos :)
>
> On Jan 6, 2018, at 11:42 AM, Hien Luu  wrote:
>
> Hi Kant,
>
> I am not sure whether you had come up with a solution yet, but the
> following
> works for me (in Scala)
>
> val emp_info = """
>  [
>{"name": "foo", "address": {"state": "CA", "country": "USA"},
> "docs":[{"subject": "english", "year": 2016}]},
>{"name": "bar", "address": {"state": "OH", "country": "USA"},
> "docs":[{"subject": "math", "year": 2017}]}
>  ]"""
>
> import org.apache.spark.sql.types._
>
> val addressSchema = new StructType().add("state",
> StringType).add("country",
> StringType)
> val docsSchema = ArrayType(new StructType().add("subject",
> StringType).add("year", IntegerType))
> val employeeSchema = new StructType().add("name",
> StringType).add("address",
> addressSchema).add("docs", docsSchema)
>
> val empInfoSchema = ArrayType(employeeSchema)
>
> empInfoSchema.json
>
> val empInfoStrDF = Seq((emp_info)).toDF("emp_info_str")
> empInfoStrDF.printSchema
> empInfoStrDF.show(false)
>
> val empInfoDF = empInfoStrDF.select(from_json('emp_info_str,
> empInfoSchema).as("emp_info"))
> empInfoDF.printSchema
>
> empInfoDF.select(struct("*")).show(false)
>
> empInfoDF.select("emp_info.name", "emp_info.address",
> "emp_info.docs").show(false)
>
> empInfoDF.select(explode('emp_info.getItem("name"))).show
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Pyspark UDF/map fucntion throws pickling exception

2018-02-15 Thread Selvam Raman
e(v)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 521, in save
self.save_reduce(obj=obj, *rv)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py",
line 600, in save_reduce
save(state)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 476, in save
f(self, obj) # Call unbound method with explicit self
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 821, in save_dict
self._batch_setitems(obj.items())
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 847, in _batch_setitems
save(v)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 521, in save
self.save_reduce(obj=obj, *rv)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py",
line 582, in save_reduce
save(args)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 476, in save
f(self, obj) # Call unbound method with explicit self
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 751, in save_tuple
save(element)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 476, in save
f(self, obj) # Call unbound method with explicit self
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py",
line 368, in save_builtin_function
return self.save_function(obj)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py",
line 247, in save_function
if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule
is None:
AttributeError: 'builtin_function_or_method' object has no attribute
'__code__'



please help me.



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Pyspark UDF/map fucntion throws pickling exception

2018-02-15 Thread Selvam Raman
ework/Versions/3.6/lib/python3.6/pickle.py",
line 476, in save
f(self, obj) # Call unbound method with explicit self
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 821, in save_dict
self._batch_setitems(obj.items())
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 852, in _batch_setitems
save(v)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 521, in save
self.save_reduce(obj=obj, *rv)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py",
line 600, in save_reduce
save(state)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 476, in save
f(self, obj) # Call unbound method with explicit self
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 821, in save_dict
self._batch_setitems(obj.items())
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 847, in _batch_setitems
save(v)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 521, in save
self.save_reduce(obj=obj, *rv)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py",
line 582, in save_reduce
save(args)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 476, in save
f(self, obj) # Call unbound method with explicit self
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 751, in save_tuple
save(element)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py",
line 476, in save
f(self, obj) # Call unbound method with explicit self
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py",
line 368, in save_builtin_function
return self.save_function(obj)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py",
line 247, in save_function
if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule
is None:
AttributeError: 'builtin_function_or_method' object has no attribute
'__code__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File
"/Users/rs/PycharmProjects/SparkDemo/com/elsevier/vtw/ExtractDescription.py",
line 30, in <module>
#description.rdd.flatMap(lambda row: getPhrases(row.desc)).foreach(f)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py",
line 782, in foreach
self.mapPartitions(processPartition).count()  # Force evaluation
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py",
line 1041, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py",
line 1032, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py",
line 906, in fold
vals = self.mapPartitions(func).collect()
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py",
line 809, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py",
line 2455, in _jrdd
self._jrdd_deserializer, profiler)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py",
line 2388, in _wrap_function
pickled_command, broadcast_vars, env, includes =
_prepare_for_python_RDD(sc, command)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/rdd.py",
line 2374, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/serializers.py",
line 464, in dumps
return cloudpickle.dumps(obj, 2)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py",
line 704, in dumps
cp.dump(obj)
  File
"/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/cloudpickle.py",
line 162, in dump
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: AttributeError:
'builtin_function_or_method' object has no attribute '__code__'

pyspark+spacy throwing pickling exception

2018-02-15 Thread Selvam Raman
import spacy

nlp = spacy.load('en')



def getPhrases(content):
phrases = []
doc = nlp(str(content))
for chunks in doc.noun_chunks:
phrases.append(chunks.text)
return phrases

The above function retrieves the noun phrases from the content and returns a
list of phrases.


def f(x) : print(x)


description = 
xmlData.filter(col("dcterms:description").isNotNull()).select(col("dcterms:description").alias("desc"))

description.rdd.flatMap(lambda row: getPhrases(row.desc)).foreach(f)

When I try to access getPhrases I get the below exception.



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: pyspark+spacy throwing pickling exception

2018-02-15 Thread Selvam Raman
Hi,

I solved the issue by extracting the method into another module.

Failure:
extract.py contains the whole implementation. Because everything lives in this
single module, the driver tries to serialize the spacy (English) object and send
it to the executors, and that is where the pickling exception comes from.

Success:
extract.py refers to the getPhrases method in spacyutils.py.

Now spacy is initialized on the executors, so there is no need to serialize it.

Please let me know if my understanding is correct.
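
For reference: only the closure passed to the RDD operation is pickled and
shipped, so a module-level spacy model in the driver script cannot be
serialized; loading it lazily on the executors avoids the problem. A minimal
sketch with mapPartitions (assuming spacy and its English model are installed
on every worker, and "description" is the DataFrame from the earlier message):

import spacy

_nlp = None

def get_nlp():
    # Loaded at most once per executor process, never pickled from the driver.
    global _nlp
    if _nlp is None:
        _nlp = spacy.load('en')
    return _nlp

def phrases_for_partition(rows):
    nlp = get_nlp()
    for row in rows:
        for chunk in nlp(str(row.desc)).noun_chunks:
            yield chunk.text

phrases = description.rdd.mapPartitions(phrases_for_partition)
print(phrases.take(10))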


On Thu, Feb 15, 2018 at 12:14 PM, Holden Karau  wrote:

> So you left out the exception. On one hand I’m also not sure how well
> spacy serializes, so to debug this I would start off by moving the nlp =
> inside of my function and see if it still fails.
>
> On Thu, Feb 15, 2018 at 9:08 PM Selvam Raman  wrote:
>
>> import spacy
>>
>> nlp = spacy.load('en')
>>
>>
>>
>> def getPhrases(content):
>> phrases = []
>> doc = nlp(str(content))
>> for chunks in doc.noun_chunks:
>> phrases.append(chunks.text)
>> return phrases
>>
>> the above function will retrieve the noun phrases from the content and
>> return list of phrases.
>>
>>
>> def f(x) : print(x)
>>
>>
>> description = 
>> xmlData.filter(col("dcterms:description").isNotNull()).select(col("dcterms:description").alias("desc"))
>>
>> description.rdd.flatMap(lambda row: getPhrases(row.desc)).foreach(f)
>>
>> when i am trying to access getphrases i am getting below exception
>>
>>
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
> --
> Twitter: https://twitter.com/holdenkarau
>



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark-Solr -- unresolved dependencies

2018-02-23 Thread Selvam Raman
Hi,

spark version - EMR 2.0.0

spark-shell --packages com.lucidworks.spark:spark-solr:3.0.1

When I tried the above command, I got the below error:


::

::  UNRESOLVED DEPENDENCIES ::

::

:: org.restlet.jee#org.restlet;2.3.0: not found

:: org.restlet.jee#org.restlet.ext.servlet;2.3.0: not found

::



:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
Exception in thread "main" java.lang.RuntimeException: [unresolved
dependency: org.restlet.jee#org.restlet;2.3.0: not found, unresolved
dependency: org.restlet.jee#org.restlet.ext.servlet;2.3.0: not found]
at
org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1066)
at
org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:294)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:158)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark EMR executor-core vs Vcores

2018-02-26 Thread Selvam Raman
Hi,

spark version - 2.0.0
spark distribution - EMR 5.0.0

Spark Cluster - one master, 5 slaves

Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage


Cluster Metrics:

  Apps Submitted 16 | Apps Pending 0 | Apps Running 1 | Apps Completed 15
  Containers Running 5
  Memory Used 88.88 GB | Memory Total 90.50 GB | Memory Reserved 22 GB
  VCores Used 5 | VCores Total 79 | VCores Reserved 1
  Active Nodes 5 | Decommissioning 0 | Decommissioned 0 | Lost 5 | Unhealthy 0 | Rebooted 0

I have submitted job with below configuration
--num-executors 5 --executor-cores 10 --executor-memory 20g



spark.task.cpus - 1 by default


My understanding is that there will be 5 executors, each able to run 10 tasks at
a time, with the tasks sharing the executor's total memory of 20 GB. Here I can
see only 5 vcores used, which suggests 1 executor instance uses 20 GB + 10%
overhead RAM (22 GB), 10 cores (number of threads), and 1 vcore (CPU).

Please correct me if my understanding is wrong.

How can I utilize the vcores in EMR effectively? Will more vcores boost
performance?


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Selvam Raman
Master Node details:
lscpu
Architecture:  x86_64
CPU op-mode(s):32-bit, 64-bit
Byte Order:Little Endian
CPU(s):4
On-line CPU(s) list:   0-3
Thread(s) per core:4
Core(s) per socket:1
Socket(s): 1
NUMA node(s):  1
Vendor ID: GenuineIntel
CPU family:6
Model: 62
Model name:Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Stepping:  4
CPU MHz:   2494.066
BogoMIPS:  4988.13
Hypervisor vendor: Xen
Virtualization type:   full
L1d cache: 32K
L1i cache: 32K
L2 cache:  256K
L3 cache:  25600K
NUMA node0 CPU(s): 0-3




Slave Node Details:
Architecture:  x86_64
CPU op-mode(s):32-bit, 64-bit
Byte Order:Little Endian
CPU(s):8
On-line CPU(s) list:   0-7
Thread(s) per core:8
Core(s) per socket:1
Socket(s): 1
NUMA node(s):  1
Vendor ID: GenuineIntel
CPU family:6
Model: 62
Model name:Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Stepping:  4
CPU MHz:   2500.054
BogoMIPS:  5000.10
Hypervisor vendor: Xen
Virtualization type:   full
L1d cache: 32K
L1i cache: 32K
L2 cache:  256K
L3 cache:  25600K
NUMA node0 CPU(s): 0-7

On Mon, Feb 26, 2018 at 10:20 AM, Selvam Raman  wrote:

> Hi,
>
> spark version - 2.0.0
> spark distribution - EMR 5.0.0
>
> Spark Cluster - one master, 5 slaves
>
> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>
>
> Cluster Metrics
> Apps SubmittedApps PendingApps RunningApps CompletedContainers RunningMemory
> UsedMemory TotalMemory ReservedVCores UsedVCores TotalVCores ReservedActive
> NodesDecommissioning NodesDecommissioned NodesLost NodesUnhealthy 
> NodesRebooted
> Nodes
> 16 0 1 15 5 88.88 GB 90.50 GB 22 GB 5 79 1 5
> <http://localhost:8088/cluster/nodes> 0
> <http://localhost:8088/cluster/nodes/decommissioning> 0
> <http://localhost:8088/cluster/nodes/decommissioned> 5
> <http://localhost:8088/cluster/nodes/lost> 0
> <http://localhost:8088/cluster/nodes/unhealthy> 0
> <http://localhost:8088/cluster/nodes/rebooted>
> I have submitted job with below configuration
> --num-executors 5 --executor-cores 10 --executor-memory 20g
>
>
>
> spark.task.cpus - be default 1
>
>
> My understanding is there will be 5 executore each can run 10 task at a
> time and task can share total memory of 20g. Here, i could see only 5
> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
> 10 core(number of threads), 1 Vcore(cpu).
>
> please correct me if my understand is wrong.
>
> how can i utilize number of vcore in EMR effectively. Will Vcore boost
> performance?
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Selvam Raman
Hi Fawze,

Yes, it is true that I am running in yarn mode; the 5 containers represent 4
executors and 1 application master. But I was not expecting these details, as I
am already aware of them. What I want to know is the relationship between
vcores (EMR YARN) and executor-cores (Spark).


From my slave configuration I understand that only 8 threads are available on
the slave machine, which means at most 8 threads run at a time.

Thread(s) per core:8
Core(s) per socket:1
Socket(s): 1


So I don't think it is valid to give executor-cores=10 in my spark-submit.

On Mon, Feb 26, 2018 at 10:54 AM, Fawze Abujaber  wrote:

> It's recommended to sue executor-cores of 5.
>
> Each executor here will utilize 20 GB which mean the spark job will
> utilize 50 cpu cores and 100GB memory.
>
> You can not run more than 4 executors because your cluster doesn't have
> enough memory.
>
> Use see 5 executor because 4 for the job and one for the application
> master.
>
> serr the used menory and the total memory.
>
> On Mon, Feb 26, 2018 at 12:20 PM, Selvam Raman  wrote:
>
>> Hi,
>>
>> spark version - 2.0.0
>> spark distribution - EMR 5.0.0
>>
>> Spark Cluster - one master, 5 slaves
>>
>> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
>> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>>
>>
>> Cluster Metrics
>> Apps SubmittedApps PendingApps RunningApps CompletedContainers RunningMemory
>> UsedMemory TotalMemory ReservedVCores UsedVCores TotalVCores ReservedActive
>> NodesDecommissioning NodesDecommissioned NodesLost NodesUnhealthy 
>> NodesRebooted
>> Nodes
>> 16 0 1 15 5 88.88 GB 90.50 GB 22 GB 5 79 1 5
>> <http://localhost:8088/cluster/nodes> 0
>> <http://localhost:8088/cluster/nodes/decommissioning> 0
>> <http://localhost:8088/cluster/nodes/decommissioned> 5
>> <http://localhost:8088/cluster/nodes/lost> 0
>> <http://localhost:8088/cluster/nodes/unhealthy> 0
>> <http://localhost:8088/cluster/nodes/rebooted>
>> I have submitted job with below configuration
>> --num-executors 5 --executor-cores 10 --executor-memory 20g
>>
>>
>>
>> spark.task.cpus - be default 1
>>
>>
>> My understanding is there will be 5 executore each can run 10 task at a
>> time and task can share total memory of 20g. Here, i could see only 5
>> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
>> 10 core(number of threads), 1 Vcore(cpu).
>>
>> please correct me if my understand is wrong.
>>
>> how can i utilize number of vcore in EMR effectively. Will Vcore boost
>> performance?
>>
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Selvam Raman
Thanks. That makes sense.

I want to know one more thing: the available vcores per machine is 16, but
threads per node is 8. Am I missing something in how to relate the two?

What I am thinking now is: number of vcores = number of threads.



On Mon, 26 Feb 2018 at 18:45, Vadim Semenov  wrote:

> All used cores aren't getting reported correctly in EMR, and YARN itself
> has no control over it, so whatever you put in `spark.executor.cores` will
> be used,
> but in the ResourceManager you will only see 1 vcore used per nodemanager.
>
> On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman  wrote:
>
>> Hi,
>>
>> spark version - 2.0.0
>> spark distribution - EMR 5.0.0
>>
>> Spark Cluster - one master, 5 slaves
>>
>> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
>> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>>
>>
>> Cluster Metrics
>> Apps SubmittedApps PendingApps RunningApps CompletedContainers RunningMemory
>> UsedMemory TotalMemory ReservedVCores UsedVCores TotalVCores ReservedActive
>> NodesDecommissioning NodesDecommissioned NodesLost NodesUnhealthy 
>> NodesRebooted
>> Nodes
>> 16 0 1 15 5 88.88 GB 90.50 GB 22 GB 5 79 1 5
>> <http://localhost:8088/cluster/nodes> 0
>> <http://localhost:8088/cluster/nodes/decommissioning> 0
>> <http://localhost:8088/cluster/nodes/decommissioned> 5
>> <http://localhost:8088/cluster/nodes/lost> 0
>> <http://localhost:8088/cluster/nodes/unhealthy> 0
>> <http://localhost:8088/cluster/nodes/rebooted>
>> I have submitted job with below configuration
>> --num-executors 5 --executor-cores 10 --executor-memory 20g
>>
>>
>>
>> spark.task.cpus - be default 1
>>
>>
>> My understanding is there will be 5 executore each can run 10 task at a
>> time and task can share total memory of 20g. Here, i could see only 5
>> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
>> 10 core(number of threads), 1 Vcore(cpu).
>>
>> please correct me if my understand is wrong.
>>
>> how can i utilize number of vcore in EMR effectively. Will Vcore boost
>> performance?
>>
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
> --
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark Higher order function

2018-03-05 Thread Selvam Raman
Dear All,

 I read about higher-order functions in the Databricks blog:

https://docs.databricks.com/spark/latest/spark-sql/higher-order-functions-lambda-functions.html

Is higher-order function support available in open-source Spark?
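
They did land in open source: the SQL higher-order functions (transform, filter,
exists, aggregate) are available from Spark 2.4 onwards, so on 2.3 and earlier
you still need explode or a UDF. A small sketch, assuming Spark 2.4+:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, [1, 2, 3]), (2, [4, 5])], ["key", "values"])
df.createOrReplaceTempView("nested")

# Lambda syntax inside SQL; doubles every element without exploding the array.
spark.sql("SELECT key, transform(values, x -> x * 2) AS doubled FROM nested").show()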


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Sequence file to Image in spark

2018-04-28 Thread Selvam Raman
Hi All,

I am trying to convert sequence file to image in spark.

I found that when I read the bytes through a ByteArrayInputStream it throws an
exception. Any insight would be helpful.

scala> sc.sequenceFile[NullWritable, BytesWritable]("D:/seqImage").map(x =>
{ImageIO.write(ImageIO.read(new ByteArrayInputStream(x._2.copyBytes())), "png", new
File("D:/ima"))}).collect

2018-04-28 15:45:52 ERROR Executor:91 - Exception in task 0.0 in stage 8.0
(TID

14)

java.lang.IllegalArgumentException: image == null!

at javax.imageio.ImageTypeSpecifier.createFromRenderedImage(Unknown
Sour

ce)

at javax.imageio.ImageIO.getWriter(Unknown Source)

at javax.imageio.ImageIO.write(Unknown Source)

at
$line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:31)

at
$line117.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:31)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

at scala.collection.Iterator$class.foreach(Iterator.scala:893)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:

59)

at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:

104)

at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:

48)

at scala.collection.TraversableOnce$class.to
(TraversableOnce.scala:310)

at scala.collection.AbstractIterator.to(Iterator.scala:1336)

at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala

:302)

at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)

at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:

289)

at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)

at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.sca

la:939)

at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.sca

la:939)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.sc

ala:2067)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.sc

ala:2067)

at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)



at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)

at java.lang.Thread.run(Unknown Source)

2018-04-28 15:45:52 WARN  TaskSetManager:66 - Lost task 0.0 in stage 8.0
(TID 14

, localhost, executor driver): java.lang.IllegalArgumentException: image ==
null

!

at javax.imageio.ImageTypeSpecifier.createFromRenderedImage(Unknown
Sour

ce)


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


how to decide broadcast join data size

2018-07-14 Thread Selvam Raman
Hi,

I could not find a useful formula or documentation to help me decide the
broadcast join data size based on the cluster size.

Please let me know if there is a rule of thumb for this.

For example
cluster size - 20 node cluster, 32 gb per node and 8 core per node.

executor-memory = 8gb, executor-core=4

Memory:
8 GB per executor, with roughly 40% going to reserved/internal memory, leaves
about 4.8 GB for actual computation and storage. Let's assume I have not
persisted anything, so in this case I can use the 4.8 GB per executor.
IS IT POSSIBLE FOR ME TO USE A 400 MB FILE FOR A BROADCAST JOIN?
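
There is no exact published formula; a rough rule is that the broadcast table
has to fit comfortably in the driver and in every executor alongside everything
else, and its deserialized in-memory size can be several times the file size,
so 400 MB on disk is close to the practical upper limit rather than a safe
default. A minimal sketch of the wiring (assuming Spark 2.x; the paths and join
key are placeholders):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

big = spark.read.parquet("s3://bucket/big_table")
small = spark.read.parquet("s3://bucket/400mb_table")

# Raise the auto-broadcast threshold to ~500 MB, or hint it explicitly.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 500 * 1024 * 1024)
joined = big.join(broadcast(small), "join_key")
joined.explain()    # check that a BroadcastHashJoin is planned, not a SortMergeJoin

If the plan still falls back to a sort merge join, or the driver runs out of
memory collecting the table, the table is too big to broadcast on that cluster.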

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"