Spark Dataset class is not truly private[sql]

2020-03-04 Thread Nirav Patel
I see that Spark's Dataset class is defined as:

class Dataset[T] private[sql](
    @transient val sparkSession: SparkSession,
    @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
    encoder: Encoder[T])


However, it has public auxiliary constructors that allow Dataset to be
extended, which I don't think the developers intended:


  def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
    this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)
  }

  def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
    this(sqlContext.sparkSession, logicalPlan, encoder)
  }
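
For illustration, a minimal sketch of the kind of extension the public auxiliary constructor permits (the class name and extra method are hypothetical, and the catalyst types involved are internal APIs, so this is not something to rely on):

package com.example  // deliberately outside org.apache.spark.sql

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Compiles only because the auxiliary constructor is public;
// the primary constructor is private[sql] as intended.
class TracedDataset[T](spark: SparkSession, plan: LogicalPlan, enc: Encoder[T])
    extends Dataset[T](spark, plan, enc) {

  // Hypothetical extra behavior layered on top of the internal query plan.
  def planString: String = queryExecution.logical.toString
}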





Spark 2.4 and Hive 2.3 - Performance issue with concurrent hive DDL queries

2020-01-30 Thread Nirav Patel
Hi,

I am trying to run thousands of parquet partition update operations on different
Hive tables in parallel from my client application. I am using Spark SQL in
local mode, with Hive support enabled, to submit the Hive queries.
Spark runs in local mode because all the operations are pretty simple DDL
queries, so we don't want to use cluster resources for this.

The Spark Hive config property is:

hive.metastore.uris=thrift://hivebox:9083

An example SQL query that we want to execute in parallel:

spark.sql("ALTER TABLE mytable PARTITION (a=3, b=3) SET LOCATION '/newdata/mytable/a=3/b=3/part.parquet'")
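
For context, the submission pattern looks roughly like this (a sketch; the pool size and the update list are illustrative, and the real application uses a fork-join pool):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(32))

// (table, a, b, newLocation) tuples to apply; contents are illustrative.
val updates: Seq[(String, Int, Int, String)] = Seq(
  ("mytable", 3, 3, "/newdata/mytable/a=3/b=3/part.parquet"))

val futures = updates.map { case (table, a, b, location) =>
  Future {
    spark.sql(s"ALTER TABLE $table PARTITION (a=$a, b=$b) SET LOCATION '$location'")
  }
}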


I can see that the queries are submitted via different threads from my
fork-join pool, but I couldn't scale this operation no matter how I tweaked
the thread pool. Then I started watching the Hive metastore logs and saw
that only one thread is doing all the writes.

2020-01-29T16:27:15,638  INFO [pool-6-thread-163] metastore.HiveMetaStore: 163: source:10.250.70.14 get_table : db=mydb tbl=mytable1
2020-01-29T16:27:15,638  INFO [pool-6-thread-163] HiveMetaStore.audit: ugi=mycomp   ip=10.250.70.14 cmd=source:10.250.70.14 get_table : db=mydb tbl=mytable1
2020-01-29T16:27:15,653  INFO [pool-6-thread-163] metastore.HiveMetaStore: 163: source:10.250.70.14 get_database: mydb
2020-01-29T16:27:15,653  INFO [pool-6-thread-163] HiveMetaStore.audit: ugi=mycomp   ip=10.250.70.14 cmd=source:10.250.70.14 get_database: mydb
2020-01-29T16:27:15,655  INFO [pool-6-thread-163] metastore.HiveMetaStore: 163: source:10.250.70.14 get_table : db=mydb tbl=mytable2
2020-01-29T16:27:15,656  INFO [pool-6-thread-163] HiveMetaStore.audit: ugi=mycomp   ip=10.250.70.14 cmd=source:10.250.70.14 get_table : db=mydb tbl=mytable2
2020-01-29T16:27:15,670  INFO [pool-6-thread-163] metastore.HiveMetaStore: 163: source:10.250.70.14 get_database: mydb
2020-01-29T16:27:15,670  INFO [pool-6-thread-163] HiveMetaStore.audit: ugi=mycomp   ip=10.250.70.14 cmd=source:10.250.70.14 get_database: mydb
2020-01-29T16:27:15,672  INFO [pool-6-thread-163] metastore.HiveMetaStore: 163: source:10.250.70.14 get_table : db=mydb tbl=mytable3
2020-01-29T16:27:15,672  INFO [pool-6-thread-163] HiveMetaStore.audit: ugi=mycomp   ip=10.250.70.14 cmd=source:10.250.70.14 get_table : db=mydb tbl=mytable3

All actions are performed by a single thread, pool-6-thread-163. I have
scanned hundreds of lines and it is always the same thread. I don't see
much logging in the hiveserver.log file.

Is it bound to the client IP? I see source:10.250.70.14 in the log records,
which would make sense as I am submitting all jobs from a single machine. If
that's the case, how do I scale this? Am I missing a configuration, or is
there an issue with how Hive handles connections from the Spark client?

I know a workaround could be to run my application on a cluster, in which
case queries would be submitted by different client machines (worker nodes),
but we really just want to use Spark in local mode.

Thanks,

Nirav





Stage or Tasks level logs missing

2019-02-13 Thread Nirav Patel
Currently there seem to be three places to check task-level logs:
1) the Spark UI
2) the `yarn logs` command
3) log aggregation on HDFS (if enabled)

All of the above only give you logs at the executor (container) level.
However, one executor can have multiple threads, and each might be running
parts of different stages (stg1, stg2) and, within those, different tasks
(tid1, tid2, ...). It's hard to track a particular task's activity in the
executor logs.

It'd be nice to:
1) mark all log entries with the stageId followed by the taskId
2) have a separate log file for each task (taskId)
3) have a separate log file for stage-level logs

If I missed something, let me know.

Thanks,
Nirav


Spark 2.2.1 - Operation not allowed: alter table replace columns

2018-12-17 Thread Nirav Patel
I see that a similar issue is fixed for the `ALTER TABLE table_name ADD
COLUMNS(..)` statement.

https://issues.apache.org/jira/browse/SPARK-19261

Is it also fixed for `REPLACE COLUMNS` in any subsequent version?

Thanks




Re: CSV parser - is there a way to find malformed csv record

2018-10-09 Thread Nirav Patel
Thanks, Shuporno. That mode worked. I found a couple of records with quotes
inside quotes that needed to be escaped.
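
For anyone landing here later, the general pattern for surfacing the bad rows looks roughly like this (a sketch, not the exact code from this thread; headerDF and dataPath are the names used in the original post):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Extend the expected schema with a column that captures the raw text of malformed rows.
val schemaWithCorrupt = StructType(headerDF.schema.fields :+
  StructField("_corrupt_record", StringType, nullable = true))

val parsed = sparkSession.read
  .option("multiLine", true)
  .option("mode", "PERMISSIVE")                           // keep bad rows instead of failing
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schemaWithCorrupt)
  .csv(dataPath)
  .cache()  // some Spark versions require caching before filtering on the corrupt column

// Inspect only the malformed records.
parsed.filter(parsed("_corrupt_record").isNotNull).show(false)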



On Tue, Oct 9, 2018 at 1:40 PM Taylor Cox  wrote:

> Hey Nirav,
>
>
>
> Here’s an idea:
>
>
>
> Suppose your file.csv has N records, one for each line. Read the csv
> line-by-line (without spark) and attempt to parse each line. If a record is
> malformed, catch the exception and rethrow it with the line number. That
> should show you where the problematic record(s) can be found.
>
>
>
> *From:* Nirav Patel 
> *Sent:* Monday, October 8, 2018 11:57 AM
> *To:* spark users 
> *Subject:* CSV parser - is there a way to find malformed csv record
>
>
>
> I am getting `RuntimeException: Malformed CSV record` while parsing csv
> record and attaching schema at same time. Most likely there are additional
> commas or json data in some field which are not escaped properly. Is there
> a way CSV parser tells me which record is malformed?
>
>
>
>
>
> This is what I am using:
>
>
>
> val df2 = sparkSession.read
>
>   .option("inferSchema", true)
>
>   .option("multiLine", true)
>
>   .schema(headerDF.schema) // this only works without column mismatch
>
>   .csv(dataPath)
>
>
>
> Thanks


CSV parser - is there a way to find malformed csv record

2018-10-08 Thread Nirav Patel
I am getting `RuntimeException: Malformed CSV record` while parsing CSV
records and attaching a schema at the same time. Most likely there are
additional commas or JSON data in some fields that are not escaped properly.
Is there a way the CSV parser can tell me which record is malformed?


This is what I am using:

val df2 = sparkSession.read
  .option("inferSchema", true)
  .option("multiLine", true)
  .schema(headerDF.schema) // this only works without column mismatch
  .csv(dataPath)

Thanks




Re: CSV parser - how to parse column containing json data

2018-10-02 Thread Nirav Patel
I need to infer the schema from the CSV as well. As per your solution, I am
creating a StructType only for the JSON field. So how am I going to mix and
match here, i.e. do type inference for all fields except the JSON field and
use the custom json_schema for the JSON field?
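
One way to combine the two (a sketch using the json_schema from the reply below; the column name _c3 is illustrative):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val json_schema = StructType(Array(
  StructField("x", StringType, true),
  StructField("y", StringType, true),
  StructField("z", IntegerType, true)))

// Let Spark infer types for every CSV column; the JSON column arrives as a plain string.
val raw = sparkSession.read
  .option("inferSchema", true)
  .csv(dataPath)

// Then parse just that one column into a struct, keeping the inferred types elsewhere.
val parsed = raw.withColumn("_c3", from_json(raw("_c3"), json_schema))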





On Thu, Aug 30, 2018 at 5:29 PM Brandon Geise 
wrote:

> If you know your json schema you can create a struct and then apply that
> using from_json:
>
>
>
> val json_schema = StructType(Array(StructField(“x”, StringType, true),
> StructField(“y”, StringType, true), StructField(“z”, IntegerType, true)))
>
>
>
> .withColumn("_c3", from_json(col("_c3_signals"),json_schema))
>
>
>
> *From: *Nirav Patel 
> *Date: *Thursday, August 30, 2018 at 7:19 PM
> *To: *spark users 
> *Subject: *CSV parser - how to parse column containing json data
>
>
>
> Is there a way to parse csv file with some column in middle containing
> json data structure?
>
>
>
> "a",102,"c","{"x":"xx","y":false,"z":123}","d","e",102.2
>
>
>
>
>
> Thanks,
>
> Nirav


CSV parser - how to parse column containing json data

2018-08-30 Thread Nirav Patel
Is there a way to parse a CSV file where a column in the middle contains a
JSON data structure?

"a",102,"c","{"x":"xx","y":false,"z":123}","d","e",102.2


Thanks,
Nirav




csv reader performance with multiline option

2018-08-18 Thread Nirav Patel
Does enabling the 'multiLine' option impact performance? How? Would it read
the entire file with just one thread?

Thanks




Re: Insert into dynamic partitioned hive/parquet table throws error - Partition spec contains non-partition columns

2018-08-07 Thread Nirav Patel
FYI, it works with static partitioning:

spark.sql("insert overwrite table mytable PARTITION(P1=1085, P2=164590861) select c1, c2,..cn, P1, P2 from updateTable")

On Thu, Aug 2, 2018 at 5:01 PM, Nirav Patel  wrote:

> I am trying to insert overwrite multiple partitions into existing
> partitioned hive/parquet table. Table was created using sparkSession.
>
> I have a table 'mytable' with partitions P1 and P2.
>
> I have following set on sparkSession object:
>
> .config("hive.exec.dynamic.partition", true)
>
> .config("hive.exec.dynamic.partition.mode", "nonstrict")
>
> val df = spark.read.csv(pathToNewData)
>
> df.createOrReplaceTempView("updateTable")
>
> here 'df' may contains data from multiple partitions. i.e. multiple values
> for P1 and P2 in data.
>
>
> spark.sql("insert overwrite table mytable PARTITION(P1, P2) select c1,
> c2,..cn, P1, P2 from updateTable") // I made sure that partition columns P1
> and P2 are at the end of projection list.
>
> I am getting following error:
>
> org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.meta
> data.Table.ValidationFailureSemanticException: Partition spec {p1=, p2=,
> P1=1085, P2=164590861} contains non-partition columns;
>
> dataframe 'df' have records for P1=1085, P2=164590861 .


Insert into dynamic partitioned hive/parquet table throws error - Partition spec contains non-partition columns

2018-08-02 Thread Nirav Patel
I am trying to insert overwrite multiple partitions into an existing
partitioned hive/parquet table. The table was created using sparkSession.

I have a table 'mytable' with partitions P1 and P2.

I have following set on sparkSession object:

.config("hive.exec.dynamic.partition", true)

.config("hive.exec.dynamic.partition.mode", "nonstrict")

val df = spark.read.csv(pathToNewData)

df.createOrReplaceTempView("updateTable")

Here 'df' may contain data from multiple partitions, i.e. multiple values
for P1 and P2 in the data.


spark.sql("insert overwrite table mytable PARTITION(P1, P2) select c1,
c2,..cn, P1, P2 from updateTable") // I made sure that partition columns P1
and P2 are at the end of projection list.

I am getting following error:

org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.
metadata.Table.ValidationFailureSemanticException: Partition spec {p1=,
p2=, P1=1085, P2=164590861} contains non-partition columns;

The dataframe 'df' has records for P1=1085, P2=164590861.




Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-02 Thread Nirav Patel
I tried the following to explicitly specify partition columns in the SQL
statement, and also tried different cases (upper and lower) for the
partition columns.

insert overwrite table $tableName PARTITION(P1, P2) select A, B, C, P1, P2
from updateTable.

Still getting:

Caused by:
org.apache.hadoop.hive.ql.metadata.Table$ValidationFailureSemanticException:
Partition spec {p1=, p2=, P1=1085, P2=164590861} contains non-partition
columns



On Thu, Aug 2, 2018 at 11:37 AM, Nirav Patel  wrote:

> Thanks Koert. I'll check that out when we can update to 2.3
>
> Meanwhile, I am trying hive sql (INSERT OVERWRITE) statement to insert
> overwrite multiple partitions. (without loosing existing ones)
>
> It's giving me issues around partition columns.
>
> dataFrame.createOrReplaceTempView("updateTable") //here dataframe
> contains values from multiple partitions.
>
> dataFrame also have partition columns but I can't get any of following to
> execute:
>
> insert overwrite table $tableName PARTITION(P1, P2) select * from
> updateTable.
>
> org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.
> metadata.Table.ValidationFailureSemanticException: Partition spec {p1=,
> p2=, P1=__HIVE_DEFAULT_PARTITION__, P2=1} contains non-partition columns;
>
>
> Is above a right approach to update multiple partitions? Or should I be
> more specific updating each partition with separate command like following:
>
> //Pseudo code; yet to try
>
> df.createOrReplaceTempView("updateTable")
> df.rdd.groupBy(P1, P2).map { (key, Iterable[Row]) =>
>
>
>   spark.sql("INSERT OVERWRITE TABLE stats
>   PARTITION(P1 = key._1, P2 = key._2)
>   SELECT * from updateTable where P1 = key._1 and P2 = key._2")
> }
>
> Regards,
> Nirav
>
>
> On Wed, Aug 1, 2018 at 4:18 PM, Koert Kuipers  wrote:
>
>> this works for dataframes with spark 2.3 by changing a global setting,
>> and will be configurable per write in 2.4
>> see:
>> https://issues.apache.org/jira/browse/SPARK-20236
>> https://issues.apache.org/jira/browse/SPARK-24860
>>
>> On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel 
>> wrote:
>>
>>> Hi Peay,
>>>
>>> Have you find better solution yet? I am having same issue.
>>>
>>> Following says it works with spark 2.1 onward but only when you use
>>> sqlContext and not Dataframe
>>> https://medium.com/@anuvrat/writing-into-dynamic-partitions-
>>> using-spark-2e2b818a007a
>>>
>>> Thanks,
>>> Nirav
>>>
>>> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh 
>>> wrote:
>>>
>>>> If your processing task inherently processes input data by month you
>>>> may want to "manually" partition the output data by month as well as
>>>> by day, that is to save it with a file name including the given month,
>>>> i.e. "dataset.parquet/month=01". Then you will be able to use the
>>>> overwrite mode with each month partition. Hope this could be of some
>>>> help.
>>>>
>>>> --
>>>> Pavel Knoblokh
>>>>
>>>> On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
>>>> > Hello,
>>>> >
>>>> > I am trying to use
>>>> > data_frame.write.partitionBy("day").save("dataset.parquet") to write
>>>> a
>>>> > dataset while splitting by day.
>>>> >
>>>> > I would like to run a Spark job  to process, e.g., a month:
>>>> > dataset.parquet/day=2017-01-01/...
>>>> > ...
>>>> >
>>>> > and then run another Spark job to add another month using the same
>>>> folder
>>>> > structure, getting me
>>>> > dataset.parquet/day=2017-01-01/
>>>> > ...
>>>> > dataset.parquet/day=2017-02-01/
>>>> > ...
>>>> >
>>>> > However:
>>>> > - with save mode "overwrite", when I process the second month, all of
>>>> > dataset.parquet/ gets removed and I lose whatever was already
>>>> computed for
>>>> > the previous month.
>>>> > - with save mode "append", then I can't get idempotence: if I run the
>>>> job to
>>>> > process a given month twice, I'll get duplicate data in all the
>>>> subfolders
>>>> > for that month.
>>>> >
>>>> > Is there a way to do "append in terms of the subfolders from
>>>> partitionBy,
>>>> > but overwrite within each such partitions? Any help would be
>>>> appreciated.
>>>> >
>>>> > Thanks!
>>>>
>>>>
>>>>
>>>> --
>>>> Pavel Knoblokh
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-02 Thread Nirav Patel
Thanks Koert. I'll check that out when we can update to 2.3

Meanwhile, I am trying a Hive SQL (INSERT OVERWRITE) statement to insert
overwrite multiple partitions (without losing the existing ones).

It's giving me issues around partition columns.

dataFrame.createOrReplaceTempView("updateTable") //here dataframe
contains values from multiple partitions.

dataFrame also has the partition columns, but I can't get any of the
following to execute:

insert overwrite table $tableName PARTITION(P1, P2) select * from
updateTable.

org.apache.spark.sql.AnalysisException:
org.apache.hadoop.hive.ql.metadata.Table.ValidationFailureSemanticException:
Partition spec {p1=, p2=, P1=__HIVE_DEFAULT_PARTITION__, P2=1} contains
non-partition columns;


Is the above the right approach to update multiple partitions? Or should I
be more specific and update each partition with a separate command like the
following:

//Pseudo code; yet to try

df.createOrReplaceTempView("updateTable")
df.rdd.groupBy(P1, P2).map { (key, Iterable[Row]) =>


  spark.sql("INSERT OVERWRITE TABLE stats
  PARTITION(P1 = key._1, P2 = key._2)
  SELECT * from updateTable where P1 = key._1 and P2 = key._2")
}

Regards,
Nirav


On Wed, Aug 1, 2018 at 4:18 PM, Koert Kuipers  wrote:

> this works for dataframes with spark 2.3 by changing a global setting, and
> will be configurable per write in 2.4
> see:
> https://issues.apache.org/jira/browse/SPARK-20236
> https://issues.apache.org/jira/browse/SPARK-24860
>
> On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel  wrote:
>
>> Hi Peay,
>>
>> Have you find better solution yet? I am having same issue.
>>
>> Following says it works with spark 2.1 onward but only when you use
>> sqlContext and not Dataframe
>> https://medium.com/@anuvrat/writing-into-dynamic-partitions-
>> using-spark-2e2b818a007a
>>
>> Thanks,
>> Nirav
>>
>> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh 
>> wrote:
>>
>>> If your processing task inherently processes input data by month you
>>> may want to "manually" partition the output data by month as well as
>>> by day, that is to save it with a file name including the given month,
>>> i.e. "dataset.parquet/month=01". Then you will be able to use the
>>> overwrite mode with each month partition. Hope this could be of some
>>> help.
>>>
>>> --
>>> Pavel Knoblokh
>>>
>>> On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
>>> > Hello,
>>> >
>>> > I am trying to use
>>> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
>>> > dataset while splitting by day.
>>> >
>>> > I would like to run a Spark job  to process, e.g., a month:
>>> > dataset.parquet/day=2017-01-01/...
>>> > ...
>>> >
>>> > and then run another Spark job to add another month using the same
>>> folder
>>> > structure, getting me
>>> > dataset.parquet/day=2017-01-01/
>>> > ...
>>> > dataset.parquet/day=2017-02-01/
>>> > ...
>>> >
>>> > However:
>>> > - with save mode "overwrite", when I process the second month, all of
>>> > dataset.parquet/ gets removed and I lose whatever was already computed
>>> for
>>> > the previous month.
>>> > - with save mode "append", then I can't get idempotence: if I run the
>>> job to
>>> > process a given month twice, I'll get duplicate data in all the
>>> subfolders
>>> > for that month.
>>> >
>>> > Is there a way to do "append in terms of the subfolders from
>>> partitionBy,
>>> > but overwrite within each such partitions? Any help would be
>>> appreciated.
>>> >
>>> > Thanks!
>>>
>>>
>>>
>>> --
>>> Pavel Knoblokh
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Overwrite only specific partition with hive dynamic partitioning

2018-08-01 Thread Nirav Patel
Hi,

I have a Hive partitioned table created using sparkSession. I would like to
insert/overwrite Dataframe data into a specific set of partitions without
losing any other partitions. In each run I have to update a set of
partitions, not just one.

E.g. the first time I have a dataframe with bid=1, bid=2, bid=3 and I can
write it by using

`df.write.mode(SaveMode.Overwrite).partitionBy("bid").parquet(TableBaseLocation)`

It generates the dirs bid=1, bid=2, bid=3 inside TableBaseLocation.

But the next time, when I have a dataframe with bid=1, bid=4 and use the
same code above, it removes bid=2 and bid=3. In other words, I don't get
idempotency.

I tried SaveMode.Append, but that creates duplicate data inside bid=1.


I read
https://issues.apache.org/jira/browse/SPARK-18183

With that approach it seems like I may have to update multiple partitions
manually for each input partition. That seems like a lot of work on every
update. Is there a better way to do this?

Can this fix be applied to the dataframe-based approach as well?
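
For later readers: Spark 2.3 added a dynamic partition overwrite mode (SPARK-20236, linked in the related thread above) that is meant to cover this dataframe-based case; a sketch, reusing the names from this example and assuming a Spark version that includes it:

import org.apache.spark.sql.SaveMode

// With "dynamic", overwrite only touches the partitions present in df (e.g. bid=1, bid=4);
// bid=2 and bid=3 written earlier are left intact.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("bid")
  .parquet(TableBaseLocation)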

Thanks




Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-01 Thread Nirav Patel
Hi Peay,

Have you found a better solution yet? I am having the same issue.

The following says it works with Spark 2.1 onward, but only when you use
sqlContext and not a Dataframe:
https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-2e2b818a007a

Thanks,
Nirav

On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh  wrote:

> If your processing task inherently processes input data by month you
> may want to "manually" partition the output data by month as well as
> by day, that is to save it with a file name including the given month,
> i.e. "dataset.parquet/month=01". Then you will be able to use the
> overwrite mode with each month partition. Hope this could be of some
> help.
>
> --
> Pavel Knoblokh
>
> On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
> > Hello,
> >
> > I am trying to use
> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
> > dataset while splitting by day.
> >
> > I would like to run a Spark job  to process, e.g., a month:
> > dataset.parquet/day=2017-01-01/...
> > ...
> >
> > and then run another Spark job to add another month using the same folder
> > structure, getting me
> > dataset.parquet/day=2017-01-01/
> > ...
> > dataset.parquet/day=2017-02-01/
> > ...
> >
> > However:
> > - with save mode "overwrite", when I process the second month, all of
> > dataset.parquet/ gets removed and I lose whatever was already computed
> for
> > the previous month.
> > - with save mode "append", then I can't get idempotence: if I run the
> job to
> > process a given month twice, I'll get duplicate data in all the
> subfolders
> > for that month.
> >
> > Is there a way to do "append in terms of the subfolders from partitionBy,
> > but overwrite within each such partitions? Any help would be appreciated.
> >
> > Thanks!
>
>
>
> --
> Pavel Knoblokh
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Multiple joins with same Dataframe throws AnalysisException: resolved attribute(s)

2018-07-19 Thread Nirav Patel
Corrected the subject line. It's a missing attribute error, not an
ambiguous reference error.

On Thu, Jul 19, 2018 at 2:11 PM, Nirav Patel  wrote:

> I am getting attribute missing error after joining dataframe 'df2' twice .
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> resolved attribute(s) *fid#49 *missing from value#14,value#126,mgrId#15,
> name#16,d31#109,df2Id#125,df2Id#47,d4#130,d3#129,df1Id#13,name#128,
> *fId#127* in operator !Join LeftOuter, (mgrId#15 = fid#49);;
>
> !Join LeftOuter, (mgrId#15 = fid#49)
>
> :- Project [df1Id#13, value#14, mgrId#15, name#16, df2Id#47, d3#51 AS
> d31#109]
>
> :  +- Join Inner, (df1Id#13 = fid#49)
>
> : :- Project [_1#6 AS df1Id#13, _2#7 AS value#14, _3#8 AS mgrId#15,
> _4#9 AS name#16, _5#10 AS d1#17, _6#11 AS d2#18]
>
> : :  +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]
>
> : +- Project [_1#40 AS df2Id#47, _2#41 AS value#48, _3#42 AS fId#49,
> _4#43 AS name#50, _5#44 AS d3#51, _6#45 AS d4#52]
>
> :+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]
>
> +- Project [_1#40 AS df2Id#125, _2#41 AS value#126, _3#42 AS fId#127,
> _4#43 AS name#128, _5#44 AS d3#129, _6#45 AS d4#130]
>
>+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]
>
>
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.
> failAnalysis(CheckAnalysis.scala:40)
>
> at
>
>
> As you can see "fid" is present but spark is looking for fid#49 while
> there is another one fid#127.
>
> Physical Plan of original df2 is
>
> == Physical Plan ==
>
> LocalTableScan [df2Id#47, value#48, fId#49, name#50, d3#51, d4#52]
>
>
> But by looking at physical plan looks like there are multiple versions of
> 'fid' gets generated (fid#49, fid#127).
>
> Here's the full code.
>
>
> Code:
>
> val seq1 = Seq(
>
> (1,"a",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (2,"a",0,"bla", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
>
> (3,"a",2,"bla", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
>
> (4,"bb",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (5,"bb",2,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (6,"bb",0,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))
>
> //val rdd1 = spark.sparkContext.parallelize(seq1)
>
> val df1= seq1.toDF("id","value","mgrId", "name", "d1", "d2")
>
> df1.show()
>
>
>
> val seq2 = Seq(
>
> (1,"a1",1,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (2,"a2",1,"duh", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
>
> (3,"a3",2,"jah", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
>
> (4,"a4",3,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (5,"a5",4,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (6,"a6",5,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))
>
>
>
>
>
> val df2 = seq2.toDF("id","value","fId", "name", "d1", "d2")
>
> df2.explain()
>
> df2.show()
>
>
>
> val join1 = df1
>
>   .join(df2,
>
> df1("id") === df2("fid"))
>
>   .select(df1("id"), df1("value"), df1("mgrId"), df1("name"), df2("id"
> ).as("df2id"), df2("fid"), df2("value"))
>
> join1.printSchema()
>
> join1.show()
>
>
>
> val join2 = join1
>
>   .join(df2,
>
>   join1("mgrId") === df2("fid"),
>
>   "left")
>
>.select(join1("id"), join1("value"), join1("mgrId"), join1("name"),
> join1("df2id"),
>
>join1("fid"), df2("fid").as("df2fid"))
>
> join2.printSchema()
>
> join2.show()


Multiple joins with same Dataframe throws Ambiguous reference error

2018-07-19 Thread Nirav Patel
I am getting an attribute missing error after joining dataframe 'df2' twice.

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved
attribute(s) fid#49 missing from
value#14,value#126,mgrId#15,name#16,d31#109,df2Id#125,df2Id#47,d4#130,d3#129,df1Id#13,name#128,
fId#127 in operator !Join LeftOuter, (mgrId#15 = fid#49);;

!Join LeftOuter, (mgrId#15 = fid#49)

:- Project [df1Id#13, value#14, mgrId#15, name#16, df2Id#47, d3#51 AS
d31#109]

:  +- Join Inner, (df1Id#13 = fid#49)

: :- Project [_1#6 AS df1Id#13, _2#7 AS value#14, _3#8 AS mgrId#15,
_4#9 AS name#16, _5#10 AS d1#17, _6#11 AS d2#18]

: :  +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]

: +- Project [_1#40 AS df2Id#47, _2#41 AS value#48, _3#42 AS fId#49,
_4#43 AS name#50, _5#44 AS d3#51, _6#45 AS d4#52]

:+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]

+- Project [_1#40 AS df2Id#125, _2#41 AS value#126, _3#42 AS fId#127, _4#43
AS name#128, _5#44 AS d3#129, _6#45 AS d4#130]

   +- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]


at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(
CheckAnalysis.scala:40)

at


As you can see "fid" is present but spark is looking for fid#49 while there
is another one fid#127.

Physical Plan of original df2 is

== Physical Plan ==

LocalTableScan [df2Id#47, value#48, fId#49, name#50, d3#51, d4#52]


But looking at the physical plan, it seems multiple versions of 'fid' get
generated (fId#49, fId#127).

Here's the full code.


Code:

val seq1 = Seq(
  (1,"a",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (2,"a",0,"bla", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
  (3,"a",2,"bla", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
  (4,"bb",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (5,"bb",2,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (6,"bb",0,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))

//val rdd1 = spark.sparkContext.parallelize(seq1)
val df1 = seq1.toDF("id", "value", "mgrId", "name", "d1", "d2")
df1.show()

val seq2 = Seq(
  (1,"a1",1,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (2,"a2",1,"duh", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
  (3,"a3",2,"jah", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
  (4,"a4",3,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (5,"a5",4,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (6,"a6",5,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))

val df2 = seq2.toDF("id", "value", "fId", "name", "d1", "d2")
df2.explain()
df2.show()

val join1 = df1
  .join(df2, df1("id") === df2("fid"))
  .select(df1("id"), df1("value"), df1("mgrId"), df1("name"),
    df2("id").as("df2id"), df2("fid"), df2("value"))
join1.printSchema()
join1.show()

val join2 = join1
  .join(df2, join1("mgrId") === df2("fid"), "left")
  .select(join1("id"), join1("value"), join1("mgrId"), join1("name"), join1("df2id"),
    join1("fid"), df2("fid").as("df2fid"))
join2.printSchema()
join2.show()




Re: Dataframe from partitioned parquet table missing partition columns from schema

2018-07-17 Thread Nirav Patel
Just found out that I need the following option while reading:

.option("basePath", "hdfs://localhost:9000/ptest/")


https://stackoverflow.com/questions/43192940/why-is-partition-key-column-missing-from-dataframe
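
For completeness, a sketch of the full read (paths and the partition column name are illustrative; basePath tells Spark where partition discovery starts, so the partition columns stay in the schema):

val df = spark.read
  .option("basePath", "hdfs://localhost:9000/ptest/")
  .parquet("hdfs://localhost:9000/ptest/somePartitionCol=123")

df.printSchema()  // now includes somePartitionCol, so it can be used in projections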



On Tue, Jul 17, 2018 at 3:48 PM, Nirav Patel  wrote:

> I created a hive table with parquet storage using sparkSql. Now in hive
> cli when I do describe and Select I can see partition columns in both as
> regular columns as well as partition column. However if I try to do same in
> sparkSql (Dataframe) I don't see partition columns.
>
> I need to do projection on partition column in spark. How do I do that now?
>

-- 


 <http://www.xactlycorp.com/email-click/>

 
<https://www.instagram.com/xactlycorp/>   
<https://www.linkedin.com/company/xactly-corporation>   
<https://twitter.com/Xactly>   <https://www.facebook.com/XactlyCorp>   
<http://www.youtube.com/xactlycorporation>


Dataframe from partitioned parquet table missing partition columns from schema

2018-07-17 Thread Nirav Patel
I created a Hive table with parquet storage using sparkSql. Now, in the
Hive CLI, when I do DESCRIBE and SELECT I can see the partition columns both
as regular columns and as partition columns. However, if I try to do the
same in sparkSql (Dataframe), I don't see the partition columns.

I need to do a projection on a partition column in Spark. How do I do that now?




Dataset - withColumn and withColumnRenamed that accept Column type

2018-07-13 Thread Nirav Patel
Is there a version of withColumn or withColumnRenamed that accepts a Column
instead of a String? That way I can specify a fully qualified name when
there are duplicate column names.

I can drop a column based on a Column-typed argument, so why can't I rename
one based on the same type of argument?

The use case is: I have a Dataframe with duplicate columns at the end of a
join. Most of the time I drop the duplicate, but now I need to rename one of
those columns. I cannot do it because there is no API for that. I can rename
it before the join, but that is not preferred.


def withColumn(colName: String, col: Column): DataFrame
Returns a new Dataset by adding a column or replacing the existing column that has the same name.

def withColumnRenamed(existingName: String, newName: String): DataFrame
Returns a new Dataset with a column renamed.

I think there should also be this one:

def withColumnRenamed(existingName: Column, newName: Column): DataFrame
Returns a new Dataset with a column renamed.
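
In the meantime, one workaround (a sketch with illustrative dataframes, not an existing API) is to re-select with aliases on the fully qualified columns right after the join, which both disambiguates and renames:

// df1 and df2 both have a "name" column; qualify each via its parent DataFrame.
val joined = df1.join(df2, df1("id") === df2("fid"))

val renamed = joined.select(
  df1("id"),
  df1("name").as("df1_name"),  // keep and rename one copy
  df2("name").as("df2_name"),  // keep and rename the other copy
  df2("value"))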




Re: How to avoid duplicate column names after join with multiple conditions

2018-07-12 Thread Nirav Patel
Hi Prem, dropping or renaming the column works for me as a workaround. I
just thought it would be nice to have a generic API that handles that for
me, or some intelligence so that, since both columns are the same, it
doesn't complain in a subsequent select clause that it doesn't know whether
I mean a#12 or a#81. They are both the same; just pick one.

On Thu, Jul 12, 2018 at 9:38 AM, Prem Sure  wrote:

> Hi Nirav, did you try
> .drop(df1(a) after join
>
> Thanks,
> Prem
>
> On Thu, Jul 12, 2018 at 9:50 PM Nirav Patel  wrote:
>
>> Hi Vamshi,
>>
>> That api is very restricted and not generic enough. It imposes that all
>> conditions of joins has to have same column on both side and it also has to
>> be equijoin. It doesn't serve my usecase where some join predicates don't
>> have same column names.
>>
>> Thanks
>>
>> On Sun, Jul 8, 2018 at 7:39 PM, Vamshi Talla 
>> wrote:
>>
>>> Nirav,
>>>
>>> Spark does not create a duplicate column when you use the below join
>>> expression,  as an array of column(s) like below but that requires the
>>> column name to be same in both the data frames.
>>>
>>> Example: *df1.join(df2, [‘a’])*
>>>
>>> Thanks.
>>> Vamshi Talla
>>>
>>> On Jul 6, 2018, at 4:47 PM, Gokula Krishnan D 
>>> wrote:
>>>
>>> Nirav,
>>>
>>> withColumnRenamed() API might help but it does not different column and
>>> renames all the occurrences of the given column. either use select() API
>>> and rename as you want.
>>>
>>>
>>>
>>> Thanks & Regards,
>>> Gokula Krishnan* (Gokul)*
>>>
>>> On Mon, Jul 2, 2018 at 5:52 PM, Nirav Patel 
>>> wrote:
>>>
>>>> Expr is `df1(a) === df2(a) and df1(b) === df2(c)`
>>>>
>>>> How to avoid duplicate column 'a' in result? I don't see any api that
>>>> combines both. Rename manually?


Re: How to avoid duplicate column names after join with multiple conditions

2018-07-12 Thread Nirav Patel
Hi Vamshi,

That API is very restricted and not generic enough. It requires that every
join condition has the same column name on both sides, and it also has to be
an equijoin. It doesn't serve my use case, where some join predicates don't
have the same column names.

Thanks

On Sun, Jul 8, 2018 at 7:39 PM, Vamshi Talla  wrote:

> Nirav,
>
> Spark does not create a duplicate column when you use the below join
> expression,  as an array of column(s) like below but that requires the
> column name to be same in both the data frames.
>
> Example: *df1.join(df2, [‘a’])*
>
> Thanks.
> Vamshi Talla
>
> On Jul 6, 2018, at 4:47 PM, Gokula Krishnan D  wrote:
>
> Nirav,
>
> withColumnRenamed() API might help but it does not different column and
> renames all the occurrences of the given column. either use select() API
> and rename as you want.
>
>
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>
> On Mon, Jul 2, 2018 at 5:52 PM, Nirav Patel  wrote:
>
>> Expr is `df1(a) === df2(a) and df1(b) === df2(c)`
>>
>> How to avoid duplicate column 'a' in result? I don't see any api that
>> combines both. Rename manually?


Dataframe multiple joins with same dataframe not able to resolve correct join columns

2018-07-11 Thread Nirav Patel
I am trying to join df1 with df2, and then join the result of that with df2
again.

df2 is the common dataframe.

val df3 = df1
  .join(df2,
    df1("PARTICIPANT_ID") === df2("PARTICIPANT_ID") and
    df1("BUSINESS_ID") === df2("BUSINESS_ID"))
  .drop(df1("BUSINESS_ID"))    // dropping duplicates
  .drop(df1("PARTICIPANT_ID")) // dropping duplicates
  .select("EMPLOYEE_ID", ...)

val df4 = df3
  .join(df2,
    df3("EMPLOYEE_ID") === df2("EMPLOYEE_ID") and
    df3("BUSINESS_ID") === df2("BUSINESS_ID"))
  .drop(df2("BUSINESS_ID"))    // dropping duplicates
  .drop(df2("EMPLOYEE_ID"))    // dropping duplicates
  .select(...)


I am getting the following warning, and most likely it results in a
Cartesian join, which is not what I want:

14:30:32.193 12262 [main] WARN org.apache.spark.sql.Column - Constructing trivially true equals predicate, 'EMPLOYEE_ID#83 = EMPLOYEE_ID#83'. Perhaps you need to use aliases.

14:30:32.195 12264 [main] WARN org.apache.spark.sql.Column - Constructing trivially true equals predicate, 'BUSINESS_ID#312 = BUSINESS_ID#312'. Perhaps you need to use aliases.

As you can see, one of my join predicates is converted to "(EMPLOYEE_ID#83
= EMPLOYEE_ID#83)". I think this should be okay because they should still be
columns from different dataframes (df3 and df2).

I just want to confirm that this warning is harmless in this scenario.

Problem is similar to this one:
https://stackoverflow.com/questions/32190828/spark-sql-performing-carthesian-join-instead-of-inner-join
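
Since the warning itself suggests aliases, here is a sketch of the second join in aliased form (column names taken from the example above; SOME_OTHER_COLUMN is illustrative):

import org.apache.spark.sql.functions.col

// Give the reused dataframe a distinct alias so its columns resolve independently of df3's.
val df2b = df2.as("df2b")

val df4 = df3
  .join(df2b,
    df3("EMPLOYEE_ID") === col("df2b.EMPLOYEE_ID") and
    df3("BUSINESS_ID") === col("df2b.BUSINESS_ID"))
  .select(df3("EMPLOYEE_ID"), df3("BUSINESS_ID"), col("df2b.SOME_OTHER_COLUMN"))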




Dataframe joins - UnsupportedOperationException: Unimplemented type: IntegerType

2018-07-09 Thread Nirav Patel
I am getting the following error after performing a join between 2
dataframes. It happens on the call to the .show() method. I assume it's an
issue with an incompatible type, but it's been really hard to identify which
column of which dataframe has that incompatibility.
Any pointers?
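
One way to narrow it down (a generic sketch, using the table and partition path that appear in the log below; adjust the names to your setup) is to diff the Parquet file schema against the table schema, column by column:

// Read one suspect partition directly and compare its schema with the table's schema.
val fileSchema  = spark.read
  .parquet("maprfs:///user/hive/warehouse/analytics.db/myTable/BUSINESS_ID=123")
  .schema
val tableSchema = spark.table("analytics.myTable").schema

val fileTypes  = fileSchema.fields.map(f => f.name.toLowerCase -> f.dataType).toMap
val mismatches = tableSchema.fields.flatMap { f =>
  fileTypes.get(f.name.toLowerCase).collect {
    case t if t != f.dataType => s"${f.name}: file=$t table=${f.dataType}"
  }
}
mismatches.foreach(println)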


11:06:10.304 13700 [Executor task launch worker for task 16] WARN
 o.a.s.s.e.datasources.FileScanRDD - Skipped the rest of the content in the
corrupted file: path:
maprfs:///user/hive/warehouse/analytics.db/myTable/BUSINESS_ID=123/part-0-b01dbc82-9bc3-43c5-89c6-4c9b2d407106.c000.snappy.parquet,
range: 0-14248, partition values: [1085]
java.lang.UnsupportedOperationException: Unimplemented type: IntegerType
at
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:431)
at
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:203)
at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
at
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:154)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)




How to avoid duplicate column names after join with multiple conditions

2018-07-02 Thread Nirav Patel
The expression is `df1("a") === df2("a") and df1("b") === df2("c")`.

How do I avoid the duplicate column 'a' in the result? I don't see any API
that combines both. Rename manually?




Spark sql creating managed table with location converts it to external table

2018-06-22 Thread Nirav Patel
http://www.gatorsmile.io/table-types-in-spark-external-or-managed/

"We do not allow users to create a MANAGED table with the users supplied
LOCATION."

Is this supposed to be resolved in 2.2?

Thanks




Re: Sharing spark executor pool across multiple long running spark applications

2018-02-10 Thread Nirav Patel
I did take a look at SJC earlier. It does look like it fits our use case.
It seems to be integrated in Datastax too. Apache Livy looks promising as
well. I will look into these further.

I think for a real-time app that needs sub-second latency, Spark dynamic
allocation won't work.
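
For reference, the dynamic allocation setup being weighed here looks roughly like this (a sketch; the values are illustrative and the external shuffle service must be enabled on the cluster nodes):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shared-cluster-app")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")          // required so executors can be released
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "50")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .getOrCreate()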

Thanks!

On Wed, Feb 7, 2018 at 6:37 AM, Vadim Semenov <va...@datadoghq.com> wrote:

> The other way might be to launch a single SparkContext and then run jobs
> inside of it.
>
> You can take a look at these projects:
> - https://github.com/spark-jobserver/spark-jobserver#
> persistent-context-mode---faster--required-for-related-jobs
> - http://livy.incubator.apache.org
>
> Problems with this way:
> - Can't update the code of your job.
> - A single job can break the SparkContext.
>
>
> We evaluated this way and decided to go with the dynamic allocation,
> but we also had to rethink the way we write our jobs:
> - Can't use caching since it locks executors, have to use checkpointing,
> which adds up to computation time.
> - Use some unconventional methods like reusing the same DF to write out
> multiple separate things in one go.
> - Sometimes remove executors from within the job, like when we know how
> many we would need, so the executors could join other jobs.
>
> On Tue, Feb 6, 2018 at 3:00 PM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> Currently sparkContext and it's executor pool is not shareable. Each
>> spakContext gets its own executor pool for entire life of an application.
>> So what is the best ways to share cluster resources across multiple long
>> running spark applications?
>>
>> Only one I see is spark dynamic allocation but it has high latency when
>> it comes to real-time application.


Sharing spark executor pool across multiple long running spark applications

2018-02-06 Thread Nirav Patel
Currently a SparkContext and its executor pool are not shareable. Each
SparkContext gets its own executor pool for the entire life of an
application. So what is the best way to share cluster resources across
multiple long-running Spark applications?

The only option I see is Spark dynamic allocation, but it has high latency
when it comes to real-time applications.




Project tungsten phase2 - SIMD and columnar in-memory storage

2017-06-29 Thread Nirav Patel
I read following future optimizations in Tungsten on databricks site.
https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

"There are also a handful of longer term possibilities for Tungsten. In
particular, we plan to investigate compilation to LLVM or OpenCL, so Spark
applications can leverage SSE/SIMD instructions out of modern CPUs and the
wide parallelism in GPUs to speed up operations in machine learning and
graph computation."

Are these optimizations part of Spark 2.x or on the roadmap?

Thanks




Re: Spark standalone , client mode. How do I monitor?

2017-06-29 Thread Nirav Patel
You can use Ganglia, Ambari, or Nagios to monitor Spark workers/masters.
Spark executors are resilient. There are also many proprietary software
companies that do Hadoop application monitoring.



On Tue, Jun 27, 2017 at 5:03 PM, anna stax  wrote:

> Hi all,
>
> I have a spark standalone cluster. I am running a spark streaming
> application on it and the deploy mode is client. I am looking for the best
> way to monitor the cluster and application so that I will know when the
> application/cluster is down. I cannot move to cluster deploy mode now.
>
> I appreciate your thoughts.
>
> Thanks
> -Anna



Re: Does spark support Apache Arrow

2017-06-29 Thread Nirav Patel
Kwon,

Isn't that JIRA part of the integration with Arrow? As far as Arrow as an
in-memory store goes, it probably conflicts with Spark's own Tungsten memory
representation, right?

Thanks
Nir

On Thu, May 19, 2016 at 8:03 PM, Hyukjin Kwon  wrote:

> FYI, there is a JIRA for this, https://issues.apache.
> org/jira/browse/SPARK-13534
>
> I hope this link is helpful.
>
> Thanks!
>
>
> 2016-05-20 11:18 GMT+09:00 Sun Rui :
>
>> 1. I don’t think so
>> 2. Arrow is for in-memory columnar execution. While cache is for
>> in-memory columnar storage
>>
>> On May 20, 2016, at 10:16, Todd  wrote:
>>
>> From the official site http://arrow.apache.org/, Apache Arrow is used
>> for Columnar In-Memory storage. I have two quick questions:
>> 1. Does spark support Apache Arrow?
>> 2. When dataframe is cached in memory, the data are saved in columnar
>> in-memory style. What is the relationship between this feature and Apache
>> Arrow,that is,
>> when the data is in Apache Arrow format,does spark still need the effort
>> to cache the dataframe in columnar in-memory?



Re: Apache Arrow + Spark examples?

2017-06-29 Thread Nirav Patel
bump.

I have the same question as Petr. SPARK-13534 seems to only solve the
(de)serialization issue between RDDs and Python objects. However, couldn't
Arrow become the standard for in-memory columnar representation, maybe as an
alternative to Spark's current in-memory store (k-v blocks or Tungsten)?

Thanks
Nir

On Wed, Feb 24, 2016 at 3:56 AM, Petr Novak  wrote:

>
> How Arrows collide with Tungsten and its binary in-memory format. It will
> still has to convert between them. I assume they use similar
> concepts/layout hence it is likely the conversion can be quite efficient.
> Or is there a change that the current Tungsten in memory format would be
> replaced by Arrows in the future. The same applies for Impala, Drill and
> all others. Is the goal to unify internal in-memory representation for all
> of them or the benefit is going to be in conversions faster by e.g. order
> of magnitude?
>
> Many thanks for any explanation,
> Petr



Re: DataFrameWriter - Where to find list of Options applicable to particular format(datasource)

2017-03-14 Thread Nirav Patel
Thanks, Kwon. The goal is to preserve whitespace rather than alter the data
in general, or at least to control this with user-provided options. It's
causing our downstream jobs to fail.
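
For later readers: SPARK-18579 (linked in the reply below) tracks adding ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace as CSV write options; on a Spark version that includes that change, preserving whitespace on write looks roughly like this (the dataframe and path are illustrative):

myDF.write
  .mode("overwrite")
  .option("ignoreLeadingWhiteSpace", false)   // keep leading spaces in written values
  .option("ignoreTrailingWhiteSpace", false)  // keep trailing spaces in written values
  .csv("/out/path")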


On Mon, Mar 13, 2017 at 7:23 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> Hi, all the options are documented in https://spark.apache.org/
> docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
>
> It seems we don't have both options for writing. If the goal is trimming
> the whitespaces, I think we could do this within dataframe operations (as
> we talked in the JIRA - https://issues.apache.org/jira/browse/SPARK-18579
> ).
>
>
>
> 2017-03-14 9:20 GMT+09:00 Nirav Patel <npa...@xactlycorp.com>:
>
>> Hi,
>>
>> Is there a document for each datasource (csv, tsv, parquet, json, avro)
>> with available options ?  I need to find one for csv to
>> "ignoreLeadingWhiteSpace" and "ignoreTrailingWhiteSpace"
>>
>> Thanks
>>
>>
>>
>> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/>
>>
>> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]
>> <https://www.linkedin.com/company/xactly-corporation>  [image: Twitter]
>> <https://twitter.com/Xactly>  [image: Facebook]
>> <https://www.facebook.com/XactlyCorp>  [image: YouTube]
>> <http://www.youtube.com/xactlycorporation>
>
>
>



Re: Monitoring ongoing Spark Job when run in Yarn Cluster mode

2017-03-13 Thread Nirav Patel
I think it would be on port 4040 by default on the node where the driver is
running. You should be able to navigate to it via the Resource Manager's
application master link, since in cluster mode both the AM and the driver run
on the same node.


On Mon, Mar 13, 2017 at 6:53 AM, Sourav Mazumder <
sourav.mazumde...@gmail.com> wrote:

> Hi,
>
> Is there a way to monitor an ongoing Spark Job when running in Yarn
> Cluster mode ?
>
> In  my understanding in Yarn Cluster mode Spark Monitoring UI for the
> ongoing job would not be available in 4040 port. So is there an alternative
> ?
>
> Regards,
> Sourav
>




DataFrameWriter - Where to find list of Options applicable to particular format(datasource)

2017-03-13 Thread Nirav Patel
Hi,

Is there a document for each datasource (csv, tsv, parquet, json, avro)
with available options ?  I need to find one for csv to
"ignoreLeadingWhiteSpace" and "ignoreTrailingWhiteSpace"

Thanks




DataframeWriter - How to change filename extension

2017-02-22 Thread Nirav Patel
Hi,

I am writing Dataframe as TSV using DataframeWriter as follows:

myDF.write.mode("overwrite").option("sep","\t").csv("/out/path")

Problem is all part files have .csv extension instead of .tsv as follows:
part-r-00012-f9f06712-1648-4eb6-985b-8a9c79267eef.csv

All the records are stored in TSV format though.

How can I change extension as well to .tsv instead?
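
A hedged sketch of a post-write rename, since I am not aware of a writer
option for this; it assumes a SparkSession `spark` is in scope and that only
the part files in the output directory need renaming:

import org.apache.hadoop.fs.{FileSystem, Path}

val outDir = new Path("/out/path")
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.listStatus(outDir)
  .map(_.getPath)
  .filter(_.getName.endsWith(".csv"))
  .foreach { p =>
    fs.rename(p, new Path(p.getParent, p.getName.stripSuffix(".csv") + ".tsv"))
  }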

Thanks




Re: Dynamic Allocation not respecting spark.executor.cores

2017-01-04 Thread Nirav Patel
If this is not the expected behavior then it should be logged as an issue.

On Tue, Jan 3, 2017 at 2:51 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> When enabling dynamic scheduling I see that all executors are using only 1
> core even if I specify "spark.executor.cores" to 6. If dynamic scheduling
> is disable then each executors will have 6 cores. I have tested this
> against spark 1.5 . I wonder if this is the same behavior with 2.x as well.
>
> Thanks
>
>



Dynamic scheduling not respecting spark.executor.cores

2017-01-03 Thread Nirav Patel
When enabling dynamic allocation I see that all executors are using only 1
core even if I set "spark.executor.cores" to 6. If dynamic allocation is
disabled then each executor has 6 cores. I have tested this against Spark 1.5.
I wonder if this is the same behavior with 2.x as well.
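
A minimal sketch of the settings involved, in case it helps reproduce this;
the values are examples, and whether the requested cores show up correctly may
also depend on how the cluster manager reports vcores (that part is an
assumption on my side):

val conf = new org.apache.spark.SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // external shuffle service is required for dynamic allocation
  .set("spark.executor.cores", "6")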

Thanks




Re: Spark SQL UDF - passing map as a UDF parameter

2016-11-15 Thread Nirav Patel
Thanks. I tried the following versions. They both compile:

val colmap =  map(idxMap.flatMap(en => Iterator(lit(en._1),
lit(en._2))).toSeq: _*)
val colmap = map(idxMap.flatMap(x => x._1 :: x._2 :: Nil).toSeq.map(lit _):
_*)

However they fail on a dataframe action like `show` with an
org.apache.spark.SparkException: Task not serializable error. I think it's an
issue with the interactive mode; I am using Zeppelin.
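
A self-contained sketch of the approach above, for reference; the map contents
and the column name "idx" are made-up examples:

import org.apache.spark.sql.functions.{col, lit, map, udf}

val idxMap: Map[Double, String] = Map(17.0 -> "MBO", 3.0 -> "ABC")

// Build a MapType column from the driver-side map: (key1, value1, key2, value2, ...)
val colmap = map(idxMap.toSeq.flatMap { case (k, v) => Seq(lit(k), lit(v)) }: _*)

// A UDF that looks up a key in the map column it receives.
val lookup = udf { (m: Map[Double, String], key: Double) => m.get(key) }

// df.withColumn("label", lookup(colmap, col("idx")))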

On Tue, Nov 15, 2016 at 12:38 AM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> Literal cannot handle Tuple2.
> Anyway, how about this?
>
> val rdd = sc.makeRDD(1 to 3).map(i => (i, 0))
> map(rdd.collect.flatMap(x => x._1 :: x._2 :: Nil).map(lit _): _*)
>
> // maropu
>
> On Tue, Nov 15, 2016 at 9:33 AM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> I am trying to use following API from Functions to convert a map into
>> column so I can pass it to UDF.
>>
>> map(cols: Column*): Column
>> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Column.html>
>>
>> "Creates a new map column. The input columns must be grouped as key-value
>> pairs, e.g. (key1, value1, key2, value2, ...). The key columns must all
>> have the same data type, and can't be null. The value columns must all have
>> the same data type."
>>
>>
>> final val idxMap = idxMapRdd.collectAsMap
>> val colmap =  map(idxMapA.map(lit _): _*)
>>
>> But getting following error:
>>
>> :139: error: type mismatch;
>>  found   : Iterable[org.apache.spark.sql.Column]
>>  required: Seq[org.apache.spark.sql.Column]
>>val colmap =  map(idxMapArr.map(lit _): _*)
>>
>>
>> If I try:
>> val colmap =  map(idxMapArr.map(lit _).toSeq: _*)
>>
>> It says:
>>
>> java.lang.RuntimeException: Unsupported literal type class scala.Tuple2
>> (17.0,MBO)
>> at org.apache.spark.sql.catalyst.expressions.Literal$.apply(lit
>> erals.scala:57)
>> at org.apache.spark.sql.functions$.lit(functions.scala:101)
>> at $anonfun$1.apply(:153)
>>
>>
>>
>> What is the correct usage of a `map` api to convert hashmap into column?
>>
>>
>>
>>
>>
>>
>>
>> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/>
>>
>> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]
>> <https://www.linkedin.com/company/xactly-corporation>  [image: Twitter]
>> <https://twitter.com/Xactly>  [image: Facebook]
>> <https://www.facebook.com/XactlyCorp>  [image: YouTube]
>> <http://www.youtube.com/xactlycorporation>
>
>
>
>
> --
> ---
> Takeshi Yamamuro
>



Spark SQL UDF - passing map as a UDF parameter

2016-11-14 Thread Nirav Patel
I am trying to use following API from Functions to convert a map into
column so I can pass it to UDF.

map(cols: Column*): Column

"Creates a new map column. The input columns must be grouped as key-value
pairs, e.g. (key1, value1, key2, value2, ...). The key columns must all
have the same data type, and can't be null. The value columns must all have
the same data type."


final val idxMap = idxMapRdd.collectAsMap
val colmap =  map(idxMapA.map(lit _): _*)

But getting following error:

:139: error: type mismatch;
 found   : Iterable[org.apache.spark.sql.Column]
 required: Seq[org.apache.spark.sql.Column]
   val colmap =  map(idxMapArr.map(lit _): _*)


If I try:
val colmap =  map(idxMapArr.map(lit _).toSeq: _*)

It says:

java.lang.RuntimeException: Unsupported literal type class scala.Tuple2
(17.0,MBO)
at
org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
at org.apache.spark.sql.functions$.lit(functions.scala:101)
at $anonfun$1.apply(:153)



What is the correct usage of a `map` api to convert hashmap into column?




spark ml - ngram - how to preserve single word (1-gram)

2016-11-08 Thread Nirav Patel
Is it possible to preserve single token while using n-gram feature
transformer?

e.g.

Array("Hi", "I", "heard", "about", "Spark")

Becomes

Array("Hi", "i", "heard", "about", "Spark", "Hi i", "I heard", "heard
about", "about Spark")

Currently, if I want to do this I have to manually transform the column first
using the current NGram implementation and then join the 1-gram tokens to each
column value; basically I have to do this outside of the pipeline.
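
A hedged sketch of that workaround, done outside the NGram stage itself; the
column names are assumptions and `tokenized` stands for a DataFrame that
already has a "tokens" array column:

import org.apache.spark.ml.feature.NGram
import org.apache.spark.sql.functions.{col, udf}

val bigrams = new NGram().setN(2).setInputCol("tokens").setOutputCol("bigrams")
val withBigrams = bigrams.transform(tokenized)

// Append the 1-grams (the original tokens) to the 2-grams.
val combine = udf { (a: Seq[String], b: Seq[String]) => a ++ b }
val result = withBigrams.withColumn("unigramsAndBigrams", combine(col("tokens"), col("bigrams")))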




Spark ML - Naive Bayes - how to select Threshold values

2016-11-07 Thread Nirav Patel
Few questions about `thresholds` parameter: This is what doc says "Param
for Thresholds in multi-class classification to adjust the probability of
predicting each class. Array must have length equal to the number of
classes, with values >= 0. The class with largest value p/t is predicted,
where p is the original probability of that class and t is the class'
threshold."

0) How does the threshold help here? My general idea is that if you have a
threshold of 0.7 then at least one class prediction probability should be more
than 0.7; if not, the prediction should return empty, i.e. classify it as
'uncertain'. How is the p/t rule going to achieve that?

1) Which probability does it adjust? The default column 'probability' is
actually the conditional probability and 'rawPrediction' is the confidence. I
believe the threshold adjusts the 'rawPrediction' column, not the
'probability' column. Am I right?

2) Here's what some of my probability and rawPrediction vectors look like. How
do I set threshold values based on this? rawPrediction seems to be on a log
scale here.

Probability:
[2.233368649314982E-15,1.6429456680945863E-9,1.4377313514127723E-15,7.858651849363202E-15]

rawPrediction:
[-496.9606736723107,-483.452183395287,-497.40111830218746]

Basically I want the classifier to leave the Prediction column empty if no
class has a probability of more than 0.7.

Is there any default threshold like 0.5? If so, what values does it apply to,
because "probability" and "rawPrediction" don't seem to be between 0 and 1.
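
A hedged worked example of the p/t rule quoted above, just to make the
arithmetic concrete (my own numbers): with probability = [0.2, 0.5, 0.3] and
thresholds = [0.1, 0.9, 0.5], p/t = [2.0, 0.56, 0.6], so class 0 would be
predicted even though class 1 has the highest raw probability. As far as I can
tell the rule always picks some class, so an "abstain if max probability <
0.7" behavior would have to be implemented on top of the probability column
rather than via thresholds; treat that as my assumption.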




Spark ML - Is it rule of thumb that all Estimators should only be Fit on Training data

2016-11-02 Thread Nirav Patel
It is very clear that for ML algorithms (classification, regression) the
Estimator should only be fit on training data, but it's not as clear for other
estimators, IDF for example.
IDF is a feature transformation model, but having both an IDF estimator and a
transformer makes it a little confusing what exactly it does when fitting on
one dataset vs transforming another dataset.




Re: Spark ML - CrossValidation - How to get Evaluation metrics of best model

2016-11-02 Thread Nirav Patel
Thanks!

On Tue, Nov 1, 2016 at 6:30 AM, Sean Owen <so...@cloudera.com> wrote:

> CrossValidator splits the data into k sets, and then trains k times,
> holding out one subset for cross-validation each time. You are correct that
> you should actually withhold an additional test set, before you use
> CrossValidator, in order to get an unbiased estimate of the best model's
> performance.
>
> On Tue, Nov 1, 2016 at 12:10 PM Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> I am running classification model. with normal training-test split I can
>> check model accuracy and F1 score using MulticlassClassificationEvaluator.
>> How can I do this with CrossValidation approach?
>> Afaik, you Fit entire sample data in CrossValidator as you don't want to
>> leave out any observation from either testing or training. But by doing so
>> I don't have anymore unseen data on which I can run finalized model on. So
>> is there a way I can get Accuracy and F1 score of a best model resulted
>> from cross validation?
>> Or should I still split sample data in to training and test before
>> running cross validation against only training data? so later I can test it
>> against test data.
>>
>>
>>
>> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/>
>>
>> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]
>> <https://www.linkedin.com/company/xactly-corporation>  [image: Twitter]
>> <https://twitter.com/Xactly>  [image: Facebook]
>> <https://www.facebook.com/XactlyCorp>  [image: YouTube]
>> <http://www.youtube.com/xactlycorporation>
>
>



Re: Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
Cool!

So going back to the IDF Estimator and Model question, do you know what an IDF
estimator really does during the fitting process? It must be storing some
state (information), as I mentioned in the OP (|D|, DF(t, D) and perhaps
TF(t, D)), that it re-uses to transform the test data (labeled data). Or does
it just maintain a map (lookup) of tokens -> IDF score and use that to look up
scores for the test data tokens?

Here's one possible thought in the context of Naive Bayes:
Fitting the IDF model (idf1) generates a conditional probability for each
token (feature), e.g. let's say the IDF of the term "software" is 4.5, so it
stores a lookup software -> 4.5.
Transforming the training data using idf1 basically just creates a dataframe
with the above conditional probability vectors for each document.
Transforming the test data using the same idf1 uses the lookup generated above
to create conditional probability vectors for each document, e.g. if it
encounters "software" in the test data its IDF value would be just 4.5.
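
A small hedged sketch of where that state lives, as far as I can tell: after
fitting, the IDFModel exposes the per-term IDF weights learned from the
fitting corpus, and that vector is what gets reused at transform time
(`trainTf` stands for an already TF-transformed DataFrame):

import org.apache.spark.ml.feature.IDF

val idfModel = new IDF().setInputCol("tf").setOutputCol("tfidf").fit(trainTf)
idfModel.idf  // org.apache.spark.ml.linalg.Vector of IDF weights, one per term index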

Thanks




On Tue, Nov 1, 2016 at 4:09 PM, ayan guha <guha.a...@gmail.com> wrote:

> Yes, that is correct. I think I misread a part of it in terms of
> scoringI think we both are saying same thing so thats a good thing :)
>
> On Wed, Nov 2, 2016 at 10:04 AM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> Hi Ayan,
>>
>> "classification algorithm will for sure need to Fit against new dataset
>> to produce new model" I said this in context of re-training the model. Is
>> it not correct? Isn't it part of re-training?
>>
>> Thanks
>>
>> On Tue, Nov 1, 2016 at 4:01 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> "classification algorithm will for sure need to Fit against new dataset
>>> to produce new model" - I do not think this is correct. Maybe we are
>>> talking semantics but AFAIU, you "train" one model using some dataset, and
>>> then use it for scoring new datasets.
>>>
>>> You may re-train every month, yes. And you may run cross validation once
>>> a month (after re-training) or lower freq like once in 2-3 months to
>>> validate model quality. Here, number of months are not important, but you
>>> must be running cross validation and similar sort of "model evaluation"
>>> work flow typically in lower frequency than Re-Training process.
>>>
>>> On Wed, Nov 2, 2016 at 5:48 AM, Nirav Patel <npa...@xactlycorp.com>
>>> wrote:
>>>
>>>> Hi Ayan,
>>>> After deployment, we might re-train it every month. That is whole
>>>> different problem I have explored yet. classification algorithm will for
>>>> sure need to Fit against new dataset to produce new model. Correct me if I
>>>> am wrong but I think I will also FIt new IDF model based on new dataset. At
>>>> that time as well I will follow same training-validation split (or
>>>> corss-validation) to evaluate model performance on new data before
>>>> releasing it to make prediction. So afik , every time you  need to re-train
>>>> model you will need to corss validate using some data split strategy.
>>>>
>>>> I think spark ML document should start explaining mathematical model or
>>>> simple algorithm what Fit and Transform means for particular algorithm
>>>> (IDF, NaiveBayes)
>>>>
>>>> Thanks
>>>>
>>>> On Tue, Nov 1, 2016 at 5:45 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> I have come across similar situation recently and decided to run
>>>>> Training  workflow less frequently than scoring workflow.
>>>>>
>>>>> In your use case I would imagine you will run IDF fit workflow once in
>>>>> say a week. It will produce a model object which will be saved. In scoring
>>>>> workflow, you will typically see new unseen dataset and the model 
>>>>> generated
>>>>> in training flow will be used to score or label this new dataset.
>>>>>
>>>>> Note, train and test datasets are used during development phase when
>>>>> you are trying to find out which model to use and
>>>>> efficientcy/performance/accuracy etc. It will never be part of
>>>>> workflow. In a little elaborate setting you may want to automate model
>>>>> evaluations, but that's a different story.
>>>>>
>>>>> Not sure if I could explain properly, please feel free to comment.
>>>>> On 1 Nov 2016 22:54, "Nirav Patel" <npa...@xactlycorp.com> wrote:
>>

Re: Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
Hi Ayan,

"classification algorithm will for sure need to Fit against new dataset to
produce new model" I said this in context of re-training the model. Is it
not correct? Isn't it part of re-training?

Thanks

On Tue, Nov 1, 2016 at 4:01 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> "classification algorithm will for sure need to Fit against new dataset
> to produce new model" - I do not think this is correct. Maybe we are
> talking semantics but AFAIU, you "train" one model using some dataset, and
> then use it for scoring new datasets.
>
> You may re-train every month, yes. And you may run cross validation once a
> month (after re-training) or lower freq like once in 2-3 months to validate
> model quality. Here, number of months are not important, but you must be
> running cross validation and similar sort of "model evaluation" work flow
> typically in lower frequency than Re-Training process.
>
> On Wed, Nov 2, 2016 at 5:48 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> Hi Ayan,
>> After deployment, we might re-train it every month. That is whole
>> different problem I have explored yet. classification algorithm will for
>> sure need to Fit against new dataset to produce new model. Correct me if I
>> am wrong but I think I will also FIt new IDF model based on new dataset. At
>> that time as well I will follow same training-validation split (or
>> corss-validation) to evaluate model performance on new data before
>> releasing it to make prediction. So afik , every time you  need to re-train
>> model you will need to corss validate using some data split strategy.
>>
>> I think spark ML document should start explaining mathematical model or
>> simple algorithm what Fit and Transform means for particular algorithm
>> (IDF, NaiveBayes)
>>
>> Thanks
>>
>> On Tue, Nov 1, 2016 at 5:45 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> I have come across similar situation recently and decided to run
>>> Training  workflow less frequently than scoring workflow.
>>>
>>> In your use case I would imagine you will run IDF fit workflow once in
>>> say a week. It will produce a model object which will be saved. In scoring
>>> workflow, you will typically see new unseen dataset and the model generated
>>> in training flow will be used to score or label this new dataset.
>>>
>>> Note, train and test datasets are used during development phase when you
>>> are trying to find out which model to use and 
>>> efficientcy/performance/accuracy
>>> etc. It will never be part of workflow. In a little elaborate setting you
>>> may want to automate model evaluations, but that's a different story.
>>>
>>> Not sure if I could explain properly, please feel free to comment.
>>> On 1 Nov 2016 22:54, "Nirav Patel" <npa...@xactlycorp.com> wrote:
>>>
>>>> Yes, I do apply NaiveBayes after IDF .
>>>>
>>>> " you can re-train (fit) on all your data before applying it to unseen
>>>> data." Did you mean I can reuse that model to Transform both training and
>>>> test data?
>>>>
>>>> Here's the process:
>>>>
>>>> Datasets:
>>>>
>>>>1. Full sample data (labeled)
>>>>2. Training (labeled)
>>>>3. Test (labeled)
>>>>4. Unseen (non-labeled)
>>>>
>>>> Here are two workflow options I see:
>>>>
>>>> Option - 1 (currently using)
>>>>
>>>>1. Fit IDF model (idf-1) on full Sample data
>>>>2. Apply(Transform) idf-1 on full sample data
>>>>3. Split data set into Training and Test data
>>>>4. Fit ML model on Training data
>>>>5. Apply(Transform) model on Test data
>>>>6. Apply(Transform) idf-1 on Unseen data
>>>>7. Apply(Transform) model on Unseen data
>>>>
>>>> Option - 2
>>>>
>>>>1. Split sample data into Training and Test data
>>>>2. Fit IDF model (idf-1) only on training data
>>>>3. Apply(Transform) idf-1 on training data
>>>>4. Apply(Transform) idf-1 on test data
>>>>5. Fit ML model on Training data
>>>>6. Apply(Transform) model on Test data
>>>>7. Apply(Transform) idf-1 on Unseen data
>>>>8. Apply(Transform) model on Unseen data
>>>>
>>>> So you are suggesting Option-2 in this particular case, right?
>>>>
&

Re: Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
Hi Ayan,
After deployment, we might re-train it every month. That is a whole different
problem I haven't explored yet. The classification algorithm will for sure
need to fit against the new dataset to produce a new model. Correct me if I am
wrong, but I think I will also fit a new IDF model based on the new dataset.
At that time as well I will follow the same training-validation split (or
cross-validation) to evaluate model performance on the new data before
releasing it to make predictions. So afaik, every time you need to re-train
the model you will need to cross validate using some data split strategy.

I think the Spark ML documentation should start explaining, with the
mathematical model or a simple algorithm, what Fit and Transform mean for a
particular algorithm (IDF, NaiveBayes).

Thanks

On Tue, Nov 1, 2016 at 5:45 AM, ayan guha <guha.a...@gmail.com> wrote:

> I have come across similar situation recently and decided to run Training
> workflow less frequently than scoring workflow.
>
> In your use case I would imagine you will run IDF fit workflow once in say
> a week. It will produce a model object which will be saved. In scoring
> workflow, you will typically see new unseen dataset and the model generated
> in training flow will be used to score or label this new dataset.
>
> Note, train and test datasets are used during development phase when you
> are trying to find out which model to use and efficientcy/performance/accuracy
> etc. It will never be part of workflow. In a little elaborate setting you
> may want to automate model evaluations, but that's a different story.
>
> Not sure if I could explain properly, please feel free to comment.
> On 1 Nov 2016 22:54, "Nirav Patel" <npa...@xactlycorp.com> wrote:
>
>> Yes, I do apply NaiveBayes after IDF .
>>
>> " you can re-train (fit) on all your data before applying it to unseen
>> data." Did you mean I can reuse that model to Transform both training and
>> test data?
>>
>> Here's the process:
>>
>> Datasets:
>>
>>1. Full sample data (labeled)
>>2. Training (labeled)
>>3. Test (labeled)
>>4. Unseen (non-labeled)
>>
>> Here are two workflow options I see:
>>
>> Option - 1 (currently using)
>>
>>1. Fit IDF model (idf-1) on full Sample data
>>2. Apply(Transform) idf-1 on full sample data
>>3. Split data set into Training and Test data
>>4. Fit ML model on Training data
>>5. Apply(Transform) model on Test data
>>6. Apply(Transform) idf-1 on Unseen data
>>7. Apply(Transform) model on Unseen data
>>
>> Option - 2
>>
>>1. Split sample data into Training and Test data
>>2. Fit IDF model (idf-1) only on training data
>>3. Apply(Transform) idf-1 on training data
>>4. Apply(Transform) idf-1 on test data
>>5. Fit ML model on Training data
>>6. Apply(Transform) model on Test data
>>7. Apply(Transform) idf-1 on Unseen data
>>8. Apply(Transform) model on Unseen data
>>
>> So you are suggesting Option-2 in this particular case, right?
>>
>> On Tue, Nov 1, 2016 at 4:24 AM, Robin East <robin.e...@xense.co.uk>
>> wrote:
>>
>>> Fit it on training data to evaluate the model. You can either use that
>>> model to apply to unseen data or you can re-train (fit) on all your data
>>> before applying it to unseen data.
>>>
>>> fit and transform are 2 different things: fit creates a model, transform
>>> applies a model to data to create transformed output. If you are using your
>>> training data in a subsequent step (e.g. running logistic regression or
>>> some other machine learning algorithm) then you need to transform your
>>> training data using the IDF model before passing it through the next step.
>>>
>>> 
>>> ---
>>> Robin East
>>> *Spark GraphX in Action* Michael Malak and Robin East
>>> Manning Publications Co.
>>> http://www.manning.com/books/spark-graphx-in-action
>>>
>>>
>>>
>>>
>>>
>>> On 1 Nov 2016, at 11:18, Nirav Patel <npa...@xactlycorp.com> wrote:
>>>
>>> Just to re-iterate what you said, I should fit IDF model only on
>>> training data and then re-use it for both test data and then later on
>>> unseen data to make predictions.
>>>
>>> On Tue, Nov 1, 2016 at 3:49 AM, Robin East <robin.e...@xense.co.uk>
>>> wrote:
>>>
>>>> The point of setting aside a portion of your data as a test set is to
>>>> try and mimic applying your mo

Spark ML - CrossValidation - How to get Evaluation metrics of best model

2016-11-01 Thread Nirav Patel
I am running a classification model. With a normal training-test split I can
check model accuracy and F1 score using MulticlassClassificationEvaluator.
How can I do this with the CrossValidation approach?
Afaik, you fit the entire sample data in CrossValidator as you don't want to
leave out any observation from either testing or training. But by doing so I
don't have any unseen data left on which I can run the finalized model. So is
there a way I can get the accuracy and F1 score of the best model resulting
from cross validation?
Or should I still split the sample data into training and test before running
cross validation against only the training data, so that later I can test it
against the test data?
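
A hedged sketch of the split-then-cross-validate workflow suggested in the
replies; the estimator, the parameter grid and the default label/features
columns are assumptions made for illustration, and `df` is the full labeled
sample:

import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val Array(train, test) = df.randomSplit(Array(0.8, 0.2), seed = 42)

val nb = new NaiveBayes()
val evaluator = new MulticlassClassificationEvaluator().setMetricName("f1")
val cv = new CrossValidator()
  .setEstimator(nb)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(new ParamGridBuilder().addGrid(nb.smoothing, Array(0.5, 1.0)).build())
  .setNumFolds(3)

val cvModel = cv.fit(train)                                  // cross-validate on training data only
val f1OnTest = evaluator.evaluate(cvModel.transform(test))   // unbiased estimate on the held-out test set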




Re: Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
Yes, I do apply NaiveBayes after IDF .

" you can re-train (fit) on all your data before applying it to unseen
data." Did you mean I can reuse that model to Transform both training and
test data?

Here's the process:

Datasets:

   1. Full sample data (labeled)
   2. Training (labeled)
   3. Test (labeled)
   4. Unseen (non-labeled)

Here are two workflow options I see:

Option - 1 (currently using)

   1. Fit IDF model (idf-1) on full Sample data
   2. Apply(Transform) idf-1 on full sample data
   3. Split data set into Training and Test data
   4. Fit ML model on Training data
   5. Apply(Transform) model on Test data
   6. Apply(Transform) idf-1 on Unseen data
   7. Apply(Transform) model on Unseen data

Option - 2

   1. Split sample data into Training and Test data
   2. Fit IDF model (idf-1) only on training data
   3. Apply(Transform) idf-1 on training data
   4. Apply(Transform) idf-1 on test data
   5. Fit ML model on Training data
   6. Apply(Transform) model on Test data
   7. Apply(Transform) idf-1 on Unseen data
   8. Apply(Transform) model on Unseen data

So you are suggesting Option-2 in this particular case, right?
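
A hedged sketch of Option 2 above; the column names and the TF stage are
assumptions, and `labeled` stands for the full labeled sample data:

import org.apache.spark.ml.feature.{HashingTF, IDF}

val Array(train, test) = labeled.randomSplit(Array(0.8, 0.2))

val tf = new HashingTF().setInputCol("tokens").setOutputCol("tf")
val trainTf = tf.transform(train)
val testTf  = tf.transform(test)

val idfModel = new IDF().setInputCol("tf").setOutputCol("features").fit(trainTf) // fit on training only
val trainFeatures = idfModel.transform(trainTf)  // re-use the same model
val testFeatures  = idfModel.transform(testTf)   // ... on test (and later on unseen data)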

On Tue, Nov 1, 2016 at 4:24 AM, Robin East <robin.e...@xense.co.uk> wrote:

> Fit it on training data to evaluate the model. You can either use that
> model to apply to unseen data or you can re-train (fit) on all your data
> before applying it to unseen data.
>
> fit and transform are 2 different things: fit creates a model, transform
> applies a model to data to create transformed output. If you are using your
> training data in a subsequent step (e.g. running logistic regression or
> some other machine learning algorithm) then you need to transform your
> training data using the IDF model before passing it through the next step.
>
> 
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
> On 1 Nov 2016, at 11:18, Nirav Patel <npa...@xactlycorp.com> wrote:
>
> Just to re-iterate what you said, I should fit IDF model only on training
> data and then re-use it for both test data and then later on unseen data to
> make predictions.
>
> On Tue, Nov 1, 2016 at 3:49 AM, Robin East <robin.e...@xense.co.uk> wrote:
>
>> The point of setting aside a portion of your data as a test set is to try
>> and mimic applying your model to unseen data. If you fit your IDF model to
>> all your data, any evaluation you perform on your test set is likely to
>> over perform compared to ‘real’ unseen data. Effectively you would have
>> overfit your model.
>> 
>> ---
>> Robin East
>> *Spark GraphX in Action* Michael Malak and Robin East
>> Manning Publications Co.
>> http://www.manning.com/books/spark-graphx-in-action
>>
>>
>>
>>
>>
>> On 1 Nov 2016, at 10:15, Nirav Patel <npa...@xactlycorp.com> wrote:
>>
>> FYI, I do reuse IDF model while making prediction against new unlabeled
>> data but not between training and test data while training a model.
>>
>> On Tue, Nov 1, 2016 at 3:10 AM, Nirav Patel <npa...@xactlycorp.com>
>> wrote:
>>
>>> I am using IDF estimator/model (TF-IDF) to convert text features into
>>> vectors. Currently, I fit IDF model on all sample data and then transform
>>> them. I read somewhere that I should split my data into training and test
>>> before fitting IDF model; Fit IDF only on training data and then use same
>>> transformer to transform training and test data.
>>> This raise more questions:
>>> 1) Why would you do that? What exactly do IDF learn during fitting
>>> process that it can reuse to transform any new dataset. Perhaps idea is to
>>> keep same value for |D| and DF|t, D| while use new TF|t, D| ?
>>> 2) If not then fitting and transforming seems redundant for IDF model
>>>
>>
>>
>>
>>
>> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/>
>>
>> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]
>> <https://www.linkedin.com/company/xactly-corporation>  [image: Twitter]
>> <https://twitter.com/Xactly>  [image: Facebook]
>> <https://www.facebook.com/XactlyCorp>  [image: YouTube]
>> <http://www.youtube.com/xactlycorporation>
>>
>>
>>
>
>
>
> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/>
>
> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]

Re: Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
Just to re-iterate what you said, I should fit the IDF model only on the
training data and then re-use it for both the test data and, later on, the
unseen data to make predictions.

On Tue, Nov 1, 2016 at 3:49 AM, Robin East <robin.e...@xense.co.uk> wrote:

> The point of setting aside a portion of your data as a test set is to try
> and mimic applying your model to unseen data. If you fit your IDF model to
> all your data, any evaluation you perform on your test set is likely to
> over perform compared to ‘real’ unseen data. Effectively you would have
> overfit your model.
> 
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
> On 1 Nov 2016, at 10:15, Nirav Patel <npa...@xactlycorp.com> wrote:
>
> FYI, I do reuse IDF model while making prediction against new unlabeled
> data but not between training and test data while training a model.
>
> On Tue, Nov 1, 2016 at 3:10 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> I am using IDF estimator/model (TF-IDF) to convert text features into
>> vectors. Currently, I fit IDF model on all sample data and then transform
>> them. I read somewhere that I should split my data into training and test
>> before fitting IDF model; Fit IDF only on training data and then use same
>> transformer to transform training and test data.
>> This raise more questions:
>> 1) Why would you do that? What exactly do IDF learn during fitting
>> process that it can reuse to transform any new dataset. Perhaps idea is to
>> keep same value for |D| and DF|t, D| while use new TF|t, D| ?
>> 2) If not then fitting and transforming seems redundant for IDF model
>>
>
>
>
>
> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/>
>
> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]
> <https://www.linkedin.com/company/xactly-corporation>  [image: Twitter]
> <https://twitter.com/Xactly>  [image: Facebook]
> <https://www.facebook.com/XactlyCorp>  [image: YouTube]
> <http://www.youtube.com/xactlycorporation>
>
>
>



Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
FYI, I do reuse IDF model while making prediction against new unlabeled
data but not between training and test data while training a model.

On Tue, Nov 1, 2016 at 3:10 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> I am using IDF estimator/model (TF-IDF) to convert text features into
> vectors. Currently, I fit IDF model on all sample data and then transform
> them. I read somewhere that I should split my data into training and test
> before fitting IDF model; Fit IDF only on training data and then use same
> transformer to transform training and test data.
> This raise more questions:
> 1) Why would you do that? What exactly do IDF learn during fitting process
> that it can reuse to transform any new dataset. Perhaps idea is to keep
> same value for |D| and DF|t, D| while use new TF|t, D| ?
> 2) If not then fitting and transforming seems redundant for IDF model
>



Is IDF model reusable

2016-11-01 Thread Nirav Patel
I am using the IDF estimator/model (TF-IDF) to convert text features into
vectors. Currently, I fit the IDF model on all the sample data and then
transform it. I read somewhere that I should split my data into training and
test before fitting the IDF model; fit IDF only on the training data and then
use the same transformer to transform the training and test data.
This raises more questions:
1) Why would you do that? What exactly does IDF learn during the fitting
process that it can reuse to transform any new dataset? Perhaps the idea is to
keep the same values for |D| and DF(t, D) while using the new TF(t, D)?
2) If not, then fitting and transforming seem redundant for the IDF model.
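
For reference, a hedged note on the quantities above (worth verifying against
the MLlib TF-IDF docs for your version): fitting computes, once per term t,

    idf(t, D) = log( (|D| + 1) / (DF(t, D) + 1) )

over the corpus D the estimator is fit on, and transform then reuses these
fixed idf values together with the TF of whatever dataset it is applied to.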




MulticlassClassificationEvaluator how weighted precision and weighted recall calculated

2016-10-03 Thread Nirav Patel
For example 3 class would it be?

weightedPrecision = (TP1*w1 + TP2*w2 + TP3*w3) /
                    ((TP1*w1 + TP2*w2 + TP3*w3) + (FP1*w1 + FP2*w2 + FP3*w3))

where TP1, TP2, TP3 are the TP counts for each class, and w1, w2, w3 are the
weights for each class based on their distribution in the sample data?

And similar for recall? How about accuracy?
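
For what it's worth, my hedged understanding (please verify against the
MulticlassMetrics source) is that it is the class-frequency-weighted average
of the per-class precision,

    weightedPrecision = sum over classes c of (n_c / N) * TP_c / (TP_c + FP_c)

weightedRecall is the same with recall_c = TP_c / (TP_c + FN_c), and accuracy
is simply the fraction of correctly predicted labels.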




Thanks!




Re: Tutorial error - zeppelin 0.6.2 built with spark 2.0 and mapr

2016-09-26 Thread Nirav Patel
FYI, it works when I use MapR configured Spark 2.0. ie

export SPARK_HOME=/opt/mapr/spark/spark-2.0.0-bin-without-hadoop


Thanks

Nirav

On Mon, Sep 26, 2016 at 3:45 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Hi,
>
> I built zeppeling 0.6 branch using spark 2.0 using following mvn :
>
> mvn clean package -Pbuild-distr -Pmapr41 -Pyarn -Pspark-2.0 -Pscala-2.11
> -DskipTests
>
> Built went successful.
> I only have following set in zeppelin-conf.sh
>
> export HADOOP_HOME=/opt/mapr/hadoop/hadoop-2.5.1/
>
> export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
>
>
> i.e. I am using embedded spark 2.0 created during build process.
>
> After starting zeppelin for first time I tried to run 'Zeppelin Tutorial'
> notebook and got following exception in 'Load data into table' paragraph
>
>
> import org.apache.commons.io.IOUtils
> import java.net.URL
> import java.nio.charset.Charset
> bankText: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
> parallelize at :30
> defined class Bank
> java.lang.UnsatisfiedLinkError: Native Library /tmp/mapr-xactly-
> libMapRClient.0.6.2-SNAPSHOT.so already loaded in another classloader
> at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1907)
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
> at java.lang.Runtime.load0(Runtime.java:809)
> at java.lang.System.load(System.java:1086)
> at com.mapr.fs.shim.LibraryLoader.load(LibraryLoader.java:29)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.mapr.fs.ShimLoader.loadNativeLibrary(ShimLoader.java:335)
> at com.mapr.fs.ShimLoader.load(ShimLoader.java:210)
> at com.mapr.fs.MapRFileSystem.(MapRFileSystem.java:80)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(
> Configuration.java:1857)
> at org.apache.hadoop.conf.Configuration.getClassByName(
> Configuration.java:1822)
> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1916)
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(
> FileSystem.java:2609)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2622)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2661)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2643)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:404)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
> at org.apache.hadoop.hive.ql.session.SessionState.start(
> SessionState.java:505)
> at org.apache.spark.sql.hive.client.HiveClientImpl.(
> HiveClientImpl.scala:171)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(
> NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
> DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(
> IsolatedClientLoader.scala:258)
> at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(
> HiveUtils.scala:359)
> at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(
> HiveUtils.scala:263)
> at org.apache.spark.sql.hive.HiveSharedState.metadataHive$
> lzycompute(HiveSharedState.scala:39)
> at org.apache.spark.sql.hive.HiveSharedState.metadataHive(
> HiveSharedState.scala:38)
> at org.apache.spark.sql.hive.HiveSharedState.externalCatalog$lzycompute(
> HiveSharedState.scala:46)
> at org.apache.spark.sql.hive.HiveSharedState.externalCatalog(
> HiveSharedState.scala:45)
> at org.apache.spark.sql.hive.HiveSessionState.catalog$
> lzycompute(HiveSessionState.scala:50)
> at org.apache.spark.sql.hive.HiveSessionState.catalog(
> HiveSessionState.scala:48)
> at org.apache.spark.sql.hive.HiveSessionState$$anon$1.<
> init>(HiveSessionState.scala:63)
> at org.apache.spark.sql.hive.HiveSessionState.analyzer$
> lzycompute(HiveSessionState.scala:63)
> at org.apache.spark.sql.hive.HiveSessionState.analyzer(
> HiveSessionState.scala:62)
> at org.apache.spark.sql.execution.QueryExecution.
> assertAnalyzed(QueryExecution.scala:49)
> at org.apache.spark.sql.Dataset.(Dataset.scala:161)
> at org.apache.spark.sql.Dataset.(Dataset.scala:167)
> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
> at org.apache.spark.sql.SparkSession.create

Tutorial error - zeppelin 0.6.2 built with spark 2.0 and mapr

2016-09-26 Thread Nirav Patel
Hi,

I built the zeppelin 0.6 branch with spark 2.0 using the following mvn command:

mvn clean package -Pbuild-distr -Pmapr41 -Pyarn -Pspark-2.0 -Pscala-2.11
-DskipTests

The build was successful.
I only have the following set in zeppelin-conf.sh:

export HADOOP_HOME=/opt/mapr/hadoop/hadoop-2.5.1/

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop


i.e. I am using embedded spark 2.0 created during build process.

After starting zeppelin for the first time I tried to run the 'Zeppelin
Tutorial' notebook and got the following exception in the 'Load data into
table' paragraph:


import org.apache.commons.io.IOUtils
import java.net.URL
import java.nio.charset.Charset
bankText: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at
parallelize at :30
defined class Bank
java.lang.UnsatisfiedLinkError: Native Library /tmp/
mapr-xactly-libMapRClient.0.6.2-SNAPSHOT.so already loaded in another
classloader
at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1907)
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
at java.lang.Runtime.load0(Runtime.java:809)
at java.lang.System.load(System.java:1086)
at com.mapr.fs.shim.LibraryLoader.load(LibraryLoader.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.mapr.fs.ShimLoader.loadNativeLibrary(ShimLoader.java:335)
at com.mapr.fs.ShimLoader.load(ShimLoader.java:210)
at com.mapr.fs.MapRFileSystem.(MapRFileSystem.java:80)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:1857)
at
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1822)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1916)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2609)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2622)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2661)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2643)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:505)
at
org.apache.spark.sql.hive.client.HiveClientImpl.(HiveClientImpl.scala:171)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:258)
at
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:359)
at
org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:263)
at
org.apache.spark.sql.hive.HiveSharedState.metadataHive$lzycompute(HiveSharedState.scala:39)
at
org.apache.spark.sql.hive.HiveSharedState.metadataHive(HiveSharedState.scala:38)
at
org.apache.spark.sql.hive.HiveSharedState.externalCatalog$lzycompute(HiveSharedState.scala:46)
at
org.apache.spark.sql.hive.HiveSharedState.externalCatalog(HiveSharedState.scala:45)
at
org.apache.spark.sql.hive.HiveSessionState.catalog$lzycompute(HiveSessionState.scala:50)
at
org.apache.spark.sql.hive.HiveSessionState.catalog(HiveSessionState.scala:48)
at
org.apache.spark.sql.hive.HiveSessionState$$anon$1.(HiveSessionState.scala:63)
at
org.apache.spark.sql.hive.HiveSessionState.analyzer$lzycompute(HiveSessionState.scala:63)
at
org.apache.spark.sql.hive.HiveSessionState.analyzer(HiveSessionState.scala:62)
at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.(Dataset.scala:161)
at org.apache.spark.sql.Dataset.(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:441)
at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:395)
at
org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:163)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:36)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:48)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:50)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:52)
at $iwC$$iwC$$iwC$$iwC.(:54)
at $iwC$$iwC$$iwC.(:56)
at $iwC$$iwC.(:58)
at $iwC.(:60)
at (:62)
at .(:66)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at 

How PolynomialExpansion works

2016-09-16 Thread Nirav Patel
Doc says:

Take a 2-variable feature vector as an example: (x, y), if we want to
expand it with degree 2, then we get (x, x * x, y, x * y, y * y).

I know polynomial expansion of (x+y)^2 = x^2 + 2xy + y^2 but can't relate
it to above.
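
In case it helps relate the two: a degree-2 expansion keeps all monomials of
degree 1 and 2, i.e. (x, x*x, y, x*y, y*y); the degree-2 terms x*x, x*y, y*y
are exactly the monomials that appear in (x+y)^2 with the coefficients
dropped, plus the original degree-1 terms. A minimal sketch, assuming Spark
2.x with a SparkSession `spark` in scope:

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(Tuple1(Vectors.dense(2.0, 3.0)))).toDF("features")
val poly = new PolynomialExpansion()
  .setInputCol("features").setOutputCol("polyFeatures").setDegree(2)
poly.transform(df).select("polyFeatures").show(false)
// expect roughly [2.0, 4.0, 3.0, 6.0, 9.0], i.e. (x, x*x, y, x*y, y*y)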

Thanks




Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-23 Thread Nirav Patel
So it was indeed my merge function. I now create a new result object for every
merge and it's working.
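
A hedged before/after sketch of what I mean, assuming a SparkContext `sc`;
`MyObj` and its field are made-up stand-ins for my actual classes:

case class MyObj(var values: List[Int])
val rdd = sc.parallelize(Seq(("k1", MyObj(List(1))), ("k1", MyObj(List(2)))))

// Risky: mutates one of the incoming values and returns it.
// val reduced = rdd.reduceByKey { (a, b) => a.values = a.values ++ b.values; a }

// Safer: build and return a new object for every merge.
val reduced = rdd.reduceByKey { (a, b) => MyObj(a.values ++ b.values) }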

Thanks

On Wed, Jun 22, 2016 at 3:46 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> PS. In my reduceByKey operation I have two mutable object. What I do is
> merge mutable2 into mutable1 and return mutable1. I read that it works for
> aggregateByKey so thought it will work for reduceByKey as well. I might be
> wrong here. Can someone verify if this will work or be un predictable?
>
> On Wed, Jun 22, 2016 at 11:52 AM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> Hi,
>>
>> I do not see any indication of errors or executor getting killed in spark
>> UI - jobs, stages, event timelines. No task failures. I also don't see any
>> errors in executor logs.
>>
>> Thanks
>>
>> On Wed, Jun 22, 2016 at 2:32 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> For the run which returned incorrect result, did you observe any error
>>> (on workers) ?
>>>
>>> Cheers
>>>
>>> On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com>
>>> wrote:
>>>
>>>> I have an RDD[String, MyObj] which is a result of Join + Map operation.
>>>> It has no partitioner info. I run reduceByKey without passing any
>>>> Partitioner or partition counts.  I observed that output aggregation result
>>>> for given key is incorrect sometime. like 1 out of 5 times. It looks like
>>>> reduce operation is joining values from two different keys. There is no
>>>> configuration change between multiple runs. I am scratching my head over
>>>> this. I verified results by printing out RDD before and after reduce
>>>> operation; collecting subset at driver.
>>>>
>>>> Besides shuffle and storage memory fraction I use following options:
>>>>
>>>> sparkConf.set("spark.driver.userClassPathFirst","true")
>>>> sparkConf.set("spark.unsafe.offHeap","true")
>>>> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
>>>> sparkConf.set("spark.serializer",
>>>> "org.apache.spark.serializer.KryoSerializer")
>>>>
>>>>
>>>>
>>>> [image: What's New with Xactly]
>>>> <http://www.xactlycorp.com/email-click/>
>>>>
>>>> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]
>>>> <https://www.linkedin.com/company/xactly-corporation>  [image: Twitter]
>>>> <https://twitter.com/Xactly>  [image: Facebook]
>>>> <https://www.facebook.com/XactlyCorp>  [image: YouTube]
>>>> <http://www.youtube.com/xactlycorporation>
>>>
>>>
>>>
>>
>



Re: OOM on the driver after increasing partitions

2016-06-22 Thread Nirav Patel
I believe it would be information about tasks, partitions, task statuses etc.
I do not know the exact details, but I had an OOM on the driver with 512MB and
increasing it did help. Someone else might be able to give a better answer
about the exact memory usage of the driver.

You also seem to use broadcast, which means sending something from the driver
JVM. You can try taking a memory dump when your driver memory is almost full,
or set JVM args to take one automatically on an OutOfMemoryError. Analyze it
and share your findings :)
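
A hedged sketch of the settings I have in mind, as spark-defaults.conf lines;
the path and size are just examples, and note these generally have to be in
place before the driver JVM starts (spark-submit or spark-defaults), not set
in code at runtime:

spark.driver.memory              4g
spark.driver.extraJavaOptions    -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/driver.hprof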



On Wed, Jun 22, 2016 at 4:33 PM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Ok. Would be able to shed more light on what exact meta data it manages
> and what is the relationship with more number of partitions/nodes?
>
> There is one executor running on each node -- so there are 64 executors in
> total. Each executor, including the driver are give 12GB and this is the
> maximum available limit. So the other options are
>
> 1) Separate the driver from master, i.e., run them on two separate nodes
> 2) Increase the RAM capacity on the driver/master node.
>
> Regards,
> Raghava.
>
>
> On Wed, Jun 22, 2016 at 7:05 PM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> Yes driver keeps fair amount of meta data to manage scheduling across all
>> your executors. I assume with 64 nodes you have more executors as well.
>> Simple way to test is to increase driver memory.
>>
>> On Wed, Jun 22, 2016 at 10:10 AM, Raghava Mutharaju <
>> m.vijayaragh...@gmail.com> wrote:
>>
>>> It is an iterative algorithm which uses map, mapPartitions, join, union,
>>> filter, broadcast and count. The goal is to compute a set of tuples and in
>>> each iteration few tuples are added to it. Outline is given below
>>>
>>> 1) Start with initial set of tuples, T
>>> 2) In each iteration compute deltaT, and add them to T, i.e., T = T +
>>> deltaT
>>> 3) Stop when current T size (count) is same as previous T size, i.e.,
>>> deltaT is 0.
>>>
>>> Do you think something happens on the driver due to the application
>>> logic and when the partitions are increased?
>>>
>>> Regards,
>>> Raghava.
>>>
>>>
>>> On Wed, Jun 22, 2016 at 12:33 PM, Sonal Goyal <sonalgoy...@gmail.com>
>>> wrote:
>>>
>>>> What does your application do?
>>>>
>>>> Best Regards,
>>>> Sonal
>>>> Founder, Nube Technologies <http://www.nubetech.co>
>>>> Reifier at Strata Hadoop World
>>>> <https://www.youtube.com/watch?v=eD3LkpPQIgM>
>>>> Reifier at Spark Summit 2015
>>>> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>>>>
>>>> <http://in.linkedin.com/in/sonalgoyal>
>>>>
>>>>
>>>>
>>>> On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
>>>> m.vijayaragh...@gmail.com> wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> We have a Spark cluster where driver and master are running on the
>>>>> same node. We are using Spark Standalone cluster manager. If the number of
>>>>> nodes (and the partitions) are increased, the same dataset that used to 
>>>>> run
>>>>> to completion on lesser number of nodes is now giving an out of memory on
>>>>> the driver.
>>>>>
>>>>> For example, a dataset that runs on 32 nodes with number of partitions
>>>>> set to 256 completes whereas the same dataset when run on 64 nodes with
>>>>> number of partitions as 512 gives an OOM on the driver side.
>>>>>
>>>>> From what I read in the Spark documentation and other articles,
>>>>> following are the responsibilities of the driver/master.
>>>>>
>>>>> 1) create spark context
>>>>> 2) build DAG of operations
>>>>> 3) schedule tasks
>>>>>
>>>>> I am guessing that 1) and 2) should not change w.r.t number of
>>>>> nodes/partitions. So is it that since the driver has to keep track of lot
>>>>> more tasks, that it gives an OOM?
>>>>>
>>>>> What could be the possible reasons behind the driver-side OOM when the
>>>>> number of partitions are increased?
>>>>>
>>>>> Regards,
>>>>> Raghava.
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Regards,
>>> Raghava
>>> http://raghavam.github.io
>>>
>>
>>
>>
>>
>> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/>
>>
>> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]
>> <https://www.linkedin.com/company/xactly-corporation>  [image: Twitter]
>> <https://twitter.com/Xactly>  [image: Facebook]
>> <https://www.facebook.com/XactlyCorp>  [image: YouTube]
>> <http://www.youtube.com/xactlycorporation>
>
>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>



Re: OOM on the driver after increasing partitions

2016-06-22 Thread Nirav Patel
Yes, the driver keeps a fair amount of metadata to manage scheduling across
all your executors. I assume with 64 nodes you have more executors as well. A
simple way to test is to increase the driver memory.

On Wed, Jun 22, 2016 at 10:10 AM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> It is an iterative algorithm which uses map, mapPartitions, join, union,
> filter, broadcast and count. The goal is to compute a set of tuples and in
> each iteration few tuples are added to it. Outline is given below
>
> 1) Start with initial set of tuples, T
> 2) In each iteration compute deltaT, and add them to T, i.e., T = T +
> deltaT
> 3) Stop when current T size (count) is same as previous T size, i.e.,
> deltaT is 0.
>
> Do you think something happens on the driver due to the application logic
> and when the partitions are increased?
>
> Regards,
> Raghava.
>
>
> On Wed, Jun 22, 2016 at 12:33 PM, Sonal Goyal 
> wrote:
>
>> What does your application do?
>>
>> Best Regards,
>> Sonal
>> Founder, Nube Technologies 
>> Reifier at Strata Hadoop World
>> 
>> Reifier at Spark Summit 2015
>> 
>>
>> 
>>
>>
>>
>> On Wed, Jun 22, 2016 at 9:57 PM, Raghava Mutharaju <
>> m.vijayaragh...@gmail.com> wrote:
>>
>>> Hello All,
>>>
>>> We have a Spark cluster where driver and master are running on the same
>>> node. We are using Spark Standalone cluster manager. If the number of nodes
>>> (and the partitions) are increased, the same dataset that used to run to
>>> completion on lesser number of nodes is now giving an out of memory on the
>>> driver.
>>>
>>> For example, a dataset that runs on 32 nodes with number of partitions
>>> set to 256 completes whereas the same dataset when run on 64 nodes with
>>> number of partitions as 512 gives an OOM on the driver side.
>>>
>>> From what I read in the Spark documentation and other articles,
>>> following are the responsibilities of the driver/master.
>>>
>>> 1) create spark context
>>> 2) build DAG of operations
>>> 3) schedule tasks
>>>
>>> I am guessing that 1) and 2) should not change w.r.t number of
>>> nodes/partitions. So is it that since the driver has to keep track of lot
>>> more tasks, that it gives an OOM?
>>>
>>> What could be the possible reasons behind the driver-side OOM when the
>>> number of partitions are increased?
>>>
>>> Regards,
>>> Raghava.
>>>
>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>




Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Nirav Patel
PS. In my reduceByKey operation I have two mutable objects. What I do is
merge mutable2 into mutable1 and return mutable1. I read that this works for
aggregateByKey, so I thought it would work for reduceByKey as well. I might
be wrong here. Can someone verify whether this will work or be unpredictable?
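
For what it's worth, a sketch of the side-effect-free variant (MyObj and its
fields here are made up, just to illustrate the shape): building and
returning a fresh object keeps the reduce function a pure, associative and
commutative merge, which takes the mutation question out of the picture:

import org.apache.spark.rdd.RDD

case class MyObj(count: Long, sum: Double)           // hypothetical fields

def safeReduce(pairRdd: RDD[(String, MyObj)]): RDD[(String, MyObj)] =
  pairRdd.reduceByKey { (a, b) =>
    MyObj(a.count + b.count, a.sum + b.sum)           // no in-place mutation of a or b
  }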

On Wed, Jun 22, 2016 at 11:52 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Hi,
>
> I do not see any indication of errors or executor getting killed in spark
> UI - jobs, stages, event timelines. No task failures. I also don't see any
> errors in executor logs.
>
> Thanks
>
> On Wed, Jun 22, 2016 at 2:32 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> For the run which returned incorrect result, did you observe any error
>> (on workers) ?
>>
>> Cheers
>>
>> On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com>
>> wrote:
>>
>>> I have an RDD[String, MyObj] which is a result of Join + Map operation.
>>> It has no partitioner info. I run reduceByKey without passing any
>>> Partitioner or partition counts.  I observed that output aggregation result
>>> for given key is incorrect sometime. like 1 out of 5 times. It looks like
>>> reduce operation is joining values from two different keys. There is no
>>> configuration change between multiple runs. I am scratching my head over
>>> this. I verified results by printing out RDD before and after reduce
>>> operation; collecting subset at driver.
>>>
>>> Besides shuffle and storage memory fraction I use following options:
>>>
>>> sparkConf.set("spark.driver.userClassPathFirst","true")
>>> sparkConf.set("spark.unsafe.offHeap","true")
>>> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
>>> sparkConf.set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>>
>>>
>>>
>>
>>
>>
>



Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Nirav Patel
Hi,

I do not see any indication of errors or executors getting killed in the
Spark UI - jobs, stages, event timelines. No task failures. I also don't see
any errors in the executor logs.

Thanks

On Wed, Jun 22, 2016 at 2:32 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> For the run which returned incorrect result, did you observe any error (on
> workers) ?
>
> Cheers
>
> On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> I have an RDD[String, MyObj] which is a result of Join + Map operation.
>> It has no partitioner info. I run reduceByKey without passing any
>> Partitioner or partition counts.  I observed that output aggregation result
>> for given key is incorrect sometime. like 1 out of 5 times. It looks like
>> reduce operation is joining values from two different keys. There is no
>> configuration change between multiple runs. I am scratching my head over
>> this. I verified results by printing out RDD before and after reduce
>> operation; collecting subset at driver.
>>
>> Besides shuffle and storage memory fraction I use following options:
>>
>> sparkConf.set("spark.driver.userClassPathFirst","true")
>> sparkConf.set("spark.unsafe.offHeap","true")
>> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
>> sparkConf.set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>>
>>
>>
>
>
>



Re: FullOuterJoin on Spark

2016-06-22 Thread Nirav Patel
Can your domain list fit in the memory of one executor? If so, you can use a
broadcast join.

You can always narrow it down to an inner join and derive the rest from the
original set if memory is an issue there. If you are just concerned about
shuffle memory, then to reduce the amount of shuffle you can do the
following:
1) partition both RDDs (or DataFrames) with the same partitioner and the
same partition count, so the corresponding data will at least be on the same
node (see the sketch below)
2) increase spark.shuffle.memoryFraction

You can use DataFrames with Spark 1.6 or greater to further reduce the
memory footprint. I haven't tested that, though.
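
A rough sketch of both approaches (the types and names are placeholders for
your pages and domain rules, not your actual schema):

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD

// Option 1: broadcast the small side (domain -> rule) and join map-side.
def broadcastJoin(sc: SparkContext,
                  pages: RDD[(String, String)],        // (domain, pageUrl)
                  rules: Map[String, String]): RDD[(String, (String, Option[String]))] = {
  val bRules = sc.broadcast(rules)
  pages.map { case (domain, page) => (domain, (page, bRules.value.get(domain))) }
}

// Option 2: co-partition both sides with the same partitioner before the join.
def coPartitionedJoin(pages: RDD[(String, String)],
                      rules: RDD[(String, String)],
                      numPartitions: Int): RDD[(String, (String, String))] = {
  val p = new HashPartitioner(numPartitions)
  pages.partitionBy(p).join(rules.partitionBy(p))
}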


On Tue, Jun 21, 2016 at 6:16 AM, Rychnovsky, Dusan <
dusan.rychnov...@firma.seznam.cz> wrote:

> Hi,
>
>
> can somebody please explain the way FullOuterJoin works on Spark? Does
> each intersection get fully loaded to memory?
>
> My problem is as follows:
>
>
> I have two large data-sets:
>
>
> * a list of web pages,
>
> * a list of domain-names with specific rules for processing pages from
> that domain.
>
>
> I am joining these web-pages with processing rules.
>
>
> For certain domains there are millions of web-pages.
>
>
> Based on the memory demands the join is having it looks like the whole
> intersection (i.e. a domain + all corresponding pages) are kept in memory
> while processing.
>
>
> What I really need in this case, though, is to hold just the domain and
> iterate over all corresponding pages, one at a time.
>
>
> What would be the best way to do this on Spark?
>
> Thank you,
>
> Dusan Rychnovsky
>
>
>




Re: spark job automatically killed without rhyme or reason

2016-06-22 Thread Nirav Patel
Spark is a memory hogger and can be suicidal if you have a job processing a
bigger dataset. However, Databricks claims that Spark > 1.6 has optimizations
related to memory footprint as well as processing. They are only available if
you use DataFrames or Datasets; if you are using RDDs you have to do a lot of
testing and tuning.
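
As a sketch of what that switch looks like on the 1.6-era API (the schema,
path and names below are made up):

import org.apache.spark.sql.SQLContext

case class Event(id: Long, value: Double)              // placeholder schema

val sqlContext = new SQLContext(sc)                     // sc: your existing SparkContext
import sqlContext.implicits._

val eventsDF = sc.textFile("hdfs:///data/events.csv")   // placeholder path
  .map(_.split(","))
  .map(a => Event(a(0).toLong, a(1).toDouble))
  .toDF()

eventsDF.groupBy("id").sum("value").show()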

On Mon, Jun 20, 2016 at 1:34 AM, Sean Owen  wrote:

> I'm not sure that's the conclusion. It's not trivial to tune and
> configure YARN and Spark to match your app's memory needs and profile,
> but, it's also just a matter of setting them properly. I'm not clear
> you've set the executor memory for example, in particular
> spark.yarn.executor.memoryOverhead
>
> Everything else you mention is a symptom of YARN shutting down your
> jobs because your memory settings don't match what your app does.
> They're not problems per se, based on what you have provided.
>
>
> On Mon, Jun 20, 2016 at 9:17 AM, Zhiliang Zhu
>  wrote:
> > Hi Alexander ,
> >
> > Thanks a lot for your comments.
> >
> > Spark seems not that stable when it comes to run big job, too much data
> or
> > too much time, yes, the problem is gone when reducing the scale.
> > Sometimes reset some job running parameter (such as --drive-memory may
> help
> > in GC issue) , sometimes may rewrite the codes by applying other
> algorithm.
> >
> > As you commented the shuffle operation, it sounds some as the reason ...
> >
> > Best Wishes !
> >
> >
> >
> > On Friday, June 17, 2016 8:45 PM, Alexander Kapustin 
> > wrote:
> >
> >
> > Hi Zhiliang,
> >
> > Yes, find the exact reason of failure is very difficult. We have issue
> with
> > similar behavior, due to limited time for investigation, we reduce the
> > number of processed data, and problem has gone.
> >
> > Some points which may help you in investigations:
> > · If you start spark-history-server (or monitoring running
> > application on 4040 port), look into failed stages (if any). By default
> > Spark try to retry stage execution 2 times, after that job fails
> > · Some useful information may contains in yarn logs on Hadoop
> nodes
> > (yarn--nodemanager-.log), but this is only information about
> > killed container, not about the reasons why this stage took so much
> memory
> >
> > As I can see in your logs, failed step relates to shuffle operation,
> could
> > you change your job to avoid massive shuffle operation?
> >
> > --
> > WBR, Alexander
> >
> > From: Zhiliang Zhu
> > Sent: 17 июня 2016 г. 14:10
> > To: User; kp...@hotmail.com
> > Subject: Re: spark job automatically killed without rhyme or reason
> >
> >
> > Show original message
> >
> >
> > Hi Alexander,
> >
> > is your yarn userlog   just for the executor log ?
> >
> > as for those logs seem a little difficult to exactly decide the wrong
> point,
> > due to sometimes successful job may also have some kinds of the error
> ...
> > but will repair itself.
> > spark seems not that stable currently ...
> >
> > Thank you in advance~
> >
> >
> >
> > On Friday, June 17, 2016 6:53 PM, Zhiliang Zhu 
> wrote:
> >
> >
> > Hi Alexander,
> >
> > Thanks a lot for your reply.
> >
> > Yes, submitted by yarn.
> > Do you just mean in the executor log file by way of yarn logs
> -applicationId
> > id,
> >
> > in this file, both in some containers' stdout  and stderr :
> >
> > 16/06/17 14:05:40 INFO client.TransportClientFactory: Found inactive
> > connection to ip-172-31-20-104/172.31.20.104:49991, creating a new one.
> > 16/06/17 14:05:40 ERROR shuffle.RetryingBlockFetcher: Exception while
> > beginning fetch of 1 outstanding blocks
> > java.io.IOException: Failed to connect to
> > ip-172-31-20-104/172.31.20.104:49991  <-- may it be due
> to
> > that spark is not stable, and spark may repair itself for these kinds of
> > error ? (saw some in successful run )
> >
> > at
> >
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
> > at
> >
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> > 
> > Caused by: java.net.ConnectException: Connection refused:
> > ip-172-31-20-104/172.31.20.104:49991
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > at
> > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> > at
> >
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
> > at
> >
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
> > at
> >
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> > at
> >
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > at
> >
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > at 

Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-21 Thread Nirav Patel
I have an RDD[(String, MyObj)] which is the result of a Join + Map operation.
It has no partitioner info. I run reduceByKey without passing any Partitioner
or partition count. I observed that the output aggregation result for a given
key is sometimes incorrect, like 1 out of 5 times. It looks like the reduce
operation is combining values from two different keys. There is no
configuration change between the multiple runs. I am scratching my head over
this. I verified the results by printing out the RDD before and after the
reduce operation and collecting a subset at the driver.

Besides shuffle and storage memory fraction I use following options:

sparkConf.set("spark.driver.userClassPathFirst","true")
sparkConf.set("spark.unsafe.offHeap","true")
sparkConf.set("spark.reducer.maxSizeInFlight","128m")
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")




Spark dynamic allocation - efficiently request new resource

2016-06-07 Thread Nirav Patel
Hi,

Does current or future (2.0) Spark dynamic allocation have the capability to
request a container with varying resource requirements based on various
factors? A few factors I can think of: based on the stage and the data it is
processing, it could ask for either more CPUs or more memory, i.e. a new
executor could have a different number of CPU cores or a different amount of
memory available for all of its tasks.
That way Spark could handle data skew with heavier executors by assigning
more memory or CPUs to new executors.
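
As far as I know, what dynamic allocation gives you today is scaling the
number of executors, each with the same fixed size; a sketch of the relevant
settings (values are illustrative only):

val conf = new org.apache.spark.SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")         // external shuffle service is required
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")
  .set("spark.executor.memory", "8g")                   // identical for every executor
  .set("spark.executor.cores", "4")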

Thanks
Nirav




Re: Bulk loading Serialized RDD into Hbase throws KryoException - IndexOutOfBoundsException

2016-05-30 Thread Nirav Patel
Put is a type of Mutation, so I am not sure what you mean by "if I use
Mutation".

Anyway, I registered all 3 classes with Kryo.

kryo.register(classOf[org.apache.hadoop.hbase.client.Put])

kryo.register(classOf[ImmutableBytesWritable])

kryo.register(classOf[Mutable])


It still fails with the same exception.
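
One workaround I am considering (a sketch only, assuming the Put can be
rebuilt from simpler fields; the tuple shape below is hypothetical): cache
the raw bytes instead of the Put objects and construct the Puts inside
foreachPartition, so Put never goes through Kryo at all:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConversions.seqAsJavaList

// (rowKey, columnFamily, qualifier, value): hypothetical shape of the cached data
def writeRaw(rawRdd: RDD[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])],
             tableName: String): Unit = {
  rawRdd.persist(StorageLevel.MEMORY_ONLY_SER)          // plain byte arrays serialize cleanly
  rawRdd.foreachPartition { itr =>
    val table = new HTable(HBaseConfiguration.create(), tableName)
    itr.grouped(100).foreach { batch =>
      val puts = batch.map { case (row, cf, q, v) => new Put(row).add(cf, q, v) }
      table.put(seqAsJavaList(puts))                    // HTable.put(java.util.List[Put])
    }
    table.close()
  }
}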



On Sun, May 29, 2016 at 11:26 PM, sjk <shijinkui...@163.com> wrote:

> org.apache.hadoop.hbase.client.{Mutation, Put}
> org.apache.hadoop.hbase.io.ImmutableBytesWritable
>
> if u used mutation, register the above class too
>
> On May 30, 2016, at 08:11, Nirav Patel <npa...@xactlycorp.com> wrote:
>
> Sure, let me try that. But from the looks of it, it seems Kryo's
> kryo.util.MapReferenceResolver.getReadObject is trying to access an
> incorrect index (100).
>
> On Sun, May 29, 2016 at 5:06 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you register Put with Kryo ?
>>
>> Thanks
>>
>> On May 29, 2016, at 4:58 PM, Nirav Patel <npa...@xactlycorp.com> wrote:
>>
>> I pasted code snipped for that method.
>>
>> here's full def:
>>
>>   def writeRddToHBase2(hbaseRdd: RDD[(ImmutableBytesWritable, Put)],
>> tableName: String) {
>>
>>
>> hbaseRdd.values.foreachPartition{ itr =>
>>
>> val hConf = HBaseConfiguration.create()
>>
>> hConf.setInt("hbase.client.write.buffer", 16097152)
>>
>> val table = new HTable(hConf, tableName)
>>
>> //table.setWriteBufferSize(8388608)
>>
>> itr.grouped(100).foreach(table.put(_))   // << Exception
>> happens at this point
>>
>> table.close()
>>
>> }
>>
>>   }
>>
>>
>> I am using hbase 0.98.12 mapr distribution.
>>
>>
>> Thanks
>>
>> Nirav
>>
>> On Sun, May 29, 2016 at 4:46 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq.  at com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$
>>> anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:80)
>>>
>>> Can you reveal related code from HbaseUtils.scala ?
>>>
>>> Which hbase version are you using ?
>>>
>>> Thanks
>>>
>>> On Sun, May 29, 2016 at 4:26 PM, Nirav Patel <npa...@xactlycorp.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am getting following Kryo deserialization error when trying to
>>>> buklload Cached RDD into Hbase. It works if I don't cache the RDD. I cache
>>>> it with MEMORY_ONLY_SER.
>>>>
>>>> here's the code snippet:
>>>>
>>>>
>>>> hbaseRdd.values.foreachPartition{ itr =>
>>>> val hConf = HBaseConfiguration.create()
>>>> hConf.setInt("hbase.client.write.buffer", 16097152)
>>>> val table = new HTable(hConf, tableName)
>>>> itr.grouped(100).foreach(table.put(_))
>>>> table.close()
>>>> }
>>>> hbaseRdd is of type RDD[(ImmutableBytesWritable, Put)]
>>>>
>>>>
>>>> Exception I am getting. I read on Kryo JIRA that this may be issue with
>>>> incorrect use of serialization library. So could this be issue with
>>>> twitter-chill library or spark core it self ?
>>>>
>>>> Job aborted due to stage failure: Task 16 in stage 9.0 failed 10 times,
>>>> most recent failure: Lost task 16.9 in stage 9.0 (TID 28614,
>>>> hdn10.mycorptcorporation.local): com.esotericsoftware.kryo.KryoException:
>>>> java.lang.IndexOutOfBoundsException: Index: 100, Size: 6
>>>> Serialization trace:
>>>> familyMap (org.apache.hadoop.hbase.client.Put)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>>> at
>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
>>>> at
>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
>>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.sc

Re: Bulk loading Serialized RDD into Hbase throws KryoException - IndexOutOfBoundsException

2016-05-29 Thread Nirav Patel
Sure, let me try that. But from the looks of it, it seems Kryo's
kryo.util.MapReferenceResolver.getReadObject is trying to access an
incorrect index (100).

On Sun, May 29, 2016 at 5:06 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you register Put with Kryo ?
>
> Thanks
>
> On May 29, 2016, at 4:58 PM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
> I pasted code snipped for that method.
>
> here's full def:
>
>   def writeRddToHBase2(hbaseRdd: RDD[(ImmutableBytesWritable, Put)],
> tableName: String) {
>
>
> hbaseRdd.values.foreachPartition{ itr =>
>
> val hConf = HBaseConfiguration.create()
>
> hConf.setInt("hbase.client.write.buffer", 16097152)
>
> val table = new HTable(hConf, tableName)
>
> //table.setWriteBufferSize(8388608)
>
> itr.grouped(100).foreach(table.put(_))   // << Exception
> happens at this point
>
> table.close()
>
> }
>
>   }
>
>
> I am using hbase 0.98.12 mapr distribution.
>
>
> Thanks
>
> Nirav
>
> On Sun, May 29, 2016 at 4:46 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> bq.  at com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$
>> anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:80)
>>
>> Can you reveal related code from HbaseUtils.scala ?
>>
>> Which hbase version are you using ?
>>
>> Thanks
>>
>> On Sun, May 29, 2016 at 4:26 PM, Nirav Patel <npa...@xactlycorp.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am getting following Kryo deserialization error when trying to
>>> buklload Cached RDD into Hbase. It works if I don't cache the RDD. I cache
>>> it with MEMORY_ONLY_SER.
>>>
>>> here's the code snippet:
>>>
>>>
>>> hbaseRdd.values.foreachPartition{ itr =>
>>> val hConf = HBaseConfiguration.create()
>>> hConf.setInt("hbase.client.write.buffer", 16097152)
>>> val table = new HTable(hConf, tableName)
>>> itr.grouped(100).foreach(table.put(_))
>>> table.close()
>>> }
>>> hbaseRdd is of type RDD[(ImmutableBytesWritable, Put)]
>>>
>>>
>>> Exception I am getting. I read on Kryo JIRA that this may be issue with
>>> incorrect use of serialization library. So could this be issue with
>>> twitter-chill library or spark core it self ?
>>>
>>> Job aborted due to stage failure: Task 16 in stage 9.0 failed 10 times,
>>> most recent failure: Lost task 16.9 in stage 9.0 (TID 28614,
>>> hdn10.mycorptcorporation.local): com.esotericsoftware.kryo.KryoException:
>>> java.lang.IndexOutOfBoundsException: Index: 100, Size: 6
>>> Serialization trace:
>>> familyMap (org.apache.hadoop.hbase.client.Put)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>>> at
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>> at
>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
>>> at
>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>>> at
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966)
>>> at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> at
>>> com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:80)
>>> at
>>> com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:75)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
>>> at
>>&

Re: Bulk loading Serialized RDD into Hbase throws KryoException - IndexOutOfBoundsException

2016-05-29 Thread Nirav Patel
I pasted a code snippet for that method.

here's full def:

  def writeRddToHBase2(hbaseRdd: RDD[(ImmutableBytesWritable, Put)],
      tableName: String) {

    hbaseRdd.values.foreachPartition { itr =>
      val hConf = HBaseConfiguration.create()
      hConf.setInt("hbase.client.write.buffer", 16097152)
      val table = new HTable(hConf, tableName)
      //table.setWriteBufferSize(8388608)
      itr.grouped(100).foreach(table.put(_))   // << Exception happens at this point
      table.close()
    }
  }


I am using hbase 0.98.12 mapr distribution.


Thanks

Nirav

On Sun, May 29, 2016 at 4:46 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> bq.  at com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$
> anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:80)
>
> Can you reveal related code from HbaseUtils.scala ?
>
> Which hbase version are you using ?
>
> Thanks
>
> On Sun, May 29, 2016 at 4:26 PM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> Hi,
>>
>> I am getting following Kryo deserialization error when trying to buklload
>> Cached RDD into Hbase. It works if I don't cache the RDD. I cache it
>> with MEMORY_ONLY_SER.
>>
>> here's the code snippet:
>>
>>
>> hbaseRdd.values.foreachPartition{ itr =>
>> val hConf = HBaseConfiguration.create()
>> hConf.setInt("hbase.client.write.buffer", 16097152)
>> val table = new HTable(hConf, tableName)
>> itr.grouped(100).foreach(table.put(_))
>> table.close()
>> }
>> hbaseRdd is of type RDD[(ImmutableBytesWritable, Put)]
>>
>>
>> Exception I am getting. I read on Kryo JIRA that this may be issue with
>> incorrect use of serialization library. So could this be issue with
>> twitter-chill library or spark core it self ?
>>
>> Job aborted due to stage failure: Task 16 in stage 9.0 failed 10 times,
>> most recent failure: Lost task 16.9 in stage 9.0 (TID 28614,
>> hdn10.mycorptcorporation.local): com.esotericsoftware.kryo.KryoException:
>> java.lang.IndexOutOfBoundsException: Index: 100, Size: 6
>> Serialization trace:
>> familyMap (org.apache.hadoop.hbase.client.Put)
>> at
>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>> at
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>> at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>> at
>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
>> at
>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>> at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966)
>> at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:80)
>> at
>> com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:75)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)
>> Caused by: java.lang.IndexOutOfBoundsException: Index: 100, Size: 6
>> at java.util.ArrayList.rangeCheck(ArrayList.java:635)
>> at java.util.ArrayList.

Bulk loading Serialized RDD into Hbase throws KryoException - IndexOutOfBoundsException

2016-05-29 Thread Nirav Patel
Hi,

I am getting the following Kryo deserialization error when trying to
bulk-load a cached RDD into HBase. It works if I don't cache the RDD. I cache
it with MEMORY_ONLY_SER.

here's the code snippet:


hbaseRdd.values.foreachPartition { itr =>
  val hConf = HBaseConfiguration.create()
  hConf.setInt("hbase.client.write.buffer", 16097152)
  val table = new HTable(hConf, tableName)
  itr.grouped(100).foreach(table.put(_))
  table.close()
}
hbaseRdd is of type RDD[(ImmutableBytesWritable, Put)]


The exception I am getting is below. I read on the Kryo JIRA that this may be
an issue with incorrect use of the serialization library. So could this be an
issue with the twitter-chill library or Spark core itself?

Job aborted due to stage failure: Task 16 in stage 9.0 failed 10 times,
most recent failure: Lost task 16.9 in stage 9.0 (TID 28614,
hdn10.mycorptcorporation.local): com.esotericsoftware.kryo.KryoException:
java.lang.IndexOutOfBoundsException: Index: 100, Size: 6
Serialization trace:
familyMap (org.apache.hadoop.hbase.client.Put)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:192)
at
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:966)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:80)
at
com.mycorpt.myprojjobs.spark.jobs.hbase.HbaseUtils$$anonfun$writeRddToHBase2$1.apply(HbaseUtils.scala:75)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:902)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.IndexOutOfBoundsException: Index: 100, Size: 6
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:773)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:727)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
... 26 more
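
In case it helps anyone following along, a minimal sketch of what explicitly
registering the HBase classes with Kryo looks like (the package and class
names here are placeholders):

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.serializer.KryoRegistrator

class HBaseKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Put])
    kryo.register(classOf[ImmutableBytesWritable])
  }
}

// wiring (placeholder package):
// sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// sparkConf.set("spark.kryo.registrator", "com.example.HBaseKryoRegistrator")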




Re: Spark UI doesn't give visibility on which stage job actually failed (due to lazy eval nature)

2016-05-25 Thread Nirav Patel
I think it does, because users don't see their application logic and flow
exactly the way Spark internals do. Of course we follow general guidelines
for performance, but we shouldn't really have to care how exactly Spark
decides to execute the DAG; the Spark scheduler or core can keep changing
over time to optimize it. So optimizing from the user's perspective means
looking at which transformations they are using and what they are doing
inside those transformations. If users have some transparency from the
framework on how those transformations utilize resources over time, or where
they fail, we can optimize better. That way we stay focused on our
application logic rather than on what the framework is doing underneath.

About a solution: doesn't the Spark driver (SparkContext + event listener)
have knowledge of every job, task set, and task and their current state? The
Spark UI can relate a job to its stages and tasks, so why not a stage to its
transformations?

Again, my real point is to assess this as a requirement from the users' and
stakeholders' perspective, regardless of the technical challenge.
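
It does expose some of this already: a sketch of pulling it out with the
listener API (the class name and logging are mine); StageInfo.name carries
the user call site that created the stage, which is part of the mapping I am
asking for:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

class StageCallSiteLogger extends SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // info.name looks like "collect at MyJob.scala:42"; info.details has the longer call stack
    println(s"stage ${info.stageId} '${info.name}' failed=${info.failureReason.isDefined}")
    info.failureReason.foreach(reason => println(s"  reason: $reason"))
  }
}

// sc.addSparkListener(new StageCallSiteLogger())   // register on the running SparkContext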

Thanks
Nirav

On Wed, May 25, 2016 at 8:04 PM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> But when you talk about optimizing the DAG, it really doesn't make sense
> to also talk about transformation steps as separate entities.  The
> DAGScheduler knows about Jobs, Stages, TaskSets and Tasks.  The
> TaskScheduler knows about TaskSets ad Tasks.  Neither of them understands
> the transformation steps that you used to define your RDD -- at least not
> as separable, distinct steps.  To give the kind of
> transformation-step-oriented information that you want would require parts
> of Spark that don't currently concern themselves at all with RDD
> transformation steps to start tracking them and how they map to Jobs,
> Stages, TaskSets and Tasks -- and when you start talking about Datasets and
> Spark SQL, you then needing to start talking about tracking and mapping
> concepts like Plans, Schemas and Queries.  It would introduce significant
> new complexity.
>
> On Wed, May 25, 2016 at 6:59 PM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> Hi Mark,
>>
>> I might have said stage instead of step in my last statement "UI just
>> says Collect failed but in fact it could be any stage in that lazy chain of
>> evaluation."
>>
>> Anyways even you agree that this visibility of underlaying steps wont't
>> be available. which does pose difficulties in terms of troubleshooting as
>> well as optimizations at step level. I think users will have hard time
>> without this. Its great that spark community working on different levels of
>> internal optimizations but its also important to give enough visibility
>> to users to enable them to debug issues and resolve bottleneck.
>> There is also no visibility into how spark utilizes shuffle memory space
>> vs user memory space vs cache space. It's a separate topic though. If
>> everything is working magically as a black box then it's fine but when you
>> have large number of people on this site complaining about  OOM and shuffle
>> error all the time you need to start providing some transparency to
>> address that.
>>
>> Thanks
>>
>>
>> On Wed, May 25, 2016 at 6:41 PM, Mark Hamstra <m...@clearstorydata.com>
>> wrote:
>>
>>> You appear to be misunderstanding the nature of a Stage.  Individual
>>> transformation steps such as `map` do not define the boundaries of Stages.
>>> Rather, a sequence of transformations in which there is only a
>>> NarrowDependency between each of the transformations will be pipelined into
>>> a single Stage.  It is only when there is a ShuffleDependency that a new
>>> Stage will be defined -- i.e. shuffle boundaries define Stage boundaries.
>>> With whole stage code gen in Spark 2.0, there will be even less opportunity
>>> to treat individual transformations within a sequence of narrow
>>> dependencies as though they were discrete, separable entities.  The Failed
>>> Stages portion of the Web UI will tell you which Stage in a Job failed, and
>>> the accompanying error log message will generally also give you some idea
>>> of which Tasks failed and why.  Tracing the error back further and at a
>>> different level of abstraction to lay blame on a particular transformation
>>> wouldn't be particularly easy.
>>>
>>> On Wed, May 25, 2016 at 5:28 PM, Nirav Patel <npa...@xactlycorp.com>
>>> wrote:
>>>
>>>> It's great that spark scheduler does optimized DAG processing and only
>>>> does lazy eval when some action is performed or shuffle dependency is
>>>> encountered. Sometime it goes further after shuffle dep bef

Re: Spark UI doesn't give visibility on which stage job actually failed (due to lazy eval nature)

2016-05-25 Thread Nirav Patel
Hi Mark,

I might have said stage instead of step in my last statement "UI just says
Collect failed but in fact it could be any stage in that lazy chain of
evaluation."

Anyway, even you agree that this visibility into the underlying steps won't
be available, which does pose difficulties in terms of troubleshooting as
well as optimization at the step level. I think users will have a hard time
without this. It's great that the Spark community is working on different
levels of internal optimization, but it's also important to give users enough
visibility to debug issues and resolve bottlenecks.
There is also no visibility into how Spark utilizes shuffle memory space vs.
user memory space vs. cache space. That's a separate topic, though. If
everything worked magically as a black box that would be fine, but when you
have a large number of people on this list complaining about OOM and shuffle
errors all the time, you need to start providing some transparency to address
that.

Thanks


On Wed, May 25, 2016 at 6:41 PM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> You appear to be misunderstanding the nature of a Stage.  Individual
> transformation steps such as `map` do not define the boundaries of Stages.
> Rather, a sequence of transformations in which there is only a
> NarrowDependency between each of the transformations will be pipelined into
> a single Stage.  It is only when there is a ShuffleDependency that a new
> Stage will be defined -- i.e. shuffle boundaries define Stage boundaries.
> With whole stage code gen in Spark 2.0, there will be even less opportunity
> to treat individual transformations within a sequence of narrow
> dependencies as though they were discrete, separable entities.  The Failed
> Stages portion of the Web UI will tell you which Stage in a Job failed, and
> the accompanying error log message will generally also give you some idea
> of which Tasks failed and why.  Tracing the error back further and at a
> different level of abstraction to lay blame on a particular transformation
> wouldn't be particularly easy.
>
> On Wed, May 25, 2016 at 5:28 PM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> It's great that spark scheduler does optimized DAG processing and only
>> does lazy eval when some action is performed or shuffle dependency is
>> encountered. Sometime it goes further after shuffle dep before executing
>> anything. e.g. if there are map steps after shuffle then it doesn't stop at
>> shuffle to execute anything but goes to that next map steps until it finds
>> a reason(spark action) to execute. As a result stage that spark is running
>> can be internally series of (map -> shuffle -> map -> map -> collect) and
>> spark UI just shows its currently running 'collect' stage. SO  if job fails
>> at that point spark UI just says Collect failed but in fact it could be any
>> stage in that lazy chain of evaluation. Looking at executor logs gives some
>> insights but that's not always straightforward.
>> Correct me if I am wrong here but I think we need more visibility into
>> what's happening underneath so we can easily troubleshoot as well as
>> optimize our DAG.
>>
>> THanks
>>
>>
>>
>
>
>



Spark UI doesn't give visibility on which stage job actually failed (due to lazy eval nature)

2016-05-25 Thread Nirav Patel
It's great that the Spark scheduler does optimized DAG processing and only
does lazy evaluation when some action is performed or a shuffle dependency is
encountered. Sometimes it goes even further past a shuffle dependency before
executing anything, e.g. if there are map steps after a shuffle, it doesn't
stop at the shuffle to execute anything but goes on to those next map steps
until it finds a reason (a Spark action) to execute. As a result, the stage
that Spark is running can internally be a series of (map -> shuffle -> map ->
map -> collect), and the Spark UI just shows that it is currently running the
'collect' stage. So if the job fails at that point, the Spark UI just says
collect failed, but in fact it could be any stage in that lazy chain of
evaluation. Looking at executor logs gives some insight, but that's not
always straightforward.
Correct me if I am wrong here, but I think we need more visibility into
what's happening underneath so we can easily troubleshoot as well as optimize
our DAG.

Thanks




Spark UI metrics - Task execution time and number of records processed

2016-05-18 Thread Nirav Patel
Hi,

I have been noticing that for shuffle stages (groupBy, join) the reducer
tasks are not evenly loaded. Most of them (90%) finish super fast, but there
are some outliers that take much longer, as you can see from the "Max" value
in the metrics below. The metrics are from a join operation done on two RDDs.
I tried repartitioning both RDDs with a HashPartitioner before the join. It's
certainly faster than before, when I was not repartitioning, but it is still
slow, and it looks like an equal number of records is not being allocated to
each partition. Could this just be the result of data skew? Or can something
else be done here?

Summary Metrics for 4000 Completed Tasks

Metric     Min     25th percentile   Median   75th percentile   Max
Duration   89 ms   3 s               7 s      14 s              5.9 min
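
If it is skew from a handful of hot keys, one mitigation I have seen
suggested is salting the keys before the join so a hot key's rows spread over
several partitions; a rough sketch (the salt factor and key type are
assumptions):

import scala.reflect.ClassTag
import scala.util.Random
import org.apache.spark.rdd.RDD

def saltedJoin[V: ClassTag, W: ClassTag](big: RDD[(String, V)],
                                         small: RDD[(String, W)],
                                         salt: Int = 10): RDD[(String, (V, W))] = {
  val saltedBig   = big.map { case (k, v) => (s"${k}_${Random.nextInt(salt)}", v) }
  // replicate each row of the smaller side once per salt value so every salted key can match
  val saltedSmall = small.flatMap { case (k, w) => (0 until salt).map(i => (s"${k}_$i", w)) }
  saltedBig.join(saltedSmall).map { case (saltedKey, vw) =>
    (saltedKey.substring(0, saltedKey.lastIndexOf('_')), vw)   // strip the salt back off
  }
}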




API to study key cardinality and distribution and other important statistics about data at certain stage

2016-05-13 Thread Nirav Patel
Hi,

The problem is that every time a job fails or performs poorly at a certain
stage, you need to study your data distribution just before THAT stage. An
overall look at the input data set doesn't help very much if you have so many
transformations going on in the DAG. I always end up writing complicated
ad-hoc code, separate from the actual job, to run this analysis. Shouldn't
there be a Spark API to examine this in a better way? After all, Spark does
go through all the records (in most cases) to perform a transformation or
action, so as a side job it could gather statistics as well when instructed.
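
In the absence of such an API, the small helper I keep rewriting looks
roughly like this (a sketch; the name and defaults are mine), dropped in just
before the suspect stage:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def topKeyCounts[K: ClassTag, V](rdd: RDD[(K, V)], topN: Int = 20): Array[(K, Long)] =
  rdd.map { case (k, _) => (k, 1L) }
     .reduceByKey(_ + _)
     .top(topN)(Ordering.by(_._2))

// usage: topKeyCounts(intermediateRdd).foreach(println)   // intermediateRdd: any pair RDD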

Thanks




How to take executor memory dump

2016-05-11 Thread Nirav Patel
Hi,

I am hitting OutOfMemoryError issues with Spark executors. It happens mainly
during shuffle; executors get killed with OutOfMemoryError. I have tried
setting spark.executor.extraJavaOptions to take a memory dump, but it is not
happening.

spark.executor.extraJavaOptions = "-XX:+UseCompressedOops
-XX:-HeapDumpOnOutOfMemoryError
-XX:OnOutOfMemoryError='kill -9 %p; jmap -heap %p > /home/mycorp/npatel/jmap_%p'
-XX:HeapDumpPath=/opt/cores/spark -XX:+UseG1GC -verbose:gc
-XX:+PrintGCDetails
-Xloggc:/home/mycorp/npatel/insights-jobs/gclogs/gc_%p.log
-XX:+PrintGCTimeStamps"

The following is what I see repeatedly in the YARN application logs after the job fails.

# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p
kill -9 %p; jmap -heap %p"
#   Executing /bin/sh -c "kill 30434
kill -9 30434"...

From the above logs it looks like the Spark executor by default has
'-XX:OnOutOfMemoryError=kill %p' and then my custom arguments are appended
incorrectly.


The following is the Linux process info for one particular executor, which
confirms the above.

mycorp   29113 29109 99 08:56 ?04:13:46
/usr/java/jdk1.7.0_51/bin/java -Dorg.jboss.netty.epollBugWorkaround=true
-server -XX:OnOutOfMemoryError=kill %p -Xms23000m -Xmx23000m
-XX:+UseCompressedOops -XX:NewRatio=2 -XX:ConcGCThreads=2
-XX:ParallelGCThreads=2 -XX:-HeapDumpOnOutOfMemoryError
-XX:OnOutOfMemoryError=kill -9 %p;jmap -heap %p > /home/mycorp/npatel/jmap_%p
-XX:HeapDumpPath=/opt/cores/spark
-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails
-Xloggc:/home/mycorp/npatel/gclogs/gc%p.log -XX:+PrintGCTimeStamps
-Djava.io.tmpdir=/tmp/hadoop-mycorp/nm-local-dir/usercache/mycorp/appcache/application_1461196034441_24756/container_1461196034441_24756_01_12/tmp
-Dspark.driver.port=43095 -Dspark.akka.threads=32
-Dspark.yarn.app.container.log.dir=/opt/mapr/hadoop/hadoop-2.5.1/logs/userlogs/application_1461196034441_24756/container_1461196034441_24756_01_12
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
akka.tcp://sparkDriver@10.250.70.116:43095/user/CoarseGrainedScheduler
--executor-id 11 --hostname hdn1.mycorpcorporation.local --cores 6 --app-id
application_1461196034441_24756 --user-class-path
file:/tmp/hadoop-mycorp/nm-local-dir/usercache/mycorp/appcache/application_1461196034441_24756/container_1461196034441_24756_01_12/__app__.jar


I also tried taking a dump of a running executor using jmap -dump, but it
fails with an exception in the middle of it. It still generates some dump if
I use the -F option; however, that file seems corrupted and does not load
into Eclipse MAT or VisualVM.


So what is the correct way to set these executor opts and ultimately take an
executor memory dump?

More specifically:

1) take a heap dump at a particular location, with the application id and
process id in the file name
2) put the GC logs at a particular location, with the application id and
process id in the file name (currently it does this, but with a literal %p in
the file name)
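
A hedged alternative that avoids the shell quoting around OnOutOfMemoryError
altogether: use the built-in heap-dump flag (note the '+') with HeapDumpPath
pointing at a directory, and the JVM writes java_pid<pid>.hprof there by
itself, so the pid lands in the file name without any %p substitution. Paths
below are placeholders:

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-XX:+HeapDumpOnOutOfMemoryError " +
    "-XX:HeapDumpPath=/opt/cores/spark " +              // directory; file is named java_pid<pid>.hprof
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps " +
    "-Xloggc:/home/mycorp/npatel/gclogs/executor-gc.log")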

Thanks




Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Nirav Patel
Yes, my mistake. I am using Spark 1.5.2, not 2.x.

I looked at the running Spark driver JVM process on Linux. It looks like my
settings are not being applied to the driver. We use the Oozie Spark action
to launch Spark; I will have to investigate that further.

Hopefully Spark has replaced, or will replace, the memory-killer Java
serializer with a better streaming serializer.
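
In the meantime, a quick runtime check (just a sketch; sc is the existing
SparkContext) that the settings actually reached the driver and the
executors. Note that the Akka control-plane on the driver still uses Java
serialization regardless of spark.serializer:

val serializer  = sc.getConf.get("spark.serializer", "<default: Java>")
val registrator = sc.getConf.get("spark.kryo.registrator", "<none>")
println(s"driver conf: spark.serializer=$serializer, spark.kryo.registrator=$registrator")

// and from an executor, the serializer SparkEnv actually instantiated:
sc.parallelize(1 to 1, 1).foreach { _ =>
  println("executor serializer: " + org.apache.spark.SparkEnv.get.serializer.getClass.getName)
}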

Thanks

On Sun, May 8, 2016 at 9:33 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> See the following:
> [SPARK-7997][CORE] Remove Akka from Spark Core and Streaming
>
> I guess you meant you are using Spark 1.5.1
>
> For the time being, consider increasing spark.driver.memory
>
> Cheers
>
> On Sun, May 8, 2016 at 9:14 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> Yes, I am using yarn client mode hence I specified am settings too.
>> What you mean akka is moved out of picture? I am using spark 2.5.1
>>
>> Sent from my iPhone
>>
>> On May 8, 2016, at 6:39 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>> Are you using YARN client mode ?
>>
>> See
>> https://spark.apache.org/docs/latest/running-on-yarn.html
>>
>> In cluster mode, spark.yarn.am.memory is not effective.
>>
>> For Spark 2.0, akka is moved out of the picture.
>> FYI
>>
>> On Sat, May 7, 2016 at 8:24 PM, Nirav Patel <npa...@xactlycorp.com>
>> wrote:
>>
>>> I have 20 executors, 6 cores each. Total 5 stages. It fails on 5th one.
>>> All of them have 6474 tasks. 5th task is a count operations and it also
>>> performs aggregateByKey as a part of it lazy evaluation.
>>> I am setting:
>>> spark.driver.memory=10G, spark.yarn.am.memory=2G and
>>> spark.driver.maxResultSize=9G
>>>
>>>
>>> On a side note, could it be something to do with java serialization
>>> library, ByteArrayOutputStream using byte array? Can it be replaced by
>>> some better serializing library?
>>>
>>> https://bugs.openjdk.java.net/browse/JDK-8055949
>>> https://bugs.openjdk.java.net/browse/JDK-8136527
>>>
>>> Thanks
>>>
>>> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com>
>>> wrote:
>>>
>>>> Driver maintains the complete metadata of application ( scheduling of
>>>> executor and maintaining the messaging to control the execution )
>>>> This code seems to be failing in that code path only. With that said
>>>> there is Jvm overhead based on num of executors , stages and tasks in your
>>>> app. Do you know your driver heap size and application structure ( num of
>>>> stages and tasks )
>>>>
>>>> Ashish
>>>>
>>>> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote:
>>>>
>>>>> Right but this logs from spark driver and spark driver seems to use
>>>>> Akka.
>>>>>
>>>>> ERROR [sparkDriver-akka.actor.default-dispatcher-17]
>>>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread
>>>>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
>>>>> ActorSystem [sparkDriver]
>>>>>
>>>>> I saw following logs before above happened.
>>>>>
>>>>> 2016-05-06 09:49:17,813 INFO
>>>>> [sparkDriver-akka.actor.default-dispatcher-17]
>>>>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
>>>>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>>>>>
>>>>>
>>>>> As far as I know driver is just driving shuffle operation but not
>>>>> actually doing anything within its own system that will cause memory 
>>>>> issue.
>>>>> Can you explain in what circumstances I could see this error in driver
>>>>> logs? I don't do any collect or any other driver operation that would 
>>>>> cause
>>>>> this. It fails when doing aggregateByKey operation but that should happen
>>>>> in executor JVM NOT in driver JVM.
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> bq.   at akka.serialization.JavaSerializer.toBinary(
>>>>>> Serializer.scala:129)
>>>>>>
>>>>>> It was Akka which uses JavaSerializer
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Sat, May 7

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Nirav Patel
Yes, I am using YARN client mode, hence I specified the AM settings too.
What do you mean, Akka is moved out of the picture? I am using Spark 2.5.1.

Sent from my iPhone

> On May 8, 2016, at 6:39 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Are you using YARN client mode ?
> 
> See
> https://spark.apache.org/docs/latest/running-on-yarn.html
> 
> In cluster mode, spark.yarn.am.memory is not effective.
> 
> For Spark 2.0, akka is moved out of the picture.
> FYI
> 
>> On Sat, May 7, 2016 at 8:24 PM, Nirav Patel <npa...@xactlycorp.com> wrote:
>> I have 20 executors, 6 cores each. Total 5 stages. It fails on 5th one. All 
>> of them have 6474 tasks. 5th task is a count operations and it also performs 
>> aggregateByKey as a part of it lazy evaluation. 
>> I am setting:
>> spark.driver.memory=10G, spark.yarn.am.memory=2G and 
>> spark.driver.maxResultSize=9G 
>> 
>> 
>> On a side note, could it be something to do with java serialization library, 
>> ByteArrayOutputStream using byte array? Can it be replaced by some better 
>> serializing library?
>> 
>> https://bugs.openjdk.java.net/browse/JDK-8055949
>> https://bugs.openjdk.java.net/browse/JDK-8136527
>> 
>> Thanks
>> 
>>> On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com> wrote:
>>> Driver maintains the complete metadata of application ( scheduling of 
>>> executor and maintaining the messaging to control the execution )
>>> This code seems to be failing in that code path only. With that said there 
>>> is Jvm overhead based on num of executors , stages and tasks in your app. 
>>> Do you know your driver heap size and application structure ( num of stages 
>>> and tasks )
>>> 
>>> Ashish 
>>> 
>>>> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote:
>>>> Right but this logs from spark driver and spark driver seems to use Akka.
>>>> 
>>>> ERROR [sparkDriver-akka.actor.default-dispatcher-17] 
>>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread 
>>>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down 
>>>> ActorSystem [sparkDriver]
>>>> 
>>>> I saw following logs before above happened.
>>>> 
>>>> 2016-05-06 09:49:17,813 INFO 
>>>> [sparkDriver-akka.actor.default-dispatcher-17] 
>>>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output 
>>>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>>>> 
>>>> 
>>>> 
>>>> As far as I know driver is just driving shuffle operation but not actually 
>>>> doing anything within its own system that will cause memory issue. Can you 
>>>> explain in what circumstances I could see this error in driver logs? I 
>>>> don't do any collect or any other driver operation that would cause this. 
>>>> It fails when doing aggregateByKey operation but that should happen in 
>>>> executor JVM NOT in driver JVM.
>>>> 
>>>> 
>>>> 
>>>> Thanks
>>>> 
>>>> 
>>>>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>> bq.   at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>>>> 
>>>>> It was Akka which uses JavaSerializer
>>>>> 
>>>>> Cheers
>>>>> 
>>>>>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com> 
>>>>>> wrote:
>>>>>> Hi,
>>>>>> 
>>>>>> I thought I was using kryo serializer for shuffle.  I could verify it 
>>>>>> from spark UI - Environment tab that 
>>>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>>>> spark.kryo.registrator   
>>>>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>>>>> 
>>>>>> 
>>>>>> But when I see following error in Driver logs it looks like spark is 
>>>>>> using JavaSerializer 
>>>>>> 
>>>>>> 2016-05-06 09:49:26,490 ERROR 
>>>>>> [sparkDriver-akka.actor.default-dispatcher-17] 
>>>>>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread 
>>>>>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down 
>>>>>> ActorSystem [sparkDriver]
>>>>>> 
>>>>>

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Nirav Patel
I have 20 executors with 6 cores each, and 5 stages in total. It fails on the
5th one. All of them have 6474 tasks. The 5th stage is a count operation, and
it also performs an aggregateByKey as part of its lazy evaluation.
I am setting:
spark.driver.memory=10G, spark.yarn.am.memory=2G and
spark.driver.maxResultSize=9G


On a side note, could it be something to do with the Java serialization
library, i.e. ByteArrayOutputStream using a byte array? Can it be replaced by
some better serialization library?

https://bugs.openjdk.java.net/browse/JDK-8055949
https://bugs.openjdk.java.net/browse/JDK-8136527

Thanks

On Sat, May 7, 2016 at 4:51 PM, Ashish Dubey <ashish@gmail.com> wrote:

> Driver maintains the complete metadata of application ( scheduling of
> executor and maintaining the messaging to control the execution )
> This code seems to be failing in that code path only. With that said there
> is Jvm overhead based on num of executors , stages and tasks in your app.
> Do you know your driver heap size and application structure ( num of stages
> and tasks )
>
> Ashish
>
> On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> Right but this logs from spark driver and spark driver seems to use Akka.
>>
>> ERROR [sparkDriver-akka.actor.default-dispatcher-17]
>> akka.actor.ActorSystemImpl: Uncaught fatal error from thread
>> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
>> ActorSystem [sparkDriver]
>>
>> I saw following logs before above happened.
>>
>> 2016-05-06 09:49:17,813 INFO
>> [sparkDriver-akka.actor.default-dispatcher-17]
>> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
>> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>>
>>
>> As far as I know driver is just driving shuffle operation but not
>> actually doing anything within its own system that will cause memory issue.
>> Can you explain in what circumstances I could see this error in driver
>> logs? I don't do any collect or any other driver operation that would cause
>> this. It fails when doing aggregateByKey operation but that should happen
>> in executor JVM NOT in driver JVM.
>>
>>
>> Thanks
>>
>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq.   at akka.serialization.JavaSerializer.toBinary(
>>> Serializer.scala:129)
>>>
>>> It was Akka which uses JavaSerializer
>>>
>>> Cheers
>>>
>>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I thought I was using kryo serializer for shuffle.  I could verify it
>>>> from spark UI - Environment tab that
>>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>>> spark.kryo.registrator
>>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>>>
>>>>
>>>> But when I see following error in Driver logs it looks like spark is
>>>> using JavaSerializer
>>>>
>>>> 2016-05-06 09:49:26,490 ERROR
>>>> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl:
>>>> Uncaught fatal error from thread
>>>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
>>>> ActorSystem [sparkDriver]
>>>>
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>
>>>> at java.util.Arrays.copyOf(Arrays.java:2271)
>>>>
>>>> at
>>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>>>
>>>> at
>>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>>>
>>>> at
>>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>>>
>>>> at
>>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>>>
>>>> at
>>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>>>
>>>> at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>>>
>>>> at
>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>
>>>> at
>>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>>>>
>>>> at
>>>> akka.serialization.JavaSeriali

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Nirav Patel
Right, but these logs are from the Spark driver, and the Spark driver seems to use Akka.

ERROR [sparkDriver-akka.actor.default-dispatcher-17]
akka.actor.ActorSystemImpl: Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
ActorSystem [sparkDriver]

I saw the following logs before the above happened.

2016-05-06 09:49:17,813 INFO [sparkDriver-akka.actor.default-dispatcher-17]
org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 1 to hdn6.xactlycorporation.local:44503


As far as I know, the driver just coordinates the shuffle operation; it shouldn't
be doing anything within its own JVM that would cause a memory issue. Can you
explain in what circumstances I could see this error in the driver logs? I
don't do a collect or any other driver-side operation that would cause this.
It fails during an aggregateByKey operation, but that should happen in the
executor JVMs, not in the driver JVM.


Thanks

On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> bq.   at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>
> It was Akka which uses JavaSerializer
>
> Cheers
>
> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> Hi,
>>
>> I thought I was using kryo serializer for shuffle.  I could verify it
>> from spark UI - Environment tab that
>> spark.serializer org.apache.spark.serializer.KryoSerializer
>> spark.kryo.registrator
>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>
>>
>> But when I see following error in Driver logs it looks like spark is
>> using JavaSerializer
>>
>> 2016-05-06 09:49:26,490 ERROR
>> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl:
>> Uncaught fatal error from thread
>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
>> ActorSystem [sparkDriver]
>>
>> java.lang.OutOfMemoryError: Java heap space
>>
>> at java.util.Arrays.copyOf(Arrays.java:2271)
>>
>> at
>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>
>> at
>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>
>> at
>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>
>> at
>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>
>> at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>
>> at
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>>
>> at
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>
>> at
>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>
>> at
>> akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>
>> at
>> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>>
>> at
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>>
>> at
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>>
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>
>> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
>>
>> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
>>
>> at
>> akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
>>
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>
>> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
>>
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>> at
>> 

How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Nirav Patel
Hi,

I thought I was using the Kryo serializer for shuffle. I could verify it from
the Spark UI Environment tab, which shows:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator com.myapp.spark.jobs.conf.SparkSerializerRegistrator


But when I see the following error in the driver logs, it looks like Spark is
using JavaSerializer:

2016-05-06 09:49:26,490 ERROR
[sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl:
Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: Java heap space

at java.util.Arrays.copyOf(Arrays.java:2271)

at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)

at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)

at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)

at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)

at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)

at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)

at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)

at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)

at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)

at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)

at
akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)

at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)

at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)

at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)

at
akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)

at akka.actor.Actor$class.aroundReceive(Actor.scala:467)

at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

at akka.actor.ActorCell.invoke(ActorCell.scala:487)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)

at akka.dispatch.Mailbox.run(Mailbox.scala:220)

at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)

at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



What am I missing here?

Thanks
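For reference, spark.serializer and spark.kryo.registrator only govern data-plane
serialization (shuffle blocks, cached RDDs, task results); the Akka remoting in the
stack trace above serializes its own control messages with JavaSerializer regardless,
which is why it shows up in the driver log even with Kryo configured. Below is a
minimal sketch of how a registrator is typically wired up; the class name and the
registered types are placeholders, not the actual job's:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Placeholder registrator: register the classes that actually flow through the shuffle.
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Array[Double]])
    kryo.register(classOf[Array[String]])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[MyKryoRegistrator].getName)
  .set("spark.kryo.registrationRequired", "true")  // fail fast if an unregistered class sneaks in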




Re: Spark 1.5.2 Shuffle Blocks - running out of memory

2016-05-06 Thread Nirav Patel
Is this a current limitation of Spark shuffle blocks?

On Tue, May 3, 2016 at 11:18 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Hi,
>
> My spark application getting killed abruptly during a groupBy operation
> where shuffle happens. All shuffle happens with PROCESS_LOCAL locality. I
> see following in driver logs. Should not this logs be in executors? Anyhow
> looks like ByteBuffer is running out of memory. What will be workaround for
> this?
>
>
> 2016-05-02 22:38:53,595 INFO
> [sparkDriver-akka.actor.default-dispatcher-14]
> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
> locations for shuffle 3 to hdn2.mycorp:45993
> 2016-05-02 22:38:53,832 INFO
> [sparkDriver-akka.actor.default-dispatcher-14]
> org.apache.spark.storage.BlockManagerInfo: Removed broadcast_4_piece0 on
> 10.250.70.117:52328 in memory (size: 2.1 KB, free: 15.5 MB)
> 2016-05-02 22:39:03,704 WARN [New I/O worker #5]
> org.jboss.netty.channel.DefaultChannelPipeline: An exception was thrown by
> a user handler while handling an exception event ([id: 0xa8147f0c, /
> 10.250.70.110:48056 => /10.250.70.117:38300] EXCEPTION:
> java.lang.OutOfMemoryError: Java heap space)
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
> at
> org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
> at
> org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
> at
> org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
> at
> org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
> at
> org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
> 2016-05-02 22:39:05,783 ERROR
> [sparkDriver-akka.actor.default-dispatcher-14]
> org.apache.spark.rpc.akka.ErrorMonitor: Uncaught fatal error from thread
> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
> ActorSystem [sparkDriver]
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:2271)
> at
> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at
> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
> at
> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
> at
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
> at
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
> a

Spark 1.5.2 Shuffle Blocks - running out of memory

2016-05-03 Thread Nirav Patel
Hi,

My Spark application is getting killed abruptly during a groupBy operation where
a shuffle happens. All shuffles run with PROCESS_LOCAL locality. I see the
following in the driver logs; shouldn't these logs be on the executors? Anyhow,
it looks like a ByteBuffer is running out of memory. What would be a workaround
for this?


2016-05-02 22:38:53,595 INFO [sparkDriver-akka.actor.default-dispatcher-14]
org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 3 to hdn2.mycorp:45993
2016-05-02 22:38:53,832 INFO [sparkDriver-akka.actor.default-dispatcher-14]
org.apache.spark.storage.BlockManagerInfo: Removed broadcast_4_piece0 on
10.250.70.117:52328 in memory (size: 2.1 KB, free: 15.5 MB)
2016-05-02 22:39:03,704 WARN [New I/O worker #5]
org.jboss.netty.channel.DefaultChannelPipeline: An exception was thrown by
a user handler while handling an exception event ([id: 0xa8147f0c, /
10.250.70.110:48056 => /10.250.70.117:38300] EXCEPTION:
java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at
org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
at
org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
at
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
at
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
at
org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
2016-05-02 22:39:05,783 ERROR
[sparkDriver-akka.actor.default-dispatcher-14]
org.apache.spark.rpc.akka.ErrorMonitor: Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at
akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
at
akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at

Re: aggregateByKey - external combine function

2016-04-29 Thread Nirav Patel
Any thoughts?

I can explain the problem in more detail, but basically the shuffle data doesn't
seem to fit in reducer memory (32GB), and I am looking for ways to process it
with a mix of disk and memory.

Thanks

On Thu, Apr 28, 2016 at 10:07 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Hi,
>
> I tried to convert a groupByKey operation to aggregateByKey in a hope to
> avoid memory and high gc issue when dealing with 200GB of data.
> I needed to create a Collection of resulting key-value pairs which
> represent all combinations of given key.
>
> My merge fun definition is as follows:
>
> private def processDataMerge(map1: collection.mutable.Map[String,
> UserDataSet],
>   map2:
> collection.mutable.Map[String, UserDataSet])
> : collection.mutable.Map[String, UserDataSet] = {
>
> //psuedo code
>
> map1 + map2
> (Set[combEle1], Set[combEle2] ... ) = map1.map(...extract all elements
> here)
> comb1 = cominatorics(Set[CombELe1])
> ..
> totalcombinations = comb1 + comb2 + ..
>
> map1 + totalcombinations.map(comb => (comb -> UserDataSet))
>
> }
>
>
> Output of one merge(or seq) is basically combinations of input collection
> elements and so and so on. So finally you get all combinations for given
> key.
>
> Its performing worst using aggregateByKey then groupByKey with same
> configuration. GroupByKey used to halt at last 9 partitions out of 4000.
> This one is halting even earlier. (halting due to high GC). I kill the job
> after it halts for hours on same task.
>
> I give 25GB executor memory and 4GB overhead. My cluster can't allocate
> more than 32GB per executor.
>
> I thought of custom partitioning my keys so there's less data per key and
> hence less combination. that will help with data skew but wouldn't in the
> end it would come to same thing? Like at some point it will need to merge
> key-values spread across different salt and it will come to memory issue at
> that point!
>
> Any pointer to resolve this? perhaps an external merge ?
>
> Thanks
> Nirav
>
>
>
> Thanks
>
>
>
>
>



aggregateByKey - external combine function

2016-04-28 Thread Nirav Patel
Hi,

I tried to convert a groupByKey operation to aggregateByKey in the hope of
avoiding the memory and high-GC issues we hit when dealing with 200GB of data.
I need to create a collection of resulting key-value pairs which represents
all combinations for a given key.

My merge fun definition is as follows:

private def processDataMerge(map1: collection.mutable.Map[String,
UserDataSet],
  map2:
collection.mutable.Map[String, UserDataSet])
: collection.mutable.Map[String, UserDataSet] = {

// pseudo code

map1 + map2
(Set[combEle1], Set[combEle2] ... ) = map1.map(...extract all elements here)
comb1 = combinatorics(Set[combEle1])
..
totalcombinations = comb1 + comb2 + ..

map1 + totalcombinations.map(comb => (comb -> UserDataSet))

}


The output of one merge (or seq) operation is basically the set of combinations
of the input collection's elements, and so on; at the end you get all
combinations for a given key.

It performs worse with aggregateByKey than with groupByKey under the same
configuration. groupByKey used to halt at the last 9 partitions out of 4000;
this one halts even earlier (due to high GC). I kill the job after it hangs for
hours on the same task.

I give 25GB of executor memory and 4GB of overhead. My cluster can't allocate
more than 32GB per executor.

I thought of custom-partitioning (salting) my keys so there's less data per key
and hence fewer combinations. That would help with data skew, but wouldn't it
come to the same thing in the end? At some point the key-values spread across
the different salts have to be merged, and the memory issue shows up again at
that point.

Any pointers on how to resolve this? Perhaps an external merge?

Thanks
Nirav



Thanks
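For reference, a minimal sketch of the aggregateByKey call shape being described,
with a plain String standing in for UserDataSet and a trivial map union standing in
for the combinatorial merge; everything here is illustrative, not the actual job code:

import scala.collection.mutable
import org.apache.spark.rdd.RDD

// seqOp folds one record into a partial per-key map; combOp merges two partial maps.
// In the real job the combination-building logic would live inside these two functions.
def aggregate(pairs: RDD[(String, (String, String))]): RDD[(String, mutable.Map[String, String])] =
  pairs.aggregateByKey(mutable.Map.empty[String, String])(
    (acc, kv) => { acc += kv; acc },
    (m1, m2)  => { m1 ++= m2; m1 }
  )

Note that both functions still run inside a single reduce task per key, so if one
key's fully merged result cannot fit in that task's heap, aggregateByKey alone will
not fix it.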




GroupByKey Iterable[T] - Memory usage

2016-04-25 Thread Nirav Patel
Hi,

Is the Iterable that comes out of groupByKey fully loaded into the reducer
task's memory, or can it also live on disk?

Also, is there a way to evict it from memory once the reducer is done iterating
over it, so that memory can be used for something else?

Thanks
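For what it's worth, when the per-key computation can be expressed incrementally,
reduceByKey or aggregateByKey keep only a running value per key instead of handing
the task a fully materialized Iterable; a trivial sketch with placeholder types:

import org.apache.spark.rdd.RDD

// Keep a running sum per key rather than grouping all values first.
def sumPerKey(pairs: RDD[(String, Long)]): RDD[(String, Long)] =
  pairs.reduceByKey(_ + _)

(rdd.unpersist() frees explicitly cached RDD blocks, but it does not apply to the
per-key Iterable itself.)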




SparkDriver throwing java.lang.OutOfMemoryError: Java heap space

2016-04-04 Thread Nirav Patel
Hi,

We are using Spark 1.5.2 and recently started hitting this issue after our
dataset grew from 140GB to 160GB. The error is thrown during the shuffle fetch
on the reduce side, which should all happen on the executors, and the executors
should report it; however, it gets reported only on the driver, and the
SparkContext is shut down from the driver side after this error occurs.

Here's what I see in the driver logs:



2016-04-04 03:51:32,889 INFO [sparkDriver-akka.actor.default-dispatcher-17]
org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 3 to hdn3.mycomp:37339
2016-04-04 03:51:32,890 INFO [sparkDriver-akka.actor.default-dispatcher-17]
org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
locations for shuffle 3 to hdn3.mycomp:57666
2016-04-04 03:51:33,133 INFO [sparkDriver-akka.actor.default-dispatcher-21]
org.apache.spark.storage.BlockManagerInfo: Removed broadcast_12_piece0 on
10.250.70.117:42566 in memory (size: 1939.0 B, free: 232.5 MB)
2016-04-04 03:51:38,432 ERROR
[sparkDriver-akka.actor.default-dispatcher-14]
org.apache.spark.rpc.akka.ErrorMonitor: Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at
com.google.protobuf.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)
at
akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(AkkaPduCodec.scala:138)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:740)
at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-04-04 03:51:38,432 ERROR
[sparkDriver-akka.actor.default-dispatcher-21] akka.actor.ActorSystemImpl:
Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at
com.google.protobuf.AbstractMessageLite.toByteArray(AbstractMessageLite.java:62)
at
akka.remote.transport.AkkaPduProtobufCodec$.constructMessage(AkkaPduCodec.scala:138)
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:740)
at akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-04-04 03:51:40,246 ERROR [sparkDriver-akka.actor.default-dispatcher-4]
akka.actor.ActorSystemImpl: Uncaught fatal error from thread
[sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
at 

Re: spark 1.5.2 - value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey, myData)]

2016-04-02 Thread Nirav Patel
In the second class I re-declared the following and the compile error went away.
Your solution worked too.

 implicit val rowKeyOrdering = rowKeyOrd

Thanks
Nirav
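Putting the two working fixes from this thread side by side, using the same
placeholder names as above (MyKey, MyData, rowKeyOrd, RDDInitializer); a sketch
rather than the exact project code:

// Fix 1: bring the same Ordering back into implicit scope in the consuming class,
// so the implicit conversion to OrderedRDDFunctions (and thus filterByRange) applies.
implicit val rowKeyOrdering: Ordering[MyKey] = rowKeyOrd

val slice = RDDInitializer.dataRdd.filterByRange(myKeyFirst, myKeyLast)

// Fix 2: the explicit wrapper Ted pointed at, applied to filterByRange here.
// It still needs an Ordering[MyKey] in scope (provided by rowKeyOrdering above).
val slice2 = new org.apache.spark.rdd.OrderedRDDFunctions[MyKey, MyData, (MyKey, MyData)](
  RDDInitializer.dataRdd).filterByRange(myKeyFirst, myKeyLast)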



On Wed, Mar 30, 2016 at 7:36 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Have you tried the following construct ?
>
> new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey()
>
> See core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
>
> On Wed, Mar 30, 2016 at 5:20 AM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> Hi, I am trying to use filterByRange feature of spark OrderedRDDFunctions
>> in a hope that it will speed up filtering by scanning only required
>> partitions.
>> I have created Paired RDD with a RangePartitioner in one scala class and
>> in another class I am trying to access this RDD and do following:
>>
>> In first scala class called RDDInitializer  I do:
>>
>>  implicit val rowKeyOrdering = rowKeyOrd
>>
>> val repartitionRdd = rowdataRdd.partitionBy(new RangePartitioner(
>> minPartitions.toInt, dataRdd, true))
>>
>> dataRdd  = repartitionRdd.sortByKey()
>>
>>
>> In second scala class I do:
>>
>> import org.apache.spark.SparkContext._
>>
>> RDDInitializer.dataRdd.filterByRange(myKeyFirst, myKeyLast)
>> But I am getting following compile error:
>>
>> "value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey,
>> myData)]"
>>
>>
>> Looks like I can use all methods of OrderedRDDFunctions inside first
>> scala class where implicit rowKeyOrdering is defined but not in second
>> class.
>>
>>
>> Please help me resolve this compile error.
>>
>>
>> Thanks
>>
>> Nirav
>>
>>
>>
>>
>>
>
>
>



Multiple lookups; consolidate result and run further aggregations

2016-04-02 Thread Nirav Patel
I will start with a question: is Spark's lookup function on a pair RDD a driver
action, i.e. is the result returned to the driver?

I have a list of keys on the driver side, and I want to perform multiple
parallel lookups on a pair RDD (each returning a Seq[V]), consolidate the
results, and perform further aggregation/transformation over the cluster.

val seqVal = lookupKeys.flatMap(key => {

dataRdd.lookup(key)

  })


Here's what I think happens internally:

Each lookup returns its Seq[V] to the driver.

Consolidation of each Seq[V] happens on the driver, since the flatMap runs over
a local collection.

All subsequent operations happen on the driver side unless I do
sparkContext.parallelize(seqVal).

Is this correct?

Also, what I am trying to do is an efficient multi-key lookup. Another option
would be to broadcast the lookup keys and perform a join.

Please advise.

Thanks
Nirav
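Yes, lookup() is an action: each call materializes its Seq[V] on the driver, so the
flatMap over lookupKeys runs entirely driver-side. A sketch of the broadcast
alternative, using the same dataRdd/lookupKeys names as above and an illustrative
per-key count as the follow-on aggregation:

// Ship the key set to the executors once, keep the matching records distributed,
// and only collect the final (small) aggregate.
val keySet  = sc.broadcast(lookupKeys.toSet)
val matched = dataRdd.filter { case (k, _) => keySet.value.contains(k) }
val counts  = matched.aggregateByKey(0L)((acc, _) => acc + 1L, _ + _).collect()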




Re: How to efficiently Scan (not filter nor lookup) part of Paired RDD or Ordered RDD

2016-04-02 Thread Nirav Patel
@Ilya Ganelin, I'm not sure how zipWithIndex() would do anything less than an
O(n) scan; the Spark docs don't mention anything about it.

I found a solution in Spark 1.5.2's OrderedRDDFunctions: it has a filterByRange
API (sketched below).

Thanks
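A sketch of that filterByRange path end to end; the record type, key choice and
partition count below are illustrative only:

import org.apache.spark.RangePartitioner
import org.apache.spark.rdd.RDD

case class Event(ts: Long, payload: String)  // placeholder record type

// Range-partition and sort once up front; later range queries only scan the
// partitions whose key range can overlap [startTs, endTs].
def sliceByTime(events: RDD[Event], startTs: Long, endTs: Long): RDD[(Long, Event)] = {
  val byKey  = events.map(e => (e.ts, e))
  val sorted = byKey.repartitionAndSortWithinPartitions(new RangePartitioner(200, byKey))
  sorted.filterByRange(startTs, endTs)
}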

On Sun, Jan 24, 2016 at 10:27 PM, Sonal Goyal <sonalgoy...@gmail.com> wrote:

> One thing you can also look at is to save your data in a way that can be
> accessed through file patterns. Eg by hour, zone etc so that you only load
> what you need.
> On Jan 24, 2016 10:00 PM, "Ilya Ganelin" <ilgan...@gmail.com> wrote:
>
>> The solution I normally use is to zipWithIndex() and then use the filter
>> operation. Filter is an O(m) operation where m is the size of your
>> partition, not an O(N) operation.
>>
>> -Ilya Ganelin
>>
>> On Sat, Jan 23, 2016 at 5:48 AM, Nirav Patel <npa...@xactlycorp.com>
>> wrote:
>>
>>> Problem is I have RDD of about 10M rows and it keeps growing. Everytime
>>> when we want to perform query and compute on subset of data we have to use
>>> filter and then some aggregation. Here I know filter goes through each
>>> partitions and every rows of RDD which may not be efficient at all.
>>>
>>> Spark having Ordered RDD functions I dont see why it's so difficult to
>>> implement such function. Cassandra/Hbase has it for years where they can
>>> fetch data only from certain partitions based on your rowkey. Scala TreeMap
>>> has Range function to do the same.
>>>
>>> I think people have been looking for this for while. I see several post
>>> asking this.
>>>
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-filter-on-an-RDD-scan-every-data-item-td20170.html#a26048
>>>
>>> By the way, I assume there
>>> Thanks
>>> Nirav
>>>
>>>
>>>
>>>
>>
>>
>>



spark 1.5.2 - value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey, myData)]

2016-03-30 Thread Nirav Patel
Hi, I am trying to use the filterByRange feature of Spark's OrderedRDDFunctions,
in the hope that it will speed up filtering by scanning only the required
partitions.
I have created a paired RDD with a RangePartitioner in one Scala class, and in
another class I am trying to access this RDD and do the following:

In first scala class called RDDInitializer  I do:

 implicit val rowKeyOrdering = rowKeyOrd

val repartitionRdd = rowdataRdd.partitionBy(new RangePartitioner(
minPartitions.toInt, dataRdd, true))

dataRdd  = repartitionRdd.sortByKey()


In second scala class I do:

import org.apache.spark.SparkContext._

RDDInitializer.dataRdd.filterByRange(myKeyFirst, myKeyLast)
But I am getting following compile error:

"value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey,
myData)]"


It looks like I can use all the methods of OrderedRDDFunctions inside the first
Scala class, where the implicit rowKeyOrdering is defined, but not in the second
class.


Please help me resolve this compile error.


Thanks

Nirav




Re: Compress individual RDD

2016-03-15 Thread Nirav Patel
Thanks Sabarish, I thought of the same; I will try that.

Hi Ted, good question. I guess one way would be to have an API like
rdd.persist(storageLevel, compress), where 'compress' can be true or false.

On Tue, Mar 15, 2016 at 5:18 PM, Sabarish Sasidharan <sabarish@gmail.com
> wrote:

> It will compress only rdds with serialization enabled in the persistence
> mode. So you could skip _SER modes for your other rdds. Not perfect but
> something.
> On 15-Mar-2016 4:33 pm, "Nirav Patel" <npa...@xactlycorp.com> wrote:
>
>> Hi,
>>
>> I see that there's following spark config to compress an RDD.  My guess
>> is it will compress all RDDs of a given SparkContext, right?  If so, is
>> there a way to instruct spark context to only compress some rdd and leave
>> others uncompressed ?
>>
>> Thanks
>>
>> spark.rdd.compress false Whether to compress serialized RDD partitions
>> (e.g. forStorageLevel.MEMORY_ONLY_SER). Can save substantial space at
>> the cost of some extra CPU time.
>>
>>
>>
>
>



Compress individual RDD

2016-03-15 Thread Nirav Patel
Hi,

I see that there's the following Spark config to compress an RDD. My guess is
that it will compress all RDDs of a given SparkContext, right? If so, is there
a way to instruct the Spark context to compress only some RDDs and leave others
uncompressed?

Thanks

spark.rdd.compress (default: false): Whether to compress serialized RDD
partitions (e.g. for StorageLevel.MEMORY_ONLY_SER). Can save substantial space
at the cost of some extra CPU time.
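As the reply above notes, the setting only kicks in for serialized persistence, so
in practice the per-RDD choice is made through the storage level; a sketch with
placeholder RDD names:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Compression is enabled globally, but it only applies to *_SER storage levels...
val conf = new SparkConf().set("spark.rdd.compress", "true")

// ...so per-RDD control comes from choosing the level:
def cacheSelectively(bigRdd: RDD[String], hotRdd: RDD[String]): Unit = {
  bigRdd.persist(StorageLevel.MEMORY_ONLY_SER)  // stored serialized, therefore compressed
  hotRdd.persist(StorageLevel.MEMORY_ONLY)      // stored as plain objects, left uncompressed
}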




Re: Job fails at saveAsHadoopDataset stage due to Lost Executor due to reason unknown so far

2016-03-03 Thread Nirav Patel
It's a write-once table, mainly used for a read/query-intensive application. We
in fact generate a comma-separated string from an array and store it in a
single column qualifier. I will look into the approach you suggested.

Reading of this table is done via Spark. It's an analytics application which
loads the HBase table as a Spark RDD; when a request comes in, we run Spark
transformations and collect actions. Can you think of a better way to load such
an HBase table as an RDD? Currently we have an RDD of Scala case objects where
each field is populated from an HBase column qualifier value, and the
comma-separated string is loaded as an Array[Double].

Thanks
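A sketch of the column-splitting idea from this thread, writing the array as many
small qualifiers instead of one comma-joined string, assuming an HBase 1.x client
(older clients use put.add instead of addColumn); the qualifier naming scheme is
just illustrative:

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// One cell per array element instead of one giant comma-separated cell.
def buildPut(rowKey: Array[Byte], family: Array[Byte], values: Array[Double]): Put = {
  val put = new Put(rowKey)
  values.zipWithIndex.foreach { case (v, i) =>
    put.addColumn(family, Bytes.toBytes(f"v$i%06d"), Bytes.toBytes(v))
  }
  put
}

Reads can then request only the qualifiers a query actually needs (or a
ColumnRangeFilter slice) instead of parsing one huge string per row.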


On Thu, Mar 3, 2016 at 9:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Since I am not familiar with your use case, I assume that you want to
> avoid the cost of parsing the comma-separated string when writing to
> hbase table.
>
> This schema is not friendly to query which involves only subset of the
> values. HBase doesn't have limit on the number of columns in a given column
> family. So you may consider breaking the comma-separated string into
> multiple columns.
>
> From your first paragraph, caching value of 1 indeed was high.
> You can use binary search to get to a reasonable value for caching.
>
> Thanks
>
> On Thu, Mar 3, 2016 at 7:52 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> Hi Ted,
>>
>> I'd say about 70th percentile keys have 2 columns each having a string of
>> 20k comma separated values. Top few hundred row keys have about 100-700k
>> comma separated values for those keys. I know that an extra FAT table.
>>
>> Yes I can remove  "hConf.setBoolean("hbase.cluster.distributed", true)".
>>
>> ps - any suggestion on breaking down hbase column values is appreciated.
>> Design is there are two column families in table. They both share common
>> rowkey (filter criterias). However one column family column qualifiers has
>> raw values in a comma-separated string form that we cannot pre-aggregate.
>>
>> Thanks
>> Nirav
>>
>> On Thu, Mar 3, 2016 at 8:16 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> bq.  hConf.setBoolean("hbase.cluster.distributed", true)
>>>
>>> Not sure why the above is needed. If hbase-site.xml is on the classpath,
>>> it should contain the above setting already.
>>>
>>> FYI
>>>
>>> On Thu, Mar 3, 2016 at 6:08 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> From the log snippet you posted, it was not clear why connection got
>>>> lost. You can lower the value for caching and see if GC activity gets
>>>> lower.
>>>>
>>>> How wide are the rows in hbase table ?
>>>>
>>>> Thanks
>>>>
>>>> On Mar 3, 2016, at 1:01 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>>>>
>>>> so why does 'saveAsHadoopDataset' incurs so much memory pressure?
>>>> Should I try to reduce hbase caching value ?
>>>>
>>>> On Wed, Mar 2, 2016 at 7:51 AM, Nirav Patel <npa...@xactlycorp.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a spark jobs that runs on yarn and keeps failing at line where i 
>>>>> do :
>>>>>
>>>>>
>>>>> val hConf = HBaseConfiguration.create
>>>>> hConf.setInt("hbase.client.scanner.caching", 1)
>>>>> hConf.setBoolean("hbase.cluster.distributed", true)
>>>>>
>>>>> new PairRDDFunctions(hbaseRdd).saveAsHadoopDataset(jobConfig)
>>>>>
>>>>>
>>>>> Basically at this stage multiple Executors fails after high GC 
>>>>> activities. However none of the executor logs, driver logs or node 
>>>>> manager logs indicate any OutOfMemory errors or GC Overhead Exceeded 
>>>>> errors or memory limits exceeded errors. I don't see any other reason for 
>>>>> Executor failures as well.
>>>>>
>>>>>
>>>>> Driver Logs:
>>>>>
>>>>> Failing Oozie Launcher, Main class 
>>>>> [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Job 
>>>>> aborted due to stage failure: Task 388 in stage 22.0 failed 4 times, most 
>>>>> recent failure: Lost task 388.3 in stage 22.0 (TID 32141, maprnode5): 
>>>>> ExecutorLostFailure (executor 5 lost)
>>>>> Driver stacktrace:
>>>>> org.apache.spark.SparkException: Job aborted due to stage failure: T

Re: Job fails at saveAsHadoopDataset stage due to Lost Executor due to reason unknown so far

2016-03-03 Thread Nirav Patel
Hi Ted,

I'd say that around the 70th percentile, keys have 2 columns, each holding a
string of about 20k comma-separated values. The top few hundred row keys have
about 100-700k comma-separated values. I know, that's an extra-fat table.

Yes, I can remove "hConf.setBoolean("hbase.cluster.distributed", true)".

P.S. Any suggestion on breaking down the HBase column values is appreciated.
The design has two column families in the table; they both share a common
rowkey (the filter criteria). However, one column family's qualifiers hold raw
values in a comma-separated string form that we cannot pre-aggregate.

Thanks
Nirav

On Thu, Mar 3, 2016 at 8:16 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> bq.  hConf.setBoolean("hbase.cluster.distributed", true)
>
> Not sure why the above is needed. If hbase-site.xml is on the classpath,
> it should contain the above setting already.
>
> FYI
>
> On Thu, Mar 3, 2016 at 6:08 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> From the log snippet you posted, it was not clear why connection got
>> lost. You can lower the value for caching and see if GC activity gets
>> lower.
>>
>> How wide are the rows in hbase table ?
>>
>> Thanks
>>
>> On Mar 3, 2016, at 1:01 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>>
>> so why does 'saveAsHadoopDataset' incurs so much memory pressure? Should
>> I try to reduce hbase caching value ?
>>
>> On Wed, Mar 2, 2016 at 7:51 AM, Nirav Patel <npa...@xactlycorp.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a spark jobs that runs on yarn and keeps failing at line where i do :
>>>
>>>
>>> val hConf = HBaseConfiguration.create
>>> hConf.setInt("hbase.client.scanner.caching", 1)
>>> hConf.setBoolean("hbase.cluster.distributed", true)
>>>
>>> new PairRDDFunctions(hbaseRdd).saveAsHadoopDataset(jobConfig)
>>>
>>>
>>> Basically at this stage multiple Executors fails after high GC activities. 
>>> However none of the executor logs, driver logs or node manager logs 
>>> indicate any OutOfMemory errors or GC Overhead Exceeded errors or memory 
>>> limits exceeded errors. I don't see any other reason for Executor failures 
>>> as well.
>>>
>>>
>>> Driver Logs:
>>>
>>> Failing Oozie Launcher, Main class 
>>> [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Job 
>>> aborted due to stage failure: Task 388 in stage 22.0 failed 4 times, most 
>>> recent failure: Lost task 388.3 in stage 22.0 (TID 32141, maprnode5): 
>>> ExecutorLostFailure (executor 5 lost)
>>> Driver stacktrace:
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 388 
>>> in stage 22.0 failed 4 times, most recent failure: Lost task 388.3 in stage 
>>> 22.0 (TID 32141, maprnode5): ExecutorLostFailure (executor 5 lost)
>>> Driver stacktrace:
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>>> at 
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>> at scala.Option.foreach(Option.scala:236)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>>> at 
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>>> at 
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>>> at 
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>>> at org.apa

Re: Job fails at saveAsHadoopDataset stage due to Lost Executor due to reason unknown so far

2016-03-03 Thread Nirav Patel
So why does 'saveAsHadoopDataset' incur so much memory pressure? Should I try to
reduce the HBase caching value?

On Wed, Mar 2, 2016 at 7:51 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Hi,
>
> I have a spark jobs that runs on yarn and keeps failing at line where i do :
>
>
> val hConf = HBaseConfiguration.create
> hConf.setInt("hbase.client.scanner.caching", 1)
> hConf.setBoolean("hbase.cluster.distributed", true)
>
> new PairRDDFunctions(hbaseRdd).saveAsHadoopDataset(jobConfig)
>
>
> Basically at this stage multiple Executors fails after high GC activities. 
> However none of the executor logs, driver logs or node manager logs indicate 
> any OutOfMemory errors or GC Overhead Exceeded errors or memory limits 
> exceeded errors. I don't see any other reason for Executor failures as well.
>
>
> Driver Logs:
>
> Failing Oozie Launcher, Main class 
> [org.apache.oozie.action.hadoop.SparkMain], main() threw exception, Job 
> aborted due to stage failure: Task 388 in stage 22.0 failed 4 times, most 
> recent failure: Lost task 388.3 in stage 22.0 (TID 32141, maprnode5): 
> ExecutorLostFailure (executor 5 lost)
> Driver stacktrace:
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 388 
> in stage 22.0 failed 4 times, most recent failure: Lost task 388.3 in stage 
> 22.0 (TID 32141, maprnode5): ExecutorLostFailure (executor 5 lost)
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>   at 
> org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
>
>
>
> Executor logs:
>
>
> 16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage 8.0 
> (TID 15318). 2099 bytes result sent to driver
> 16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
> task 15333
> 16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0 
> (TID 15333)
> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125 
> non-empty blocks out of 3007 blocks
> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14 remote 
> fetches in 10 ms
> 16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to 
> maprnode5 has been quiet for 12 ms while there are outstanding requests. 
> Assuming connection is dead; please adjust spark.network.timeout if this is 
> wrong.
> 16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1 

Re: Spark executor killed without apparent reason

2016-03-03 Thread Nirav Patel
There was nothing in the node manager logs that indicated why the container was
killed.

Here's my guess: since the killed executors were experiencing heavy GC activity
(full GCs) before dying, they most likely failed to respond to heartbeats to the
driver or the node manager and were killed for it.

This is the more relevant part of the log:
16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
maprnode5 has been quiet for 12 ms while there are outstanding
requests. Assuming connection is dead; please adjust spark.network.timeout
if this is wrong.

The following has nothing to do with this; it was raised because I manually
killed the application at some point, after too many executors had been killed:
" ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM"

Thanks
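If long full-GC pauses really are the trigger, the timeout named in that log line
can be raised so a paused executor is not declared dead quite so quickly; the
values below are illustrative, and this only treats the symptom rather than the GC
pressure itself:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.network.timeout", "600s")            // default is 120s; this is the timeout from the log above
  .set("spark.executor.heartbeatInterval", "30s")  // keep this well below spark.network.timeout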

On Wed, Mar 2, 2016 at 8:22 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> I think that was due to manually killing application. ExecutorLost started
>  around 04:46:21 and application was manually killed around 05:54:41
>
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,500 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
> Stopping container with container Id:
> *container_1456722312951_0450_01_01*
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,500 INFO org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger:
> USER=xactly IP=10.250.70.119 OPERATION=Stop Container Request
> TARGET=ContainerManageImpl RESULT=SUCCESS
> APPID=application_1456722312951_0450 CONTAINERID=
> *container_1456722312951_0450_01_01*
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,500 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
> Container *container_1456722312951_0450_01_01* transitioned from
> RUNNING to KILLING
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,501 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
> Cleaning up container *container_1456722312951_0450_01_01*
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,507 WARN
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
> code from container *container_1456722312951_0450_01_01* is : 143
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,520 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
> Container *container_1456722312951_0450_01_01* transitioned from
> KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,557 INFO
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor:
> Deleting absolute path :
> /tmp/hadoop-xactly/nm-local-dir/usercache/xactly/appcache/application_1456722312951_0450/
> *container_1456722312951_0450_01_01*
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,558 INFO org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger:
> USER=xactly OPERATION=Container Finished - Killed TARGET=ContainerImpl
> RESULT=SUCCESS APPID=application_1456722312951_0450 CONTAINERID=
> *container_1456722312951_0450_01_01*
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,558 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
> Container *container_1456722312951_0450_01_01* transitioned from
> CONTAINER_CLEANEDUP_AFTER_KILL to DONE
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,566 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application:
> Removing *container_1456722312951_0450_01_01* from application
> application_1456722312951_0450
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,567 INFO
> org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl:
> Considering container*container_1456722312951_0450_01_01* for
> log-aggregation
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:41,567 INFO org.apache.spark.network.yarn.YarnShuffleService:
> Stopping container *container_1456722312951_0450_01_01*
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log:2016-03-01
> 05:54:42,504 INFO
> org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Removed
> completed containers from NM context:
> [container_1456722312951_0450_01_27,
> container_1456722312951_0450_01_30,
> *container_1456722312951_0450_01_01*]
>
> yarn-xactly-nodemanager-hdn4.xactlycorporation.local.log

Re: Spark executor Memory profiling

2016-03-01 Thread Nirav Patel
Thanks Nilesh,

Thanks for sharing those docs. I have come across most of those tunings in the
past, and believe me, I have tuned the heck out of this job. What I can't
believe is that Spark needs 4x more resources than MapReduce to run the same
job (for datasets on the order of >100GB).
I was able to run my job only after giving a ridiculous amount of executor
memory (20g) plus overhead (4g); the MapReduce counterpart worked with just a
2GB mapper and a 6GB reducer!

I have still not been able to take a heap dump of a stuck Spark executor so
that I can analyze its memory usage. But there's definitely some limitation in
the framework which makes it far from ideal for shuffle-heavy jobs!
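For the heap-dump part, one low-effort option is to let the executor JVMs dump
automatically on OOM via spark.executor.extraJavaOptions; a stuck-but-live executor
can also be dumped with jmap -dump:format=b,file=heap.hprof <pid> on the node. The
path and flags below are illustrative:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark_executor_dumps " +
    "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")  // GC logs help correlate pauses with tasks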



On Sat, Feb 20, 2016 at 10:29 PM, Kuchekar <kuchekar.nil...@gmail.com>
wrote:

> Hi Nirav,
>
> I recently attended the Spark Summit East 2016 and almost
> every talk about errors faced by community and/or tuning topics for Spark
> mentioned this being the main problem (Executor lost and JVM out of
> memory).
>
>  Checkout this blogs that explains how to tune spark
> <http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/>,
> cheatsheet for tuning spark <http://techsuppdiva.github.io/spark1.6.html>
> .
>
> Hope this helps, keep the community posted what resolved your issue if it
> does.
>
> Thanks.
>
> Kuchekar, Nilesh
>
> On Sat, Feb 20, 2016 at 11:29 AM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> Thanks Nilesh. I don't think there;s heavy communication between driver
>> and executor. However I'll try the settings you suggested.
>>
>> I can not replace groupBy with reduceBy as its not an associative
>> operation.
>>
>> It is very frustrating to be honest. It was a piece of cake with map
>> reduce compare to amount to time I am putting in tuning spark make things
>> work. To remove doubt that executor might be running multiple tasks
>> (executor.cores) and hence reduce to share memory, I set executor.cores to
>> 1 so only 1 task have all the 15gb to its disposal!. Which is already 3
>> times it needs for most skewed key. I am going to need to profile for sure
>> to understand what spark executors are doing there. For sure they are not
>> willing to explain the situation but rather will say 'use reduceBy'
>>
>>
>>
>>
>>
>> On Thu, Feb 11, 2016 at 9:42 AM, Kuchekar <kuchekar.nil...@gmail.com>
>> wrote:
>>
>>> Hi Nirav,
>>>
>>>   I faced similar issue with Yarn, EMR 1.5.2 and
>>> following Spark Conf helped me. You can set the values accordingly
>>>
>>> conf= (SparkConf().set("spark.master","yarn-client").setAppName(
>>> "HalfWay").set("spark.driver.memory", "15G").set("spark.yarn.am.memory",
>>> "15G"))
>>>
>>> conf=conf.set("spark.driver.maxResultSize","10G").set(
>>> "spark.storage.memoryFraction","0.6").set("spark.shuffle.memoryFraction"
>>> ,"0.6").set("spark.yarn.executor.memoryOverhead","4000")
>>>
>>> conf = conf.set("spark.executor.cores","4").set("spark.executor.memory",
>>> "15G").set("spark.executor.instances","6")
>>>
>>> Is it also possible to use reduceBy in place of groupBy that might help
>>> the shuffling too.
>>>
>>>
>>> Kuchekar, Nilesh
>>>
>>> On Wed, Feb 10, 2016 at 8:09 PM, Nirav Patel <npa...@xactlycorp.com>
>>> wrote:
>>>
>>>> We have been trying to solve a memory issue with a spark job that
>>>> processes 150GB of data (on disk). It does a groupBy operation; some of the
>>>> executors will receive somewhere around 2-4M scala case objects to work
>>>> with. We are using the following spark config:
>>>>
>>>> "executorInstances": "15",
>>>>
>>>>  "executorCores": "1", (we reduce it to one so a single task gets
>>>> all the executorMemory! at least that's the assumption here)
>>>>
>>>>  "executorMemory": "15000m",
>>>>
>>>>  "minPartitions": "2000",
>>>>
>>>>  "taskCpus": "1",
>>>>
>>>>  "executorMemoryOverhead": "1300",
>>>>
>>>>  "shuffleManager": "tungsten-sort",
>>>>
>>>>   "storageF

Job fails at saveAsHadoopDataset stage due to Lost Executor due to reason unknown so far

2016-03-01 Thread Nirav Patel
Hi,

I have a Spark job that runs on YARN and keeps failing at the line where I do:


import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.rdd.PairRDDFunctions

val hConf = HBaseConfiguration.create
hConf.setInt("hbase.client.scanner.caching", 1)
hConf.setBoolean("hbase.cluster.distributed", true)

// The job consistently fails in this save stage:
new PairRDDFunctions(hbaseRdd).saveAsHadoopDataset(jobConfig)
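
(For context, jobConfig is a plain mapred JobConf; a typical setup for this
kind of HBase write looks roughly like the sketch below, with a placeholder
table name.)

// Rough sketch only; the table name is a placeholder, not our real one.
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf

val jobConfig = new JobConf(hConf)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, "placeholder_table")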


Basically, at this stage multiple executors fail after high GC activity.
However, none of the executor logs, driver logs, or node manager logs
indicate any OutOfMemory, GC Overhead Exceeded, or memory-limits-exceeded
errors. I don't see any other obvious reason for the executor failures
either.
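
One thing I am going to try, to confirm whether GC is really the culprit, is
verbose GC logging on the executors (a sketch; the exact flags may need
adjusting for the JVM in use):

import org.apache.spark.SparkConf

// Sketch: verbose GC logging so the aggregated YARN logs show GC pause times
// around the failing save stage.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")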



Driver Logs:

Failing Oozie Launcher, Main class
[org.apache.oozie.action.hadoop.SparkMain], main() threw exception,
Job aborted due to stage failure: Task 388 in stage 22.0 failed 4
times, most recent failure: Lost task 388.3 in stage 22.0 (TID 32141,
maprnode5): ExecutorLostFailure (executor 5 lost)
Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 388 in stage 22.0 failed 4 times, most recent failure: Lost task
388.3 in stage 22.0 (TID 32141, maprnode5): ExecutorLostFailure
(executor 5 lost)
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
at 
org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)



Executor logs:


16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage
8.0 (TID 15318). 2099 bytes result sent to driver
16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got
assigned task 15333
16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage
8.0 (TID 15333)
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting
125 non-empty blocks out of 3007 blocks
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14
remote fetches in 10 ms
16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
maprnode5 has been quiet for 12 ms while there are outstanding
requests. Assuming connection is dead; please adjust
spark.network.timeout if this is wrong.
16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1
requests outstanding when connection from maprnode5 is closed
16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while
starting block fetches
java.io.IOException: Connection from maprnode5 closed
at 
org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
at 
org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at 

Re: Spark executor killed without apparent reason

2016-03-01 Thread Nirav Patel
Hi Ryan,

I did search for "OutOfMemoryError" earlier and again just now, but it
doesn't turn up anything else.

Another thing is that the job fails at the "saveAsHadoopDataset" call on a
huge RDD. Most of the executors fail at this stage, which I also don't
understand, because that should be a straightforward write to the filesystem.
All the complex joins and logic were in previous stages, which all ran
successfully.

Thanks
Nirav

On Wed, Mar 2, 2016 at 2:22 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> Could you search for "OutOfMemoryError" in the executor logs? It could be
> "OutOfMemoryError: Direct Buffer Memory" or something else.
>
> On Tue, Mar 1, 2016 at 6:23 AM, Nirav Patel <npa...@xactlycorp.com> wrote:
>
>> Hi,
>>
>> We are using Spark 1.5.2 on YARN. We have a Spark application utilizing
>> about 15GB of executor memory and 1500 overhead. However, at a certain stage
>> we notice higher GC time (almost the same as task time) being spent. These
>> executors are bound to get killed at some point. However, the nodemanager and
>> resource manager logs don't indicate failure due to 'beyond physical/virtual
>> memory limits', nor do I see any 'heap space' or 'gc overhead exceeded' errors
>> in the executor logs. Some of these high-GC executors get killed eventually,
>> but I can't seem to find the reason. Based on the application logs, it seems
>> like the executor didn't respond to the driver for a long period of time and
>> the connection was reset.
>> Following are logs from 'yarn logs -applicationId appId_1232_xxx'
>>
>>
>> 16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage
>> 8.0 (TID 15318). 2099 bytes result sent to driver
>> 16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got
>> assigned task 15333
>> 16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0
>> (TID 15333)
>> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125
>> non-empty blocks out of 3007 blocks
>> 16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14
>> remote fetches in 10 ms
>> 16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
>> maprnode5 has been quiet for 12 ms while there are outstanding
>> requests. Assuming connection is dead; please adjust spark.network.timeout
>> if this is wrong.
>> 16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1
>> requests outstanding when connection from maprnode5 is closed
>> 16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while
>> starting block fetches
>> java.io.IOException: Connection from maprnode5 closed
>> at
>> org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>> at
>> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>> at
>> io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
>> at
>> io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
>> at
>> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>   

Spark executor killed without apparent reason

2016-03-01 Thread Nirav Patel
Hi,

We are using Spark 1.5.2 on YARN. We have a Spark application utilizing
about 15GB of executor memory and 1500 overhead. However, at a certain stage
we notice higher GC time (almost the same as task time) being spent. These
executors are bound to get killed at some point. However, the nodemanager and
resource manager logs don't indicate failure due to 'beyond physical/virtual
memory limits', nor do I see any 'heap space' or 'gc overhead exceeded' errors
in the executor logs. Some of these high-GC executors get killed eventually,
but I can't seem to find the reason. Based on the application logs, it seems
like the executor didn't respond to the driver for a long period of time and
the connection was reset.
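
One thing I am considering (a sketch only; the value below is an example and
I haven't verified it helps) is raising the network timeout so that slow,
GC-heavy executors aren't declared dead so quickly:

import org.apache.spark.SparkConf

// Sketch with an example value. spark.executor.heartbeatInterval is a related
// knob that should stay well below this timeout.
val conf = new SparkConf()
  .set("spark.network.timeout", "600s")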

Following are logs from 'yarn logs -applicationId appId_1232_xxx'


16/02/24 11:09:47 INFO executor.Executor: Finished task 224.0 in stage 8.0
(TID 15318). 2099 bytes result sent to driver
16/02/24 11:09:47 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 15333
16/02/24 11:09:47 INFO executor.Executor: Running task 239.0 in stage 8.0
(TID 15333)
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Getting 125
non-empty blocks out of 3007 blocks
16/02/24 11:09:47 INFO storage.ShuffleBlockFetcherIterator: Started 14
remote fetches in 10 ms
16/02/24 11:11:47 ERROR server.TransportChannelHandler: Connection to
maprnode5 has been quiet for 12 ms while there are outstanding
requests. Assuming connection is dead; please adjust spark.network.timeout
if this is wrong.
16/02/24 11:11:47 ERROR client.TransportResponseHandler: Still have 1
requests outstanding when connection from maprnode5 is closed
16/02/24 11:11:47 ERROR shuffle.OneForOneBlockFetcher: Failed while
starting block fetches
java.io.IOException: Connection from maprnode5 closed
at
org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
at
org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
at
io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:744)
16/02/24 11:11:47 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3)
for 6 outstanding blocks after 5000 ms
16/02/24 11:11:52 INFO client.TransportClientFactory: Found inactive
connection to maprnode5, creating a new one.
16/02/24 11:12:16 WARN server.TransportChannelHandler: Exception in
connection from maprnode5
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at
io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at
