Re: Does spark read the same file twice, if two stages are using the same DataFrame?

2023-05-09 Thread Nitin Siwach
I do not think InMemoryFileIndex means the data is being cached. Cached data
shows up as an InMemoryTableScan; an InMemoryFileIndex is just for partition
discovery and partition pruning.
Any read will always show up as a scan from an InMemoryFileIndex. It is not
cached data, it is a cached file index. Please correct my understanding if
I am wrong.

Even the following code shows a scan from an InMemoryFileIndex
```
df1 = spark.read.csv("./df1.csv", header=True, schema = schema)
df1.explain(mode = "extended")
```

output:
```

== Parsed Logical Plan ==
Relation [index#50,0#51] csv

== Analyzed Logical Plan ==
index: string, 0: string
Relation [index#50,0#51] csv

== Optimized Logical Plan ==
Relation [index#50,0#51] csv

== Physical Plan ==
FileScan csv [index#50,0#51] Batched: false, DataFilters: [], Format:
CSV, Location: InMemoryFileIndex(1
paths)[file:/home/nitin/work/df1.csv], PartitionFilters: [],
PushedFilters: [], ReadSchema: struct

```
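For contrast, here is a minimal sketch (assuming a Spark 3.x session and any small CSV at ./df1.csv) of how an explicitly cached DataFrame shows up: the cached branch appears as InMemoryTableScan / InMemoryRelation, while the InMemoryFileIndex inside the FileScan is still only the cached file listing.
```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("cache-vs-fileindex").getOrCreate()

schema = StructType([StructField("index", StringType(), True),
                     StructField("0", StringType(), True)])

# Cache and materialize the DataFrame.
df1 = spark.read.csv("./df1.csv", header=True, schema=schema).cache()
df1.count()

# The physical plan now contains InMemoryTableScan over an InMemoryRelation
# (cached data); the InMemoryFileIndex inside the FileScan is just the cached
# file listing used for partition discovery/pruning.
df1.explain(mode="extended")
```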

On Mon, May 8, 2023 at 1:07 AM Mich Talebzadeh 
wrote:

> When I run this job in local mode  spark-submit --master local[4]
>
> with
>
> spark = SparkSession.builder \
> .appName("tests") \
> .enableHiveSupport() \
> .getOrCreate()
> spark.conf.set("spark.sql.adaptive.enabled", "true")
> df3.explain(extended=True)
>
> and no caching
>
> I see this plan
>
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner, [index])
> :- Relation [index#0,0#1] csv
> +- Aggregate [index#11], [index#11, avg(cast(0#12 as double)) AS avg(0)#7]
>+- Relation [index#11,0#12] csv
>
> == Analyzed Logical Plan ==
> index: string, 0: string, avg(0): double
> Project [index#0, 0#1, avg(0)#7]
> +- Join Inner, (index#0 = index#11)
>:- Relation [index#0,0#1] csv
>+- Aggregate [index#11], [index#11, avg(cast(0#12 as double)) AS
> avg(0)#7]
>   +- Relation [index#11,0#12] csv
>
> == Optimized Logical Plan ==
> Project [index#0, 0#1, avg(0)#7]
> +- Join Inner, (index#0 = index#11)
>:- Filter isnotnull(index#0)
>:  +- Relation [index#0,0#1] csv
>+- Aggregate [index#11], [index#11, avg(cast(0#12 as double)) AS
> avg(0)#7]
>   +- Filter isnotnull(index#11)
>  +- Relation [index#11,0#12] csv
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [index#0, 0#1, avg(0)#7]
>+- BroadcastHashJoin [index#0], [index#11], Inner, BuildRight, false
>   :- Filter isnotnull(index#0)
>   :  +- FileScan csv [index#0,0#1] Batched: false, DataFilters:
> [isnotnull(index#0)], Format: CSV, Location: InMemoryFileIndex(1
> paths)[hdfs://rhes75:9000/tmp/df1.csv], PartitionFilters: [],
> PushedFilters: [IsNotNull(index)], ReadSchema:
> struct
>   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0,
> string, true]),false), [plan_id=174]
>  +- HashAggregate(keys=[index#11], functions=[avg(cast(0#12 as
> double))], output=[index#11, avg(0)#7])
> +- Exchange hashpartitioning(index#11, 200),
> ENSURE_REQUIREMENTS, [plan_id=171]
>+- HashAggregate(keys=[index#11],
> functions=[partial_avg(cast(0#12 as double))], output=[index#11, sum#28,
> count#29L])
>   +- Filter isnotnull(index#11)
>  +- FileScan csv [index#11,0#12] Batched: false,
> DataFilters: [isnotnull(index#11)], Format: CSV, Location:
> InMemoryFileIndex(1 paths)[hdfs://rhes75:9000/tmp/df1.csv],
> PartitionFilters: [], PushedFilters: [IsNotNull(index)], ReadSchema:
> struct
>
>
> So there are two in-memory file scans for the csv file. So it caches the data
> already, given the small result set. Do you see this?
>
> HTH
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 7 May 2023 at 17:48, Nitin Siwach  wrote:
>
>> Thank you for the help Mich :)
>>
>> I have not started with a pandas DF. I have used pandas to create a dummy
>> .csv which I dump on the disk that I intend to use to showcase my pain
>> point. Providing pandas code was to ensure an end-to-end runnable example
>> is provided and the effort on anyone trying to help me out is minimized

Re: Does spark read the same file twice, if two stages are using the same DataFrame?

2023-05-07 Thread Nitin Siwach
Thank you for the help Mich :)

I have not started with a pandas DF. I have used pandas to create a dummy
.csv which I dump on the disk that I intend to use to showcase my pain
point. Providing pandas code was to ensure an end-to-end runnable example
is provided and the effort on anyone trying to help me out is minimized

I don't think Spark validating the file's existence qualifies as an action in
Spark parlance. Sure, there would be an AnalysisException if no file is found
at the provided location; however, if you provide a schema and a valid path,
no job shows up on the Spark UI, validating (IMO) that no action has been
taken (one action necessarily equals at least one job). If you don't provide
the schema, then a job is triggered (an action) to infer the schema for
subsequent logical planning.
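A small sketch of that point (the file path is illustrative): supplying a schema registers no job in the UI, while asking Spark to infer one triggers a job before any action is called.
```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

schema = StructType([StructField("index", StringType(), True),
                     StructField("0", StringType(), True)])

# No job in the Spark UI: the schema is supplied, so only file listing and
# logical analysis happen here.
df_with_schema = spark.read.csv("./df1.csv", header=True, schema=schema)

# A job does show up: Spark scans (part of) the file to infer the schema
# before it can finish logical planning.
df_inferred = spark.read.csv("./df1.csv", header=True, inferSchema=True)
```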

Since I am just demonstrating my lack of understanding, I have chosen local
mode. Otherwise, I do use Google Cloud Storage buckets to host all the data.

That being said, I think my question is something entirely different: calling
one action (df3.count()) reads the same csv twice. I do not understand that.
So far, I always thought that data needs to be persisted only when a DAG
subset is to be reused by several actions.


On Sun, May 7, 2023 at 9:47 PM Mich Talebzadeh 
wrote:

> You have started with a pandas DF, which won't scale outside of the driver
> itself.
>
> Let us put that aside.
> df1.to_csv("./df1.csv",index_label = "index")  ## write the dataframe to
> the underlying file system
>
> starting with spark
>
> df1 = spark.read.csv("./df1.csv", header=True, schema = schema) ## read
> the dataframe from the underlying file system
>
> That is your first action, because spark needs to validate that the file
> exists and check the schema. What will happen if that file does not exist?
>
> csvlocation="/tmp/df1.csv"
> csvlocation2="/tmp/df5.csv"
> df1=  pd.DataFrame(np.arange(1_000).reshape(-1,1))
> df1.index = np.random.choice(range(10),size=1000)
> df1.to_csv(csvlocation,index_label = "index")
> Schema = StructType([StructField('index', StringType(), True),
>  StructField('0', StringType(), True)])
>
> df1 = spark.read.csv(*csvlocation2*, header=True, schema =
> Schema).cache()  ## incorrect location
>
> df2 = df1.groupby("index").agg(F.mean("0"))
> df3 = df1.join(df2,on='index')
> df3.show()
> #df3.explain()
> df3.count()
>
>
> error
>
> pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND]
> Path does not exist: hdfs://rhes75:9000/tmp/df5.csv.
>
> In a distributed env, that csv file has to be available to all spark
> workers. Either you copy that file to all worker nodes or you put it in HDFS,
> S3, or gs:// locations to be available to all.
>
> It does not even get to df3.count()
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 7 May 2023 at 15:53, Nitin Siwach  wrote:
>
>> Thank you for your response, Sir.
>>
>> My understanding is that the final ```df3.count()``` is the only action
>> in the code I have attached. In fact, I tried running the rest of the code
>> (commenting out just the final df3.count()) and, as I expected, no
>> computations were triggered
>>
>> On Sun, 7 May, 2023, 20:16 Mich Talebzadeh, 
>> wrote:
>>
>>>
>>> ...However, In my case here I am calling just one action. ..
>>>
>>> ok, which line in your code calls that one action?
>>>
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may

Re: Does spark read the same file twice, if two stages are using the same DataFrame?

2023-05-07 Thread Nitin Siwach
Thank you for your response, Sir.

My understanding is that the final ```df3.count()``` is the only action in
the code I have attached. In fact, I tried running the rest of the code
(commenting out just the final df3.count()) and, as I expected, no
computations were triggered

On Sun, 7 May, 2023, 20:16 Mich Talebzadeh, 
wrote:

>
> ...However, In my case here I am calling just one action. ..
>
> ok, which line in your code calls that one action?
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 7 May 2023 at 14:13, Nitin Siwach  wrote:
>
>> @Vikas Kumar 
>> I am sorry but I thought that you had answered the other question that I
>> had raised to the same email address yesterday. It was around the SQL tab
>> in web UI and the output of .explain showing different plans.
>>
>> I get how using .cache I can ensure that the data from a particular
>> checkpoint is reused and the computations do not happen again.
>>
>> However, in my case here I am calling just one action. Within the purview
>> of one action, Spark should not rerun the overlapping parts of the DAG. I do
>> not understand why the file scan is happening several times. I can easily
>> mitigate the issue by using window functions and creating all the columns
>> in one go, without having to use several joins later on. That being said,
>> this particular behavior is what I am trying to understand. The golden rule
>> "the DAG overlaps won't run several times for one action" now seems to be
>> apocryphal. If you can shed some light on this matter I would appreciate it
>>
>> @weiruanl...@gmail.com  My datasets are very
>> small as you can see in the sample examples that I am creating as the first
>> part of the code
>>
>> Really appreciate you guys helping me out with this :)
>>
>> On Sun, May 7, 2023 at 12:23 PM Winston Lai 
>> wrote:
>>
>>> When your memory is not sufficient to keep the cached data for your jobs
>>> in two different stages, it might be read twice because Spark might have to
>>> clear the previous cache for other jobs. In those cases, a spill may be
>>> triggered when Spark writes your data from memory to disk.
>>>
>>> One way to check is to read the Spark UI. When Spark caches the data, you
>>> will see a little green dot connected to the blue rectangle in the Spark
>>> UI. If you see this green dot twice in your two stages, Spark likely spilled
>>> the data after your first job and read it again in the second run. You can
>>> also confirm it in other metrics from the Spark UI.
>>>
>>> That is my personal understanding based on what I have read and seen on
>>> my job runs. If there is any mistake, feel free to correct me.
>>>
>>> Thank You & Best Regards
>>> Winston Lai
>>> --
>>> *From:* Nitin Siwach 
>>> *Sent:* Sunday, May 7, 2023 12:22:32 PM
>>> *To:* Vikas Kumar 
>>> *Cc:* User 
>>> *Subject:* Re: Does spark read the same file twice, if two stages are
>>> using the same DataFrame?
>>>
>>> Thank you tons, Vikas :). That makes so much sense now
>>>
>>> I'm in the learning phase and was just browsing through various concepts of
>>> Spark with self-made small examples.
>>>
>>> It didn't make sense to me that the two physical plans should be
>>> different. But now I understand what you're saying.
>>>
>>> Again, thank you for helping me out
>>>
>>> On Sun, 7 May, 2023, 07:48 Vikas Kumar,  wrote:
>>>
>>>
>>> Spark came up with a plan, but that may or may not be the optimal plan given
>>> the system settings.
>>> If you do df1.cache(), I am guessing Spark will not read df1 twice.
>>>
>>> Btw, why do you have adaptive execution set to false?
>>>
>>> On Sat, May 6, 2023, 1:46 PM Nitin Siwach  wrote:
>>>
>>> I hope this email finds you well :)
>>>
>>> The following code re

Re: Does spark read the same file twice, if two stages are using the same DataFrame?

2023-05-07 Thread Nitin Siwach
@Vikas Kumar 
I am sorry but I thought that you had answered the other question that I
had raised to the same email address yesterday. It was around the SQL tab
in web UI and the output of .explain showing different plans.

I get how using .cache I can ensure that the data from a particular
checkpoint is reused and the computations do not happen again.

However, in my case here I am calling just one action. Within the purview
of one action, Spark should not rerun the overlapping parts of the DAG. I do
not understand why the file scan is happening several times. I can easily
mitigate the issue by using window functions and creating all the columns
in one go, without having to use several joins later on. That being said,
this particular behavior is what I am trying to understand. The golden rule
"the DAG overlaps won't run several times for one action" now seems to be
apocryphal. If you can shed some light on this matter I would appreciate it
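(A rough sketch of that window-function workaround, reusing the schema and file from the runnable example further down in this thread; it only illustrates the workaround, not the double scan itself.)
```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField("index", StringType(), True),
                     StructField("0", StringType(), True)])

df1 = spark.read.csv("./df1.csv", header=True, schema=schema)

# Add the per-index mean as a column in one pass over df1, instead of a
# groupby followed by a join back onto df1.
w = Window.partitionBy("index")
df3 = df1.withColumn("avg(0)", F.mean(F.col("0").cast("double")).over(w))
df3.count()
```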

@weiruanl...@gmail.com  My datasets are very small
as you can see in the sample examples that I am creating as the first part
of the code

Really appreciate you guys helping me out with this :)

On Sun, May 7, 2023 at 12:23 PM Winston Lai  wrote:

> When your memory is not sufficient to keep the cached data for your jobs
> in two different stages, it might be read twice because Spark might have to
> clear the previous cache for other jobs. In those cases, a spill may be
> triggered when Spark writes your data from memory to disk.
>
> One way to check is to read the Spark UI. When Spark caches the data, you
> will see a little green dot connected to the blue rectangle in the Spark
> UI. If you see this green dot twice in your two stages, Spark likely spilled
> the data after your first job and read it again in the second run. You can
> also confirm it in other metrics from the Spark UI.
>
> That is my personal understanding based on what I have read and seen on my
> job runs. If there is any mistake, feel free to correct me.
>
> Thank You & Best Regards
> Winston Lai
> --
> *From:* Nitin Siwach 
> *Sent:* Sunday, May 7, 2023 12:22:32 PM
> *To:* Vikas Kumar 
> *Cc:* User 
> *Subject:* Re: Does spark read the same file twice, if two stages are
> using the same DataFrame?
>
> Thank you tons, Vikas :). That makes so much sense now
>
> I'm in the learning phase and was just browsing through various concepts of
> Spark with self-made small examples.
>
> It didn't make sense to me that the two physical plans should be
> different. But now I understand what you're saying.
>
> Again, thank you for helping me out
>
> On Sun, 7 May, 2023, 07:48 Vikas Kumar,  wrote:
>
>
> Spark came up with a plan, but that may or may not be the optimal plan given
> the system settings.
> If you do df1.cache(), I am guessing Spark will not read df1 twice.
>
> Btw, why do you have adaptive execution set to false?
>
> On Sat, May 6, 2023, 1:46 PM Nitin Siwach  wrote:
>
> I hope this email finds you well :)
>
> The following code reads the same csv twice even though only one action is
> called
>
> End to end runnable example:
> ```
> import pandas as pd
> import numpy as np
>
> df1=  pd.DataFrame(np.arange(1_000).reshape(-1,1))
> df1.index = np.random.choice(range(10),size=1000)
> df1.to_csv("./df1.csv",index_label = "index")
>
> 
>
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StructType, StringType, StructField
>
> spark = SparkSession.builder \
>     .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
>     .config("spark.sql.adaptive.enabled", "false") \
>     .getOrCreate()
>
> schema = StructType([StructField('index', StringType(), True),
>  StructField('0', StringType(), True)])
>
> df1 = spark.read.csv("./df1.csv", header=True, schema = schema)
>
> df2 = df1.groupby("index").agg(F.mean("0"))
> df3 = df1.join(df2,on='index')
>
> df3.explain()
> df3.count()
> ```
>
> The sql tab in the web UI shows the following:
>
> [image: screencapture-localhost-4040-SQL-execution-2023-05-06-19_48_41.png]
>
> As you can see, the df1 file is read twice. Is this the expected
> behaviour? Why is that happening? I have just one action so the same part
> of the pipeline should not run multiple times.
>
> I have read the answer [here][1]
> <https://stackoverflow.com/questions/37894099/does-spark-read-the-same-file-twice-if-two-stages-are-using-the-same-rdd>.
> The question is almost the same; it is just that in that question RDDs
> are used and I am using the DataFrame pyspark API. In th

What is DataFilters and while joining why is the filter isnotnull[joinKey] applied twice

2023-01-31 Thread Nitin Siwach
Pyspark version:3.1.3

*Question 1: *What is DataFilters in spark physical plan? How is it
different from PushedFilters?
*Question 2:* When joining two datasets, Why is the filter isnotnull
applied twice on the joining key column? In the physical plan, it is once
applied as a PushedFilter and then explicitly applied right after it. Why
is that so?


code:

import os
import pandas as pd, numpy as np
import pyspark
spark=pyspark.sql.SparkSession.builder.getOrCreate()

save_loc = "gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/
"

df1 = spark.createDataFrame(pd.DataFrame({'a': np.random.choice([1, 2, None], size=1000, p=[0.47, 0.48, 0.05]),
                                           'b': np.random.random(1000)}))

df2 = spark.createDataFrame(pd.DataFrame({'a': np.random.choice([1, 2, None], size=1000, p=[0.47, 0.48, 0.05]),
                                           'b': np.random.random(1000)}))

df1.write.parquet(os.path.join(save_loc,"dfl_key_int"))
df2.write.parquet(os.path.join(save_loc,"dfr_key_int"))

dfl_int = spark.read.parquet(os.path.join(save_loc,"dfl_key_int"))
dfr_int = spark.read.parquet(os.path.join(save_loc,"dfr_key_int"))

dfl_int.join(dfr_int,on='a',how='inner').explain()



output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [a#23L, b#24, b#28]
   +- BroadcastHashJoin [a#23L], [a#27L], Inner, BuildRight, false
  :- Filter isnotnull(a#23L)
  :  +- FileScan parquet [a#23L,b#24] Batched: true, DataFilters:
[isnotnull(a#23L)], Format: Parquet, Location:
InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfl_key_int],
PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema:
struct
  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0,
bigint, false]),false), [id=#75]
 +- Filter isnotnull(a#27L)
+- FileScan parquet [a#27L,b#28] Batched: true,
DataFilters: [isnotnull(a#27L)], Format: Parquet, Location:
InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfr_key_int],
PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema:
struct



-- 
Regards,
Nitin


bucketBy in pyspark not retaining partition information

2022-01-31 Thread Nitin Siwach
I am reading two datasets that I saved to disk with the ```bucketBy``` option,
on the same key and with the same number of partitions. When I read them back
and join them, the join should not result in a shuffle.

But that isn't the case I am seeing.
*The following code demonstrates the alleged behavior:*

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold",
"-1").getOrCreate()
import random

data1 = [(i, random.randint(1, 5), random.randint(1, 5)) for t in range(2) for i in range(5)]
data2 = [(i, random.randint(1, 5), random.randint(1, 5)) for t in range(2) for i in range(5)]
df1 = spark.createDataFrame(data1, schema='a int,b int,c int')
df2 = spark.createDataFrame(data2, schema='a int,b int,c int')

parquet_path1 = './bucket_test_parquet1'
parquet_path2 = './bucket_test_parquet2'

df1.write.bucketBy(5, "a").format("parquet").saveAsTable('bucketed1', path=parquet_path1, mode='overwrite')
df2.write.bucketBy(5, "a").format("parquet").saveAsTable('bucketed2', path=parquet_path2, mode='overwrite')

read_parquet1 = spark.read.format("parquet").load(parquet_path1)
read_parquet1.createOrReplaceTempView("read_parquet1")
read_parquet1 = spark.sql("SELECT * FROM read_parquet1")

read_parquet2 = spark.read.format("parquet").load(parquet_path2)
read_parquet2.createOrReplaceTempView("read_parquet2")
read_parquet2 = spark.sql("SELECT * FROM read_parquet2")

read_parquet1.join(read_parquet2, on='a').explain()

*The output that I am getting is*

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [a#24, b#25, c#26, b#34, c#35]
   +- SortMergeJoin [a#24], [a#33], Inner
  :- Sort [a#24 ASC NULLS FIRST], false, 0
  :  +- Exchange hashpartitioning(a#24, 200), ENSURE_REQUIREMENTS, [id=#61]
  : +- Filter isnotnull(a#24)
  :+- FileScan parquet [a#24,b#25,c#26] Batched: true,
DataFilters: [isnotnull(a#24)], Format: Parquet, Location:
InMemoryFileIndex(1
paths)[file:/home/nitin/pymonsoon/bucket_test_parquet1],
PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema:
struct
  +- Sort [a#33 ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(a#33, 200), ENSURE_REQUIREMENTS, [id=#62]
+- Filter isnotnull(a#33)
   +- FileScan parquet [a#33,b#34,c#35] Batched: true,
DataFilters: [isnotnull(a#33)], Format: Parquet, Location:
InMemoryFileIndex(1
paths)[file:/home/nitin/pymonsoon/bucket_test_parquet2],
PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema:
struct


*Which clearly has hashpartitioning going on. Kindly help me clarify
the utility of ```bucketBy```.*
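A hedged follow-up sketch (not a confirmed diagnosis): bucketing metadata lives in the metastore table definition, so it is only honoured when the data is read back through the saved tables rather than through spark.read.parquet on the raw paths. Using the two saved tables from the snippet above:
```
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.sql.autoBroadcastJoinThreshold", "-1") \
    .getOrCreate()

# Read the bucketed data back through the saved tables, so the bucketing
# metadata recorded in the metastore is used.
t1 = spark.table("bucketed1")
t2 = spark.table("bucketed2")

# With both sides bucketed by "a" into the same number of buckets, the plan
# should show a SortMergeJoin without an Exchange (no shuffle); a per-partition
# Sort can still appear because sortBy was not used when writing.
t1.join(t2, on="a").explain()
```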


understanding iterator of series to iterator of series pandasUDF

2022-01-04 Thread Nitin Siwach
I understand pandas UDFs as follows:

1. There are multiple partitions per worker
2. Multiple arrow batches are converted per partition
3. These are sent to the python process
4. In the case of Series to Series, is the pandasUDF applied to each arrow
batch one after the other? **(Is it that (a) the vectorisation is at the
arrow-batch level but each batch, in turn, is processed sequentially by the
worker, or (b) the arrow batches are combined after all have arrived and then
the pandasUDF is applied to the whole?)** I think it is (b), i.e. the arrow
batches are combined. I have given my reasoning below.

Given this understanding and blackbishop's answer I have the following
further questions:

*How exactly do the Iterator versions of pandasUDFs work?*

1. If there is some expensive initialization, then why can we not do that in
the case of a series-to-series pandasUDF as well? In the case of iterator of
series to iterator of series, the initialization is done once and is shared
across all the workers and used for all the arrow batches. Why can the same
process not be followed for a series-to-series pandasUDF? Initialize -->
share to workers --> once all the arrow batches are combined on a worker,
apply?
2. I can see that we might want to separate out the execution of i/o and of
python code on arrow batches, so that as one batch is being read, the
pandasUDF is being run on the previous batch. (Why is this not done in the
case of series to series? **This is why I think all the arrow batches are
combined before running them through the pandasUDF. Because otherwise the
same i/o parallelization benefits would be available for series-to-series
pandasUDFs as well.**)

One more question:

1. Since the output is an Iterator of Series, where is the vectorisation
then? Is it that the pandasUDF is run on an entire arrow batch and then the
result is emitted row by row? Or is the pandasUDF processing the arrow
batches row by row and then emitting the result (which loses vectorisation,
as I see it)?
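For reference, a minimal sketch of the iterator-of-Series variant being discussed (assuming Spark 3.0+, where the UDF type is inferred from the Python type hints); the point it illustrates is only where the one-time setup sits relative to the per-batch loop.
```
from typing import Iterator

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()
df = spark.range(10).withColumnRenamed("id", "x")

@pandas_udf("long")
def add_offset(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # One-time, potentially expensive setup: runs once per task/python worker,
    # not once per Arrow batch. (The constant stands in for loading a model,
    # opening a connection, etc.)
    offset = 100
    for batch in batches:       # each `batch` is one Arrow batch as a pandas Series
        yield batch + offset    # vectorised work per batch

df.select(add_offset("x")).show()
```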


Re: Read hdfs files in spark streaming

2019-06-11 Thread nitin jain
Hi Deepak,
Please let us know how you managed it.

Thanks,
NJ

On Mon, Jun 10, 2019 at 4:42 PM Deepak Sharma  wrote:

> Thanks All.
> I managed to get this working.
> Marking this thread as closed.
>
> On Mon, Jun 10, 2019 at 4:14 PM Deepak Sharma 
> wrote:
>
>> This is the project requirement, where paths are being streamed into a kafka
>> topic.
>> It seems it's not possible using spark structured streaming.
>>
>>
>> On Mon, Jun 10, 2019 at 3:59 PM Shyam P  wrote:
>>
>>> Hi Deepak,
>>>  Why are you getting paths from a kafka topic? Any specific reason to do
>>> so?
>>>
>>> Regards,
>>> Shyam
>>>
>>> On Mon, Jun 10, 2019 at 10:44 AM Deepak Sharma 
>>> wrote:
>>>
 The context is different here.
 The file paths are coming as messages in a kafka topic.
 Spark streaming (structured) consumes from this topic.
 Now it has to get the value from the message, i.e. the path to the file, and
 read the json stored at that file location into another df.

 Thanks
 Deepak

 On Sun, Jun 9, 2019 at 11:03 PM vaquar khan 
 wrote:

> Hi Deepak,
>
> You can use textFileStream.
>
> https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html
>
> Please start using Stack Overflow to ask questions, so other people get the
> benefit of the answers.
>
>
> Regards,
> Vaquar khan
>
> On Sun, Jun 9, 2019, 8:08 AM Deepak Sharma 
> wrote:
>
>> I am using a spark streaming application to read from kafka.
>> The value coming in the kafka message is the path to an hdfs file.
>> I am using spark 2.x, spark.read.stream.
>> What is the best way to read this path in spark streaming and then
>> read the json stored at the hdfs path, maybe using spark.read.json, into
>> a df inside the spark streaming app.
>> Thanks a lot in advance
>>
>> --
>> Thanks
>> Deepak
>>
>

 --
 Thanks
 Deepak
 www.bigdatabig.com
 www.keosha.net

>>>
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>
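For readers finding this thread later, a rough sketch of one way to do what was described (consume HDFS paths from a Kafka topic and read the JSON behind each path); the broker, topic, and sink path are placeholders, and foreachBatch needs Spark 2.4+.
```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("paths-from-kafka").getOrCreate()

# Each Kafka record's value is assumed to be an HDFS path to a JSON file.
paths_stream = (spark.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
                .option("subscribe", "hdfs-paths")                 # placeholder topic
                .load()
                .selectExpr("CAST(value AS STRING) AS path"))

def load_json_for_paths(batch_df, batch_id):
    # Collect the (small) set of paths in this micro-batch on the driver,
    # then read the referenced JSON files with the regular batch reader.
    paths = [row.path for row in batch_df.collect()]
    if paths:
        json_df = spark.read.json(paths)
        json_df.write.mode("append").parquet("/tmp/json_landing")  # placeholder sink

query = (paths_stream.writeStream
         .foreachBatch(load_json_for_paths)
         .start())
query.awaitTermination()
```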


Re: MLib : Non Linear Optimization

2016-09-09 Thread Nitin Sareen
Yes, we are using primarily these two algorithms.

1.   Interior point trust-region line-search algorithm

2. Active-set trust-region line-search algorithm


We are performing optimizations with constraints & thresholds etc.


We are primarily using Lindo / SAS modules but want to get away from SAS
due to the cost; it would be really good to have these algorithms in Spark
ML.


Let me know if you need any more info; I can share some snippets if
required.


Thanks,

Nitin

On Thu, Sep 8, 2016 at 2:08 PM, Robin East <robin.e...@xense.co.uk> wrote:

> Do you have any particular algorithms in mind? If you state the most
> common algorithms you use then it might stimulate the appropriate comments.
>
>
>
> > On 8 Sep 2016, at 05:04, nsareen <nsar...@gmail.com> wrote:
> >
> > Any answer to this question group ?
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLib-Non-Linear-Optimization-tp27645p27676.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>


Re: Populating tables using hive and spark

2016-08-22 Thread Nitin Kumar
Hi Furcy,

If I execute the command "ANALYZE TABLE TEST_ORC COMPUTE STATISTICS" before
checking the count from hive, Hive returns the correct count, although it does
not spawn a map-reduce job for computing the count.

I'm running a HDP 2.4 Cluster with Hive 1.2.1.2.4 and Spark 1.6.1

If others can concur we can go ahead and report it as a bug.

Regards,
Nitin



On Mon, Aug 22, 2016 at 4:15 PM, Furcy Pin <furcy@flaminem.com> wrote:

> Hi Nitin,
>
> I confirm that there is something odd here.
>
> I did the following test :
>
> create table test_orc (id int, name string, dept string) stored as ORC;
> insert into table test_orc values (1, 'abc', 'xyz');
> insert into table test_orc values (2, 'def', 'xyz');
> insert into table test_orc values (3, 'pqr', 'xyz');
> insert into table test_orc values (4, 'ghi', 'xyz');
>
>
> I ended up with 4 files on hdfs:
>
> 00_0
> 00_0_copy_1
> 00_0_copy_2
> 00_0_copy_3
>
>
> Then I renamed 00_0_copy_2 to part-0, and I still got COUNT(*) = 4
> with hive.
> So this is not a file name issue.
>
> I then removed one of the files, and I got this :
>
> > SELECT COUNT(1) FROM test_orc ;
> +--+--+
> | _c0  |
> +--+--+
> | 4|
> +--+--+
>
> > SELECT * FROM test_orc ;
> +--------------+----------------+----------------+
> | test_orc.id  | test_orc.name  | test_orc.dept  |
> +--------------+----------------+----------------+
> | 1            | abc            | xyz            |
> | 2            | def            | xyz            |
> | 4            | ghi            | xyz            |
> +--------------+----------------+----------------+
> 3 rows selected (0.162 seconds)
>
> So, my guess is that when Hive inserts data, it must keep the number of rows
> in the table somewhere in the metastore.
> However, if the files are modified by something other than Hive itself
> (either manually or with Spark), you end up with an inconsistency.
>
> So I guess we can call it a bug:
>
> Hive should detect that the files changed and invalidate its
> pre-calculated count.
> Optionally, Spark should play nice with Hive and update the count when
> inserting.
>
> I don't know if this bug has already been reported, and I tested on Hive
> 1.1.0, so perhaps it is already solved in later releases.
>
> Regards,
>
> Furcy
>
>
> On Mon, Aug 22, 2016 at 9:34 AM, Nitin Kumar <nk94.nitinku...@gmail.com>
> wrote:
>
>> Hi!
>>
>> I've noticed that hive has problems in registering new data records if
>> the same table is written to using both the hive terminal and spark sql.
>> The problem is demonstrated through the commands listed below
>>
>> 
>> hive> use default;
>> hive> create table test_orc (id int, name string, dept string) stored as
>> ORC;
>> hive> insert into table test_orc values (1, 'abc', 'xyz');
>> hive> insert into table test_orc values (2, 'def', 'xyz');
>> hive> select count(*) from test_orc;
>> OK
>> 2
>> hive> select distinct(name) from test_orc;
>> OK
>> abc
>> def
>>
>> *** files in hdfs path in warehouse for the created table ***
>>
>>
>>
>> >>> data_points = [(3, 'pqr', 'xyz'), (4, 'ghi', 'xyz')]
>> >>> column_names = ['identity_id', 'emp_name', 'dept_name']
>> >>> data_df = sqlContext.createDataFrame(data_points, column_names)
>> >>> data_df.show()
>>
>> +-----------+--------+---------+
>> |identity_id|emp_name|dept_name|
>> +-----------+--------+---------+
>> |          3|     pqr|      xyz|
>> |          4|     ghi|      xyz|
>> +-----------+--------+---------+
>>
>> >>> data_df.registerTempTable('temp_table')
>> >>> sqlContext.sql('insert into table default.test_orc select * from
>> temp_table')
>>
>> *** files in hdfs path in warehouse for the created table ***
>>
>> hive> select count(*) from test_orc; (Does not launch map-reduce job)
>> OK
>> 2
>> hive> select distinct(name) from test_orc; (Launches map-reduce job)
>> abc
>> def
>> ghi
>> pqr
>>
>> hive> create table test_orc_new like test_orc stored as ORC;
>> hive> insert into table test_orc_new select * from test_orc;
>> hive> select count(*) from test_orc_new;
>> OK
>> 4
>> ==
>>
>> Even if I restart the hive services I cannot get the proper count output
>> from hive. This problem only occurs if the table is written to using both
>> hive and spark. If only spark is used to insert records into the table
>> multiple times, the count query in the hive terminal works perfectly fine.
>>
>> This problem occurs for tables stored with different storage formats as
>> well (textFile etc.)
>>
>> Is this because of the different naming conventions used by hive and
>> spark to write records to hdfs? Or maybe it is not a recommended practice
>> to write tables using different services?
>>
>> Your thoughts and comments on this matter would be highly appreciated!
>>
>> Thanks!
>> Nitin
>>
>>
>>
>


Populating tables using hive and spark

2016-08-22 Thread Nitin Kumar
Hi!

I've noticed that hive has problems in registering new data records if the
same table is written to using both the hive terminal and spark sql. The
problem is demonstrated through the commands listed below


hive> use default;
hive> create table test_orc (id int, name string, dept string) stored as
ORC;
hive> insert into table test_orc values (1, 'abc', 'xyz');
hive> insert into table test_orc values (2, 'def', 'xyz');
hive> select count(*) from test_orc;
OK
2
hive> select distinct(name) from test_orc;
OK
abc
def

*** files in hdfs path in warehouse for the created table ***



>>> data_points = [(3, 'pqr', 'xyz'), (4, 'ghi', 'xyz')]
>>> column_names = ['identity_id', 'emp_name', 'dept_name']
>>> data_df = sqlContext.createDataFrame(data_points, column_names)
>>> data_df.show()

+-----------+--------+---------+
|identity_id|emp_name|dept_name|
+-----------+--------+---------+
|          3|     pqr|      xyz|
|          4|     ghi|      xyz|
+-----------+--------+---------+

>>> data_df.registerTempTable('temp_table')
>>> sqlContext.sql('insert into table default.test_orc select * from
temp_table')

*** files in hdfs path in warehouse for the created table ***

hive> select count(*) from test_orc; (Does not launch map-reduce job)
OK
2
hive> select distinct(name) from test_orc; (Launches map-reduce job)
abc
def
ghi
pqr

hive> create table test_orc_new like test_orc stored as ORC;
hive> insert into table test_orc_new select * from test_orc;
hive> select count(*) from test_orc_new;
OK
4
==

Even if I restart the hive services I cannot get the proper count output
from hive. This problem only occurs if the table is written to using both
hive and spark. If only spark is used to insert records into the table
multiple times, the count query in the hive terminal works perfectly fine.

This problem occurs for tables stored with different storage formats as
well (textFile etc.)

Is this because of the different naming conventions used by hive and spark
to write records to hdfs? Or maybe it is not a recommended practice to
write tables using different services?

Your thoughts and comments on this matter would be highly appreciated!

Thanks!
Nitin


Re: Apache Spark : spark.eventLog.dir on Windows Environment

2015-07-21 Thread Nitin Kalra
Hi Akhil,

I don't have HADOOP_HOME or HADOOP_CONF_DIR, or even winutils.exe. What
configuration is required for this? From where can I get winutils.exe?

Thanks and Regards,
Nitin Kalra
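As an aside, a minimal sketch (paths and values are illustrative) of setting the same two properties programmatically on Spark 1.x; this only sidesteps editing the XML configuration, not the winutils.exe/HADOOP_HOME requirement being discussed.
```
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("eventlog-on-windows")
        .set("spark.eventLog.enabled", "true")
        # forward slashes plus the file:/// scheme tend to be the safest form on Windows
        .set("spark.eventLog.dir", "file:///C:/sparklogs"))

sc = SparkContext(conf=conf)
```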


On Tue, Jul 21, 2015 at 1:30 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Do you have HADOOP_HOME, HADOOP_CONF_DIR and hadoop's winutils.exe in the
 environment?

 Thanks
 Best Regards

 On Mon, Jul 20, 2015 at 5:45 PM, nitinkalra2000 nitinkalra2...@gmail.com
 wrote:

 Hi All,

 I am working with Spark 1.4 in a Windows environment. I have to set the
 eventLog directory so that I can reopen the Spark UI after the application
 has finished.

 But I am not able to set eventLog.dir; it gives an error in the Windows
 environment.

 Configuation is :

  <entry key="spark.eventLog.enabled" value="true" />
  <entry key="spark.eventLog.dir" value="file:///c:/sparklogs" />

 Exception I get :

 java.io.IOException: Cannot run program cygpath: CreateProcess error=2,
 The system cannot find the file specified
 at java.lang.ProcessBuilder.start(Unknown Source)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:206)

 I have also tried installing Cygwin but still the error doesn't go.

 Can anybody give any advice on it?

 I have posted the same question on Stackoverflow as well :

 http://stackoverflow.com/questions/31468716/apache-spark-spark-eventlog-dir-on-windows-environment

 Thanks
 Nitin




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-spark-eventLog-dir-on-Windows-Environment-tp23913.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Does HiveContext connect to HiveServer2?

2015-06-24 Thread Nitin kak
Hi Marcelo,

The issue does not happen while connecting to the hive metastore; that works
fine. It seems that HiveContext only executes the queries the way the Hive CLI
does; going through HiveServer2 is not supported. I don't think you can specify
any configuration in hive-site.xml which can make it connect to HiveServer2.

It becomes a blocking issue in the case of Sentry, where HiveServer2 does the
translation of authenticated users to the hive user (which is the only user
that can access directories under hive/warehouse when Sentry is ON). The
HiveContext is able to access the metastore but then tries to access the files
under the hive warehouse directory, where it fails with a permission error:

*org.apache.hadoop.security.AccessControlException: Permission denied:
user=kakn, access=READ_EXECUTE,
inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t*

On Tue, Mar 24, 2015 at 1:43 PM, Marcelo Vanzin van...@cloudera.com wrote:

 spark-submit --files /path/to/hive-site.xml

 On Tue, Mar 24, 2015 at 10:31 AM, Udit Mehta ume...@groupon.com wrote:
  Another question related to this, how can we propagate the hive-site.xml
 to
  all workers when running in the yarn cluster mode?
 
  On Tue, Mar 24, 2015 at 10:09 AM, Marcelo Vanzin van...@cloudera.com
  wrote:
 
  It does neither. If you provide a Hive configuration to Spark,
  HiveContext will connect to your metastore server, otherwise it will
  create its own metastore in the working directory (IIRC).
 
  On Tue, Mar 24, 2015 at 8:58 AM, nitinkak001 nitinkak...@gmail.com
  wrote:
   I am wondering if HiveContext connects to HiveServer2 or does it work
   through the Hive CLI. The reason I am asking is that Cloudera has
   deprecated the Hive CLI.
  
   If the connection is through HiverServer2, is there a way to specify
   user
   credentials?
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Does-HiveContext-connect-to-HiveServer2-tp22200.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 
 
  --
  Marcelo
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 



 --
 Marcelo



Re: Hive query execution from Spark(through HiveContext) failing with Apache Sentry

2015-06-22 Thread Nitin kak
Any response to this guys?

On Fri, Jun 19, 2015 at 2:34 PM, Nitin kak nitinkak...@gmail.com wrote:

 Any other suggestions guys?

 On Wed, Jun 17, 2015 at 7:54 PM, Nitin kak nitinkak...@gmail.com wrote:

 With Sentry, only hive user has the permission for read/write/execute on
 the subdirectories of warehouse. All the users get translated to hive
 when interacting with hiveserver2. But i think HiveContext is bypassing
 hiveserver2.


 On Wednesday, June 17, 2015, ayan guha guha.a...@gmail.com wrote:

 Try to grant read execute access through sentry.
 On 18 Jun 2015 05:47, Nitin kak nitinkak...@gmail.com wrote:

 I am trying to run a hive query from Spark code using HiveContext
 object. It was running fine earlier but since the Apache Sentry has been
 set installed the process is failing with this exception :

 *org.apache.hadoop.security.AccessControlException: Permission denied:
 user=kakn, access=READ_EXECUTE,
 inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t*

 I have pasted the full stack trace at the end of this post. My username
 kakn is a registered user with Sentry. I know that Spark takes all the
 configurations from hive-site.xml to execute the hql, so I added a few
 Sentry specific properties but seem to have no effect. I have attached the
 hive-site.xml

 <property>
   <name>hive.security.authorization.task.factory</name>
   <value>org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl</value>
 </property>
 <property>
   <name>hive.metastore.pre.event.listeners</name>
   <value>org.apache.sentry.binding.metastore.MetastoreAuthzBinding</value>
   <description>list of comma separated listeners for metastore events.</description>
 </property>
 <property>
   <name>hive.warehouse.subdir.inherit.perms</name>
   <value>true</value>
 </property>




 *org.apache.hadoop.security.AccessControlException: Permission denied:
 user=kakn, access=READ_EXECUTE,
 inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t*
 * at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)*
 * at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)*
 * at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:151)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6194)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4793)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4755)*
 * at
 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:800)*
 * at
 org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getListing(AuthorizationProviderProxyClientProtocol.java:310)*
 * at
 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:606)*
 * at
 org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)*
 * at
 org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)*
 * at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)*
 * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)*
 * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)*
 * at java.security.AccessController.doPrivileged(Native Method)*
 * at javax.security.auth.Subject.doAs(Subject.java:415)*
 * at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)*
 * at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)*

 * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)*
 * at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)*
 * at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
 * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
 * at
 org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)*
 * at
 org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)*
 * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1895)*
 * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1876)*
 * at
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654

Re: Hive query execution from Spark(through HiveContext) failing with Apache Sentry

2015-06-19 Thread Nitin kak
Any other suggestions guys?

On Wed, Jun 17, 2015 at 7:54 PM, Nitin kak nitinkak...@gmail.com wrote:

 With Sentry, only hive user has the permission for read/write/execute on
 the subdirectories of warehouse. All the users get translated to hive
 when interacting with hiveserver2. But i think HiveContext is bypassing
 hiveserver2.


 On Wednesday, June 17, 2015, ayan guha guha.a...@gmail.com wrote:

 Try to grant read execute access through sentry.
 On 18 Jun 2015 05:47, Nitin kak nitinkak...@gmail.com wrote:

 I am trying to run a hive query from Spark code using HiveContext
 object. It was running fine earlier but since the Apache Sentry has been
 set installed the process is failing with this exception :

 *org.apache.hadoop.security.AccessControlException: Permission denied:
 user=kakn, access=READ_EXECUTE,
 inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t*

 I have pasted the full stack trace at the end of this post. My username
 kakn is a registered user with Sentry. I know that Spark takes all the
 configurations from hive-site.xml to execute the hql, so I added a few
 Sentry specific properties but seem to have no effect. I have attached the
 hive-site.xml

 <property>
   <name>hive.security.authorization.task.factory</name>
   <value>org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl</value>
 </property>
 <property>
   <name>hive.metastore.pre.event.listeners</name>
   <value>org.apache.sentry.binding.metastore.MetastoreAuthzBinding</value>
   <description>list of comma separated listeners for metastore events.</description>
 </property>
 <property>
   <name>hive.warehouse.subdir.inherit.perms</name>
   <value>true</value>
 </property>




 *org.apache.hadoop.security.AccessControlException: Permission denied:
 user=kakn, access=READ_EXECUTE,
 inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t*
 * at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)*
 * at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)*
 * at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:151)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6194)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4793)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4755)*
 * at
 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:800)*
 * at
 org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getListing(AuthorizationProviderProxyClientProtocol.java:310)*
 * at
 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:606)*
 * at
 org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)*
 * at
 org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)*
 * at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)*
 * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)*
 * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)*
 * at java.security.AccessController.doPrivileged(Native Method)*
 * at javax.security.auth.Subject.doAs(Subject.java:415)*
 * at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)*
 * at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)*

 * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)*
 * at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)*
 * at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
 * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
 * at
 org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)*
 * at
 org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)*
 * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1895)*
 * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1876)*
 * at
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)*
 * at
 org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:104

Hive query execution from Spark(through HiveContext) failing with Apache Sentry

2015-06-17 Thread Nitin kak
I am trying to run a hive query from Spark code using a HiveContext object.
It was running fine earlier, but since Apache Sentry has been installed the
process is failing with this exception:

*org.apache.hadoop.security.AccessControlException: Permission denied:
user=kakn, access=READ_EXECUTE,
inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t*

I have pasted the full stack trace at the end of this post. My username
kakn is a registered user with Sentry. I know that Spark takes all the
configurations from hive-site.xml to execute the hql, so I added a few
Sentry specific properties but seem to have no effect. I have attached the
hive-site.xml

<property>
  <name>hive.security.authorization.task.factory</name>
  <value>org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl</value>
</property>
<property>
  <name>hive.metastore.pre.event.listeners</name>
  <value>org.apache.sentry.binding.metastore.MetastoreAuthzBinding</value>
  <description>list of comma separated listeners for metastore events.</description>
</property>
<property>
  <name>hive.warehouse.subdir.inherit.perms</name>
  <value>true</value>
</property>




*org.apache.hadoop.security.AccessControlException: Permission denied:
user=kakn, access=READ_EXECUTE,
inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t*
* at
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)*
* at
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)*
* at
org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:151)*
* at
org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)*
* at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)*
* at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)*
* at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6194)*
* at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4793)*
* at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4755)*
* at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:800)*
* at
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getListing(AuthorizationProviderProxyClientProtocol.java:310)*
* at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:606)*
* at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)*
* at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)*
* at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)*
* at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)*
* at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)*
* at java.security.AccessController.doPrivileged(Native Method)*
* at javax.security.auth.Subject.doAs(Subject.java:415)*
* at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)*
* at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)*

* at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)*
* at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)*
* at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
* at java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
* at
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)*
* at
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)*
* at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1895)*
* at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1876)*
* at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)*
* at
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:104)*
* at
org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:716)*
* at
org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)*
* at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)*
* at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:712)*
* at
org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetTypes.scala:440)*
* at
org.apache.spark.sql.parquet.ParquetTypesConverter$.readSchemaFromFile(ParquetTypes.scala:477)*
* at

Re: Hive query execution from Spark(through HiveContext) failing with Apache Sentry

2015-06-17 Thread Nitin kak
With Sentry, only the hive user has read/write/execute permission on
the subdirectories of the warehouse. All users get translated to hive
when interacting with hiveserver2. But I think HiveContext is bypassing
hiveserver2.

On Wednesday, June 17, 2015, ayan guha guha.a...@gmail.com wrote:

 Try to grant read execute access through sentry.
 On 18 Jun 2015 05:47, Nitin kak nitinkak...@gmail.com
 javascript:_e(%7B%7D,'cvml','nitinkak...@gmail.com'); wrote:

 I am trying to run a hive query from Spark code using HiveContext object.
 It was running fine earlier but since the Apache Sentry has been set
 installed the process is failing with this exception :

 *org.apache.hadoop.security.AccessControlException: Permission denied:
 user=kakn, access=READ_EXECUTE,
 inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t*

 I have pasted the full stack trace at the end of this post. My username
 kakn is a registered user with Sentry. I know that Spark takes all the
 configurations from hive-site.xml to execute the hql, so I added a few
 Sentry specific properties but seem to have no effect. I have attached the
 hive-site.xml

 <property>
   <name>hive.security.authorization.task.factory</name>
   <value>org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl</value>
 </property>
 <property>
   <name>hive.metastore.pre.event.listeners</name>
   <value>org.apache.sentry.binding.metastore.MetastoreAuthzBinding</value>
   <description>list of comma separated listeners for metastore events.</description>
 </property>
 <property>
   <name>hive.warehouse.subdir.inherit.perms</name>
   <value>true</value>
 </property>




 *org.apache.hadoop.security.AccessControlException: Permission denied:
 user=kakn, access=READ_EXECUTE,
 inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t*
 * at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)*
 * at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)*
 * at
 org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:151)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6194)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4793)*
 * at
 org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4755)*
 * at
 org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:800)*
 * at
 org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getListing(AuthorizationProviderProxyClientProtocol.java:310)*
 * at
 org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:606)*
 * at
 org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)*
 * at
 org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)*
 * at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)*
 * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)*
 * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)*
 * at java.security.AccessController.doPrivileged(Native Method)*
 * at javax.security.auth.Subject.doAs(Subject.java:415)*
 * at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)*
 * at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)*

 * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)*
 * at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)*
 * at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
 * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)*
 * at
 org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)*
 * at
 org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)*
 * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1895)*
 * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1876)*
 * at
 org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)*
 * at
 org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:104)*
 * at
 org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall

Re: HiveContext test, Spark Context did not initialize after waiting 10000ms

2015-05-26 Thread Nitin kak
That is a much better solution than how I resolved it. I got around it by
placing comma-separated jar paths for all the Hive-related jars in the --jars
clause.

I will try your solution. Thanks for sharing it.

On Tue, May 26, 2015 at 4:14 AM, Mohammad Islam misla...@yahoo.com wrote:

 I got a similar problem.
 I'm not sure if your problem is already resolved.

 For the record, I solved this type of error by calling
 sc.setMaster("yarn-cluster");

 If you find the solution, please let us know.

 Regards,
 Mohammad





   On Friday, March 6, 2015 2:47 PM, nitinkak001 nitinkak...@gmail.com
 wrote:


 I am trying to run a Hive query from Spark using HiveContext. Here is the
 code

  / val conf = new SparkConf().setAppName("HiveSparkIntegrationTest")
 
  conf.set("spark.executor.extraClassPath",
    "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/hive/lib");
  conf.set("spark.driver.extraClassPath",
    "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/hive/lib");
  conf.set("spark.yarn.am.waitTime", "30L")
 
  val sc = new SparkContext(conf)
 
  val sqlContext = new HiveContext(sc)
 
  def inputRDD = sqlContext.sql("describe spark_poc.src_digital_profile_user");
 
  inputRDD.collect().foreach { println }
 
  println(inputRDD.schema.getClass.getName)
  /

  Getting this exception. Any clues? The weird part is that if I try to do the
  same thing in Java instead of Scala, it runs fine.

 /Exception in thread Driver java.lang.NullPointerException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

 org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)
 15/03/06 17:39:32 ERROR yarn.ApplicationMaster: SparkContext did not
  initialize after waiting for 10000 ms. Please check earlier log output for
 errors. Failing the application.
 Exception in thread main java.lang.NullPointerException
 at

 org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:218)
 at

 org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:110)
 at

 org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:434)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:53)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:52)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
 at

 org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52)
 at

 org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:433)
 at

 org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
 15/03/06 17:39:32 INFO yarn.ApplicationMaster: AppMaster received a
 signal./



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-test-Spark-Context-did-not-initialize-after-waiting-1ms-tp21953.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: how to clean shuffle write each iteration

2015-03-03 Thread nitin
Shuffle write files will be cleaned up if they are not referenced by any object
directly or indirectly. There is a cleaner inside Spark that piggybacks on JVM
garbage collection: it periodically checks weak references to RDDs, shuffle
writes and broadcasts, and deletes the corresponding files once those objects
have been collected.
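
A rough sketch of what this means for an iterative job (names are made up for
illustration): once nothing references the previous iteration's RDD any more,
its shuffle files become eligible for cleanup.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions on Spark 1.x

object ShuffleCleanupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shuffle-cleanup-sketch"))
    var current = sc.parallelize(1 to 1000000).map(x => (x % 100, 1L))
    for (i <- 1 to 10) {
      // reduceByKey shuffles; reassigning `current` drops the only reference to
      // the previous iteration's RDD, so once the JVM garbage-collects that RDD
      // object Spark's cleaner can delete its shuffle files.
      current = current.reduceByKey(_ + _).map { case (k, v) => (k, v + i) }
      current.count()   // materialize this iteration
    }
    sc.stop()
  }
}
```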



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-clean-shuffle-write-each-iteration-tp21886p21889.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQLContext.applySchema strictness

2015-02-14 Thread nitin
AFAIK, this is the expected behavior. You have to make sure that the schema
matches the rows. It won't give any error when you apply the schema because it
doesn't validate the data against it.
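
A small sketch of the behavior, assuming a Spark 1.2-style SQLContext named
sqlContext and an existing SparkContext sc (names are illustrative):

```scala
import org.apache.spark.sql._

// The schema declares "id" as an Int, but one Row holds a String.
val rows = sc.parallelize(Seq(Row("not-an-int"), Row(42)))
val schema = StructType(Seq(StructField("id", IntegerType, nullable = true)))

// applySchema never inspects the rows, so this succeeds without complaint...
val withSchema = sqlContext.applySchema(rows, schema)

// ...and the mismatch only surfaces when an action actually evaluates the data:
// withSchema.collect()   // would fail at runtime with a cast error
```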



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650p21653.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL - Point lookup optimisation in SchemaRDD?

2015-02-11 Thread nitin
I was able to resolve this use case (thanks, Cheng Lian). I wanted to launch an
executor on just a specific partition while also getting the batch-pruning
optimisations of Spark SQL, and did it as follows:

val query = sql("SELECT * FROM cachedTable WHERE key = 1")
val plannedRDD = query.queryExecution.toRdd
val prunedRDD = PartitionPruningRDD.create(plannedRDD, _ == 3)
prunedRDD.collect()

Thanks a lot, Cheng, for suggesting the approach of doing things the other way round.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Point-lookup-optimisation-in-SchemaRDD-tp21555p21613.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark SQL - Point lookup optimisation in SchemaRDD?

2015-02-09 Thread nitin
Hi All,

I have a use case where I have cached my SchemaRDD and I want to launch
executors on just the partition which I know of (the prime use case of
PartitionPruningRDD).

I tried something like the following:

val partitionIdx = 2
val schemaRdd = hiveContext.table("myTable") // myTable is cached in memory
val partitionPrunedRDD = new PartitionPruningRDD(schemaRdd, _ == partitionIdx)
val partitionSchemaRDD = hiveContext.applySchema(partitionPrunedRDD,
  schemaRdd.schema)
partitionSchemaRDD.registerTempTable("myTablePartition2")
hiveContext.hql("select * from myTablePartition2 where id=10001")

If I do this, a query that I expect my executor to run in 500 ms ends up
running in 3000-4000 ms. I think this is happening because I did applySchema
and lost the queryExecution plan.

But if I do partitionSchemaRDD.cache as well, then I get the 500 ms
performance, although in that case the same partition/data gets cached twice.

My question is: can we create a PartitionPruningCachedSchemaRDD-like class
which can prune the partitions of InMemoryColumnarTableScan's RDD[CachedBatch]
and launch executors on just the selected partition(s)?

Thanks
-Nitin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Point-lookup-optimisation-in-SchemaRDD-tp21555.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Driver Host under Yarn

2015-02-09 Thread nitin
Are you running in yarn-cluster or yarn-client mode?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Driver-Host-under-Yarn-tp21536p21556.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark (yarn-client mode) Hangs in final stages of Collect or Reduce

2015-02-09 Thread nitin
Have you checked the corresponding executor logs as well? The information you
have provided here is too little to really understand your issue.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-yarn-client-mode-Hangs-in-final-stages-of-Collect-or-Reduce-tp21551p21557.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Where can I find logs set inside RDD processing functions?

2015-02-06 Thread Nitin kak
YARN log aggregation is enabled, and the logs which I get through yarn
logs -applicationId your_application_id
are no different from what I get through the logs in the YARN application
tracking URL. They still don't have the above logs.
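
For reference, a minimal sketch of the pattern being debugged here (names are
illustrative): the logger is created inside the partition closure, so it is
instantiated on the executor and its output lands in the executor's
stderr/log4j files, which "yarn logs -applicationId <id>" collects after the
application finishes -- it never appears in the driver/AM log shown in the
tracking URL.

```scala
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD

def tagPartitions(rdd: RDD[(String, String)]): RDD[String] =
  rdd.mapPartitions { iter =>
    // Created on the executor, per partition, so it is serialization-safe
    // and writes to the executor's own log files.
    val log = Logger.getLogger("PartitionWorker")
    iter.map { case (key, value) =>
      log.info("Inside map for key " + key)
      key + "\t" + value
    }
  }
```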

On Fri, Feb 6, 2015 at 3:36 PM, Petar Zecevic petar.zece...@gmail.com
wrote:


 You can enable YARN log aggregation (yarn.log-aggregation-enable to true)
 and execute command
 yarn logs -applicationId your_application_id
 after your application finishes.

 Or you can look at them directly in HDFS in /tmp/logs/user/logs/
 applicationid/hostname

 On 6.2.2015. 19:50, nitinkak001 wrote:

 I am trying to debug my mapPartitionsFunction. Here is the code. There are
 two ways I am trying to log using log.info() or println(). I am running
 in
 yarn-cluster mode. While I can see the logs from driver code, I am not
 able
 to see logs from map, mapPartition functions in the Application Tracking
 URL. Where can I find the logs?

  /var outputRDD = partitionedRDD.mapPartitions(p => {
    val outputList = new ArrayList[scala.Tuple3[Long, Long, Int]]
    p.map({ case (key, value) => {
      log.info("Inside map")
      println("Inside map");
      for (i <- 0 until outputTuples.size()) {
        val outputRecord = outputTuples.get(i)
        if (outputRecord != null) {
          outputList.add((outputRecord.getCurrRecordProfileID(),
            outputRecord.getWindowRecordProfileID, outputRecord.getScore()))
        }
      }
    }
    })
    outputList.iterator()
  })/

 Here is my log4j.properties

 /log4j.rootCategory=INFO, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
 %c{1}: %m%n

 # Settings to quiet third party logs that are too verbose
 log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO/




 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Where-can-I-find-logs-set-inside-
 RDD-processing-functions-tp21537.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Where can I find logs set inside RDD processing functions?

2015-02-06 Thread Nitin kak
yarn.nodemanager.remote-app-log-dir  is set to /tmp/logs

On Fri, Feb 6, 2015 at 4:14 PM, Ted Yu yuzhih...@gmail.com wrote:

 To add to What Petar said, when YARN log aggregation is enabled, consider
 specifying yarn.nodemanager.remote-app-log-dir which is where aggregated
 logs are saved.

 Cheers

 On Fri, Feb 6, 2015 at 12:36 PM, Petar Zecevic petar.zece...@gmail.com
 wrote:


 You can enable YARN log aggregation (yarn.log-aggregation-enable to true)
 and execute command
 yarn logs -applicationId your_application_id
 after your application finishes.

 Or you can look at them directly in HDFS in /tmp/logs/user/logs/
 applicationid/hostname


 On 6.2.2015. 19:50, nitinkak001 wrote:

 I am trying to debug my mapPartitionsFunction. Here is the code. There
 are
 two ways I am trying to log using log.info() or println(). I am running
 in
 yarn-cluster mode. While I can see the logs from driver code, I am not
 able
 to see logs from map, mapPartition functions in the Application Tracking
 URL. Where can I find the logs?

  /var outputRDD = partitionedRDD.mapPartitions(p => {
    val outputList = new ArrayList[scala.Tuple3[Long, Long, Int]]
    p.map({ case (key, value) => {
      log.info("Inside map")
      println("Inside map");
      for (i <- 0 until outputTuples.size()) {
        val outputRecord = outputTuples.get(i)
        if (outputRecord != null) {
          outputList.add((outputRecord.getCurrRecordProfileID(),
            outputRecord.getWindowRecordProfileID, outputRecord.getScore()))
        }
      }
    }
    })
    outputList.iterator()
  })/

 Here is my log4j.properties

 /log4j.rootCategory=INFO, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
 %c{1}: %m%n

 # Settings to quiet third party logs that are too verbose
 log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$
 SparkILoopInterpreter=INFO/




 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Where-can-I-find-logs-set-inside-
 RDD-processing-functions-tp21537.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Sort based shuffle not working properly?

2015-02-03 Thread Nitin kak
This is an excerpt from the design document for the implementation of
sort-based shuffle. I am thinking I might be wrong in my perception of
sort-based shuffle; I don't completely understand it yet, though.

*Motivation*
A sort-based shuffle can be more scalable than Spark's current hash-based
one because it doesn't require writing a separate file for each reduce task
from each mapper. Instead, we write a single sorted file and serve ranges
of it to different reducers. In jobs with a lot of reduce tasks (say
10,000+), this saves significant memory for compression and serialization
buffers and results in more sequential disk I/O.

*Implementation*
To perform a sort-based shuffle, each map task will produce one or more
output files sorted by a key's partition ID, then merge-sort them to yield
a single output file. Because it's only necessary to group the keys
together into partitions, we won't bother to also sort them within each
partition.
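
For the secondary-sort problem discussed below, Spark 1.2 also added
repartitionAndSortWithinPartitions, which does this in one shuffle: partition
by part of the key and sort the keys within each partition. A rough sketch
under the assumptions of this thread (tab-separated c1/c2 columns; paths and
class names are made up):

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // ordered/pair-RDD functions on Spark 1.x

// Route records by c1 only, while the composite (c1, c2) key drives the sort.
class C1Partitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val c1 = key.asInstanceOf[(Int, Int)]._1
    ((c1 % partitions) + partitions) % partitions   // non-negative modulo
  }
}

object SecondarySortSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("secondary-sort-sketch"))
    val rows = sc.textFile(args(0)).map { line =>
      val a = line.split("\t")
      ((a(0).toInt, a(1).toInt), line)   // composite key (c1, c2), full row as value
    }
    // One shuffle: rows land in the partition chosen by c1 and arrive sorted by (c1, c2).
    val sorted = rows.repartitionAndSortWithinPartitions(new C1Partitioner(10))
    sorted.values.saveAsTextFile(args(1))
    sc.stop()
  }
}
```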

On Tue, Feb 3, 2015 at 5:41 PM, Nitin kak nitinkak...@gmail.com wrote:

 I thought thats what sort based shuffled did, sort the keys going to the
 same partition.

 I have tried (c1, c2) as (Int, Int) tuple as well. I don't think that
 ordering of c2 type is the problem here.

 On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote:

 Hm, I don't think the sort partitioner is going to cause the result to
 be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
 even guaranteed that the type of c2 has an ordering, right?

 On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com
 wrote:
  I am trying to implement secondary sort in spark as we do in map-reduce.
 
  Here is my data(tab separated, without c1, c2, c2).
  c1c2 c3
  1   2   4
  1   3   6
  2   4   7
  2   6   8
  3   5   5
  3   1   8
  3   2   0
 
  To do secondary sort, I create a paired RDD as
 
  /((c1 + "," + c2), row)/
 
  and then use a custom partitioner to partition only on c1. I have set
  /spark.shuffle.manager = SORT/ so the keys per partition are sorted.
 For the
  key 3 I am expecting to get
  (3, 1)
  (3, 2)
  (3, 5)
  but still getting the original order
  3,5
  3,1
  3,2
 
  Here is the custom partitioner code:
 
  /class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
    def numPartitions = p
    def getPartition(key: Any) = {
      key.asInstanceOf[String].split(",")(0).toInt
    }
  }/
 
  and driver code, please tell me what I am doing wrong
 
  /val conf = new SparkConf().setAppName("MapInheritanceExample")
  conf.set("spark.shuffle.manager", "SORT");
  val sc = new SparkContext(conf)
  val pF = sc.textFile(inputFile)

  val log = LogFactory.getLog("MapFunctionTest")
  val partitionedRDD = pF.map { x =>
    var arr = x.split("\t");
    (arr(0) + "," + arr(1), null)
  }.partitionBy(new StraightPartitioner(10))

  var outputRDD = partitionedRDD.mapPartitions(p => {
    p.map({ case (o, n) => {
      o
    }
    })
  })/
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 





Sort-basedshuffledesign.pdf
Description: Adobe PDF document

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Sort based shuffle not working properly?

2015-02-03 Thread Nitin kak
I thought that's what sort-based shuffle did: sort the keys going to the
same partition.

I have tried (c1, c2) as an (Int, Int) tuple as well. I don't think the
ordering of the c2 type is the problem here.

On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote:

 Hm, I don't think the sort partitioner is going to cause the result to
 be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
 even guaranteed that the type of c2 has an ordering, right?

 On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com wrote:
  I am trying to implement secondary sort in spark as we do in map-reduce.
 
  Here is my data(tab separated, without c1, c2, c2).
  c1c2 c3
  1   2   4
  1   3   6
  2   4   7
  2   6   8
  3   5   5
  3   1   8
  3   2   0
 
  To do secondary sort, I create a paired RDD as
 
  /((c1 + "," + c2), row)/
 
  and then use a custom partitioner to partition only on c1. I have set
  /spark.shuffle.manager = SORT/ so the keys per partition are sorted. For
 the
  key 3 I am expecting to get
  (3, 1)
  (3, 2)
  (3, 5)
  but still getting the original order
  3,5
  3,1
  3,2
 
  Here is the custom partitioner code:
 
  /class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
    def numPartitions = p
    def getPartition(key: Any) = {
      key.asInstanceOf[String].split(",")(0).toInt
    }
  }/
 
  and driver code, please tell me what I am doing wrong
 
  /val conf = new SparkConf().setAppName("MapInheritanceExample")
  conf.set("spark.shuffle.manager", "SORT");
  val sc = new SparkContext(conf)
  val pF = sc.textFile(inputFile)

  val log = LogFactory.getLog("MapFunctionTest")
  val partitionedRDD = pF.map { x =>
    var arr = x.split("\t");
    (arr(0) + "," + arr(1), null)
  }.partitionBy(new StraightPartitioner(10))

  var outputRDD = partitionedRDD.mapPartitions(p => {
    p.map({ case (o, n) => {
      o
    }
    })
  })/
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



Re: Running beyond memory limits in ConnectedComponents

2015-01-15 Thread Nitin kak
Replying to all

Is this overhead memory allocation used for any specific purpose?

For example, will it be any different if I do *--executor-memory 22G* with
the overhead set to 0% (hypothetically) vs
*--executor-memory 20G* with the overhead memory left at its default, which
eventually brings the total memory asked of YARN to approximately 22G?
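
A rough sketch of the arithmetic being asked about, in SparkConf form (the
7%/384 MB default figure is the one quoted further down this thread; exact
values vary by release):

```scala
import org.apache.spark.SparkConf

// Container request = executor heap + spark.yarn.executor.memoryOverhead.
//   20g heap + max(384 MB, ~7% of 20g) ~= 20g + 1.4g ~= 21.4g asked of YARN
//   22g heap + 0 overhead (hypothetical) = 22g asked of YARN
// The overhead portion is not JVM heap: it covers off-heap usage such as JVM
// internals, thread stacks and network buffers, so the two setups are not
// equivalent even though the container totals are similar.
val conf = new SparkConf()
  .set("spark.executor.memory", "20g")
  .set("spark.yarn.executor.memoryOverhead", "2048")   // in MB; same as passing it via --conf
```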

On Thu, Jan 15, 2015 at 12:54 PM, Nitin kak nitinkak...@gmail.com wrote:

 Is this Overhead memory allocation used for any specific purpose.

 For example, will it be any different if I do *--executor-memory 22G *with
 overhead set to 0%(hypothetically) vs
 *--executor-memory 20G* and overhead memory to default(9%) which
 eventually brings the total memory asked by Spark to approximately 22G.



 On Thu, Jan 15, 2015 at 12:10 PM, Sean Owen so...@cloudera.com wrote:

 This is a YARN setting. It just controls how much any container can
 reserve, including Spark executors. That is not the problem.

 You need Spark to ask for more memory from YARN, on top of the memory
 that is requested by --executor-memory. Your output indicates the default
 of 7% is too little. For example you can ask for 20GB for executors and ask
 for 2GB of overhead. Spark will ask for 22GB from YARN. (Of course, YARN
 needs to be set to allow containers of at least 22GB!)

 On Thu, Jan 15, 2015 at 4:31 PM, Nitin kak nitinkak...@gmail.com wrote:

 Thanks for sticking to this thread.

 I am guessing what memory my app requests and what Yarn requests on my
 part should be same and is determined by the value of
 *--executor-memory* which I had set to *20G*. Or can the two values be
 different?

 I checked in Yarn configurations(below), so I think that fits well into
 the memory overhead limits.


 Container Memory Maximum
 yarn.scheduler.maximum-allocation-mb (default: 64 GiB)

 The largest amount of physical memory, in MiB, that can be requested for
 a container.





 On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 Those settings aren't relevant, I think. You're concerned with what
 your app requests, and what Spark requests of YARN on your behalf. (Of
 course, you can't request more than what your cluster allows for a
 YARN container for example, but that doesn't seem to be what is
 happening here.)

 You do not want to omit --executor-memory if you need large executor
 memory heaps, since then you just request the default and that is
 evidently not enough memory for your app.

 Look at http://spark.apache.org/docs/latest/running-on-yarn.html and
 spark.yarn.executor.memoryOverhead  By default it's 7% of your 20G or
 about 1.4G. You might set this higher to 2G to give more overhead.

 See the --config property=value syntax documented in
 http://spark.apache.org/docs/latest/submitting-applications.html

 On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com
 wrote:
  Thanks Sean.
 
  I guess Cloudera Manager has parameters executor_total_max_heapsize
 and
  worker_max_heapsize which point to the parameters you mentioned above.
 
  How much should that cushon between the jvm heap size and yarn memory
 limit
  be?
 
  I tried setting jvm memory to 20g and yarn to 24g, but it gave the
 same
  error as above.
 
  Then, I removed the --executor-memory clause
 
  spark-submit --class ConnectedComponentsTest --master yarn-cluster
  --num-executors 7 --executor-cores 1
  target/scala-2.10/connectedcomponentstest_2.10-1.0.jar
 
  That is not giving GC, Out of memory exception
 
  15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception
 was
  thrown by a user handler while handling an exception event ([id:
 0x362d65d4,
  /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION:
 java.lang.OutOfMemoryError:
  GC overhead limit exceeded)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Object.clone(Native Method)
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
at
 
 akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
at
 
 akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
at
 
 akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
at
 
 akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443

Re: Running beyond memory limits in ConnectedComponents

2015-01-15 Thread Nitin kak
I am sorry for the formatting error, the value for
*yarn.scheduler.maximum-allocation-mb
= 28G*

On Thu, Jan 15, 2015 at 11:31 AM, Nitin kak nitinkak...@gmail.com wrote:

 Thanks for sticking to this thread.

 I am guessing what memory my app requests and what Yarn requests on my
 part should be same and is determined by the value of *--executor-memory*
 which I had set to *20G*. Or can the two values be different?

 I checked in Yarn configurations(below), so I think that fits well into
 the memory overhead limits.


 Container Memory Maximum
 yarn.scheduler.maximum-allocation-mb (default: 64 GiB)

 The largest amount of physical memory, in MiB, that can be requested for a
 container.





 On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 Those settings aren't relevant, I think. You're concerned with what
 your app requests, and what Spark requests of YARN on your behalf. (Of
 course, you can't request more than what your cluster allows for a
 YARN container for example, but that doesn't seem to be what is
 happening here.)

 You do not want to omit --executor-memory if you need large executor
 memory heaps, since then you just request the default and that is
 evidently not enough memory for your app.

 Look at http://spark.apache.org/docs/latest/running-on-yarn.html and
 spark.yarn.executor.memoryOverhead  By default it's 7% of your 20G or
 about 1.4G. You might set this higher to 2G to give more overhead.

 See the --config property=value syntax documented in
 http://spark.apache.org/docs/latest/submitting-applications.html

 On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote:
  Thanks Sean.
 
  I guess Cloudera Manager has parameters executor_total_max_heapsize and
  worker_max_heapsize which point to the parameters you mentioned above.
 
  How much should that cushon between the jvm heap size and yarn memory
 limit
  be?
 
  I tried setting jvm memory to 20g and yarn to 24g, but it gave the same
  error as above.
 
  Then, I removed the --executor-memory clause
 
  spark-submit --class ConnectedComponentsTest --master yarn-cluster
  --num-executors 7 --executor-cores 1
  target/scala-2.10/connectedcomponentstest_2.10-1.0.jar
 
  That is not giving GC, Out of memory exception
 
  15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was
  thrown by a user handler while handling an exception event ([id:
 0x362d65d4,
  /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION:
 java.lang.OutOfMemoryError:
  GC overhead limit exceeded)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Object.clone(Native Method)
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
at
 
 akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
at
 
 akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
at
 
 akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
at
 
 akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at
 org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at
 org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
  15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread
  SparkListenerBus
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
at
 scala.collection.mutable.ListBuffer.$plus

Re: Running beyond memory limits in ConnectedComponents

2015-01-15 Thread Nitin kak
Thanks for sticking to this thread.

I am guessing that the memory my app requests and what YARN requests on my
behalf should be the same, determined by the value of *--executor-memory*,
which I had set to *20G*. Or can the two values be different?

I checked the YARN configuration (below), so I think that fits well within the
memory overhead limits.


Container Memory Maximum
yarn.scheduler.maximum-allocation-mb (default: 64 GiB)

The largest amount of physical memory, in MiB, that can be requested for a
container.





On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 Those settings aren't relevant, I think. You're concerned with what
 your app requests, and what Spark requests of YARN on your behalf. (Of
 course, you can't request more than what your cluster allows for a
 YARN container for example, but that doesn't seem to be what is
 happening here.)

 You do not want to omit --executor-memory if you need large executor
 memory heaps, since then you just request the default and that is
 evidently not enough memory for your app.

 Look at http://spark.apache.org/docs/latest/running-on-yarn.html and
 spark.yarn.executor.memoryOverhead  By default it's 7% of your 20G or
 about 1.4G. You might set this higher to 2G to give more overhead.

 See the --config property=value syntax documented in
 http://spark.apache.org/docs/latest/submitting-applications.html

 On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote:
  Thanks Sean.
 
  I guess Cloudera Manager has parameters executor_total_max_heapsize and
  worker_max_heapsize which point to the parameters you mentioned above.
 
  How much should that cushon between the jvm heap size and yarn memory
 limit
  be?
 
  I tried setting jvm memory to 20g and yarn to 24g, but it gave the same
  error as above.
 
  Then, I removed the --executor-memory clause
 
  spark-submit --class ConnectedComponentsTest --master yarn-cluster
  --num-executors 7 --executor-cores 1
  target/scala-2.10/connectedcomponentstest_2.10-1.0.jar
 
  That is not giving GC, Out of memory exception
 
  15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was
  thrown by a user handler while handling an exception event ([id:
 0x362d65d4,
  /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION:
 java.lang.OutOfMemoryError:
  GC overhead limit exceeded)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Object.clone(Native Method)
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
at
 
 akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
at
 
 akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
at
 
 akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
at
 
 akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at
 org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at
 org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
  15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread
  SparkListenerBus
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply

Re: Running beyond memory limits in ConnectedComponents

2015-01-14 Thread Nitin kak
Thanks Sean.

I guess Cloudera Manager has parameters executor_total_max_heapsize
and worker_max_heapsize
which point to the parameters you mentioned above.

How much should the cushion between the JVM heap size and the YARN memory
limit be?

I tried setting the JVM memory to 20g and YARN to 24g, but it gave the same
error as above.

Then, I removed the --executor-memory clause

*spark-submit --class ConnectedComponentsTest --master yarn-cluster
 --num-executors 7 --executor-cores 1
target/scala-2.10/connectedcomponentstest_2.10-1.0.jar*

That is now giving a GC overhead limit exceeded / out-of-memory exception:

15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception
was thrown by a user handler while handling an exception event ([id:
0x362d65d4, /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION:
java.lang.OutOfMemoryError: GC overhead limit exceeded)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Object.clone(Native Method)
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
at 
akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
at 
akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
at 
akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
at 
akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179)
at 
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at 
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at 
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at 
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at 
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at 
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread
SparkListenerBus
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.json4s.JsonDSL$class.seq2jvalue(JsonDSL.scala:68)
at org.json4s.JsonDSL$.seq2jvalue(JsonDSL.scala:61)
at 
org.apache.spark.util.JsonProtocol$$anonfun$jobStartToJson$3.apply(JsonProtocol.scala:127)
at 
org.apache.spark.util.JsonProtocol$$anonfun$jobStartToJson$3.apply(JsonProtocol.scala:127)
at org.json4s.JsonDSL$class.pair2jvalue(JsonDSL.scala:79)
at org.json4s.JsonDSL$.pair2jvalue(JsonDSL.scala:61)
at 
org.apache.spark.util.JsonProtocol$.jobStartToJson(JsonProtocol.scala:127)
at 
org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:59)
at 
org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:92)
at 
org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:118)
at 
org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$3.apply(SparkListenerBus.scala:50)
at 
org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$3.apply(SparkListenerBus.scala:50)
at 
org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:83)
at 
org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:81)
at 

Re: Spark 1.2 Release Date

2014-12-18 Thread nitin
Soon enough :)

http://apache-spark-developers-list.1001551.n3.nabble.com/RESULT-VOTE-Release-Apache-Spark-1-2-0-RC2-td9815.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date-tp20765p20766.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-sql with join terribly slow.

2014-12-17 Thread nitin
This might be because Spark SQL first shuffles both tables involved in the
join, using the join condition as the key.

I had a specific join use case where I always join on a specific column (id),
and I have an optimisation lined up for it in which I cache the data
partitioned on the join key (id) and prevent the shuffle by passing the
partition information to the in-memory caching layer.

See - https://issues.apache.org/jira/browse/SPARK-4849
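
An RDD-level illustration of the same idea (not the SPARK-4849 patch itself;
paths and the partition count are made up): hash-partition both sides on the
join key once and cache them, and the join can reuse the co-partitioning
instead of shuffling again.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions on Spark 1.x

object CoPartitionedJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("copartitioned-join"))
    val part = new HashPartitioner(200)

    // Key both datasets by the join column, partition them identically, and cache.
    val left  = sc.textFile(args(0)).map(l => (l.split(",")(0), l)).partitionBy(part).cache()
    val right = sc.textFile(args(1)).map(l => (l.split(",")(0), l)).partitionBy(part).cache()

    // Both RDDs share the same partitioner, so this join avoids another shuffle.
    val joined = left.join(right)
    println(joined.count())
    sc.stop()
  }
}
```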

Thanks
-Nitin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-with-join-terribly-slow-tp20751p20756.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SchemaRDD partition on specific column values?

2014-12-15 Thread Nitin Goyal
Hi Michael,

I have opened following JIRA for the same :-

https://issues.apache.org/jira/browse/SPARK-4849

I am having a look at the code to see what can be done and then we can have
a discussion over the approach.

Let me know if you have any comments/suggestions.

Thanks
-Nitin

On Sun, Dec 14, 2014 at 2:53 PM, Michael Armbrust mich...@databricks.com
wrote:

 I'm happy to discuss what it would take to make sure we can propagate this
 information correctly.  Please open a JIRA (and mention me in it).

 Regarding including it in 1.2.1, it depends on how invasive the change
 ends up being, but it is certainly possible.

 On Thu, Dec 11, 2014 at 3:55 AM, nitin nitin2go...@gmail.com wrote:

 Can we take this as a performance improvement task in Spark-1.2.1? I can
 help
 contribute for this.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350p20623.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-- 
Regards
Nitin Goyal


Re: SchemaRDD partition on specific column values?

2014-12-11 Thread nitin
Can we take this as a performance improvement task in Spark-1.2.1? I can help
contribute for this.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350p20623.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: registerTempTable: Table not found

2014-12-09 Thread nitin
Looks like this issue has been fixed very recently and should be available in
next RC :-

http://apache-spark-developers-list.1001551.n3.nabble.com/CREATE-TABLE-AS-SELECT-does-not-work-with-temp-tables-in-1-2-0-td9662.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/registerTempTable-Table-not-found-tp20592p20593.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PhysicalRDD problem?

2014-12-09 Thread Nitin Goyal
Hi Michael,

I think I have found the exact problem in my case. I see that we have
written something like the following in Analyzer.scala:

  // TODO: pass this in as a parameter.

  val fixedPoint = FixedPoint(100)


and


Batch("Resolution", fixedPoint,

  ResolveReferences ::

  ResolveRelations ::

  ResolveSortReferences ::

  NewRelationInstances ::

  ImplicitGenerate ::

  StarExpansion ::

  ResolveFunctions ::

  GlobalAggregates ::

  UnresolvedHavingClauseAttributes ::

  TrimGroupingAliases ::

  typeCoercionRules ++

  extendedRules : _*),

Perhaps in my case it reaches the 100 iterations and breaks out of the while
loop in RuleExecutor.scala, and thus doesn't resolve all the attributes.

Exception in my logs :-

14/12/10 04:45:28 INFO HiveContext$$anon$4: Max iterations (100) reached
for batch Resolution

14/12/10 04:45:28 ERROR [Sql]: Servlet.service() for servlet [Sql] in
context with path [] threw exception [Servlet execution threw an exception]
with root cause

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
attributes: 'T1.SP AS SP#6566,'T1.DOWN_BYTESHTTPSUBCR AS
DOWN_BYTESHTTPSUBCR#6567, tree:

'Project ['T1.SP AS SP#6566,'T1.DOWN_BYTESHTTPSUBCR AS
DOWN_BYTESHTTPSUBCR#6567]

...

...

...

at
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:80)

at
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)

at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)

at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)

at
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)

at
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)

at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)

at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)

at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)

at
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)

at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)

at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)

at
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)

at scala.collection.immutable.List.foreach(List.scala:318)

at
org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)

at
org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)

at
org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)

at
org.apache.spark.sql.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:86)

at org.apache.spark.sql.CacheManager$class.writeLock(CacheManager.scala:67)

at org.apache.spark.sql.CacheManager$class.cacheQuery(CacheManager.scala:85)

at org.apache.spark.sql.SQLContext.cacheQuery(SQLContext.scala:50)

 at org.apache.spark.sql.SchemaRDD.cache(SchemaRDD.scala:490)


I think the solution here is to make the FixedPoint constructor argument
configurable/parameterized (as the TODO already suggests). Do we have a plan to
do this in the 1.2 release? Or I can take this up as a task myself if you want
(since this is very crucial for our release).


Thanks

-Nitin

On Wed, Dec 10, 2014 at 1:06 AM, Michael Armbrust mich...@databricks.com
wrote:

 val newSchemaRDD = sqlContext.applySchema(existingSchemaRDD,
 existingSchemaRDD.schema)


 This line is throwing away the logical information about existingSchemaRDD
 and thus Spark SQL can't know how to push down projections or predicates
 past this operator.

 Can you describe more the problems that you see if you don't do this
 reapplication of the schema.




-- 
Regards
Nitin Goyal


Re: PhysicalRDD problem?

2014-12-09 Thread Nitin Goyal
I see that somebody had already raised a PR for this but it hasn't been
merged.

https://issues.apache.org/jira/browse/SPARK-4339

Can we merge this in next 1.2 RC?

Thanks
-Nitin


On Wed, Dec 10, 2014 at 11:50 AM, Nitin Goyal nitin2go...@gmail.com wrote:

 Hi Michael,

 I think I have found the exact problem in my case. I see that we have
 written something like following in Analyzer.scala :-

   // TODO: pass this in as a parameter.

   val fixedPoint = FixedPoint(100)


 and


 Batch(Resolution, fixedPoint,

   ResolveReferences ::

   ResolveRelations ::

   ResolveSortReferences ::

   NewRelationInstances ::

   ImplicitGenerate ::

   StarExpansion ::

   ResolveFunctions ::

   GlobalAggregates ::

   UnresolvedHavingClauseAttributes ::

   TrimGroupingAliases ::

   typeCoercionRules ++

   extendedRules : _*),

 Perhaps in my case, it reaches the 100 iterations and break out of while
 loop in RuleExecutor.scala and thus, doesn't resolve all the attributes.

 Exception in my logs :-

 14/12/10 04:45:28 INFO HiveContext$$anon$4: Max iterations (100) reached
 for batch Resolution

 14/12/10 04:45:28 ERROR [Sql]: Servlet.service() for servlet [Sql] in
 context with path [] threw exception [Servlet execution threw an exception]
 with root cause

 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
 attributes: 'T1.SP AS SP#6566,'T1.DOWN_BYTESHTTPSUBCR AS
 DOWN_BYTESHTTPSUBCR#6567, tree:

 'Project ['T1.SP AS SP#6566,'T1.DOWN_BYTESHTTPSUBCR AS
 DOWN_BYTESHTTPSUBCR#6567]

 ...

 ...

 ...

 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:80)

 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)

 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)

 at
 org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)

 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)

 at
 org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)

 at
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)

 at
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)

 at
 scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)

 at
 scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)

 at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)

 at
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)

 at
 org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)

 at scala.collection.immutable.List.foreach(List.scala:318)

 at
 org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)

 at
 org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)

 at
 org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)

 at
 org.apache.spark.sql.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:86)

 at org.apache.spark.sql.CacheManager$class.writeLock(CacheManager.scala:67)

 at
 org.apache.spark.sql.CacheManager$class.cacheQuery(CacheManager.scala:85)

 at org.apache.spark.sql.SQLContext.cacheQuery(SQLContext.scala:50)

  at org.apache.spark.sql.SchemaRDD.cache(SchemaRDD.scala:490)


 I think the solution here is to make the FixedPoint constructor argument
 configurable/parameterized (as the TODO comment already suggests). Do we have a
 plan to do this in the 1.2 release? Or I can take this up as a task myself if
 you want (since this is very crucial for our release).


 Thanks

 -Nitin

 On Wed, Dec 10, 2014 at 1:06 AM, Michael Armbrust mich...@databricks.com
 wrote:

 val newSchemaRDD = sqlContext.applySchema(existingSchemaRDD,
 existingSchemaRDD.schema)


 This line is throwing away the logical information about
 existingSchemaRDD and thus Spark SQL can't know how to push down
 projections or predicates past this operator.

 Can you describe in more detail the problems you see if you don't do this
 reapplication of the schema?




 --
 Regards
 Nitin Goyal




-- 
Regards
Nitin Goyal


PhysicalRDD problem?

2014-12-08 Thread nitin
Hi All,

I am facing the following problem on Spark 1.2 RC1, where I get a
TreeNodeException (unresolved attributes) :-

https://issues.apache.org/jira/browse/SPARK-2063

To avoid this, I do the following :-

val newSchemaRDD = sqlContext.applySchema(existingSchemaRDD,
existingSchemaRDD.schema)

It seems to work with the above code, but I see that projections and predicates
aren't pushed down into my
InMemoryTabularScan (spark.sql.inMemoryColumnarStorage.partitionPruning =
true), and I take a performance hit. When I look at the logical plan while
debugging, I see something like :-

...
Filter(col1#38=2,col2#39=3)
  PhysicalRDD[...]
    InMemoryTabularScan[col1,col2,col3,col4,col5(InMemoryRelation...)]

while I expect it to be :-

...
PhysicalRDD[...]
  InMemoryTabularScan[col1#38=2,col2#39=3,col3,col4,col5(InMemoryRelation...)]


Predicates and projections do get pushed down when I don't create a new RDD by
reapplying the schema and instead keep using the existing SchemaRDD (in the case
of simple queries), but for complex queries I then get the TreeNodeException
(unresolved attributes) I mentioned.
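
For reference, here is a minimal sketch of the two variants (Spark 1.2-era
SchemaRDD API; the file path, table and column names are made up), so the
physical plans can be compared side by side:

```
// Sketch only (Spark 1.2-era SchemaRDD API, made-up path/table/column names):
// compare the plan of a filter over the cached relation with the plan over the
// same data re-wrapped via applySchema.
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local[2]", "pushdown-check")
val sqlContext = new SQLContext(sc)

val original = sqlContext.jsonFile("/tmp/events.json")
original.registerTempTable("events")
sqlContext.cacheTable("events")

// Variant 1: query the cached table directly -- the predicate and the column
// pruning can reach the in-memory columnar scan.
val direct = sqlContext.sql("SELECT col1 FROM events WHERE col1 = 2")
println(direct.queryExecution.executedPlan)

// Variant 2: re-wrap with applySchema, as in the workaround above -- the resulting
// PhysicalRDD hides the cached relation underneath it, so the filter tends to stay
// above the scan instead of being pushed into it (the behaviour described here).
val cachedScan = sqlContext.sql("SELECT * FROM events")
val rewrapped = sqlContext.applySchema(cachedScan, cachedScan.schema)
rewrapped.registerTempTable("events_rewrapped")
val indirect = sqlContext.sql("SELECT col1 FROM events_rewrapped WHERE col1 = 2")
println(indirect.queryExecution.executedPlan)
```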

Let me know if you need any more info around my problem.

Thanks in Advance
-Nitin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PhysicalRDD-problem-tp20589.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




SchemaRDD partition on specific column values?

2014-12-04 Thread nitin
Hi All,

I want to hash-partition (and then cache) a SchemaRDD so that the partitions are
based on the hash of the values of a column (the ID column in my case).

e.g. if my table has an ID column with values 1,2,3,4,5,6,7,8,9 and
spark.sql.shuffle.partitions is configured as 3, then there should be 3
partitions, and for ID=1, say, all the tuples should be present in one
particular partition.

My actual use case is that I always get a query in which I have to join two
cached tables on the ID column; Spark first partitions both tables on ID and then
applies the JOIN. I want to avoid that repartitioning at query time by
pre-partitioning the data on ID (and then caching it).
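
A minimal sketch of the kind of preprocessing I have in mind (HiveContext,
made-up table and column names; this is illustrative only):

```
// Sketch (Spark 1.x HiveContext, made-up names): pre-distribute both tables by the
// join key, cache the distributed results, and run the join against the cached copies.
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext("local[4]", "prepartition-sketch")
val hiveContext = new HiveContext(sc)

// DISTRIBUTE BY hash-partitions the output by id; spark.sql.shuffle.partitions
// controls how many partitions are produced.
hiveContext.sql("SELECT * FROM table_a DISTRIBUTE BY id").registerTempTable("a_by_id")
hiveContext.sql("SELECT * FROM table_b DISTRIBUTE BY id").registerTempTable("b_by_id")
hiveContext.cacheTable("a_by_id")
hiveContext.cacheTable("b_by_id")

// Every incoming query then joins the two cached, pre-distributed tables on id.
val joined = hiveContext.sql(
  "SELECT a.id, a.value AS a_value, b.value AS b_value " +
  "FROM a_by_id a JOIN b_by_id b ON a.id = b.id")
joined.collect()
```

Whether the planner then recognises the cached tables as already co-partitioned
(and drops the shuffle before the join) is exactly what I would want to verify
from the physical plan.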

Thanks in Advance



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: SchemaRDD partition on specific column values?

2014-12-04 Thread nitin
With some quick googling, I learnt that we can specify DISTRIBUTE BY
column_name in HiveQL to distribute data based on a column's values. My question
now is: if I use DISTRIBUTE BY id, will there be any performance improvement?
Will I be able to avoid the data movement in the shuffle (the Exchange before the
JOIN step) and improve overall performance?
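
Reusing the hypothetical hiveContext and pre-distributed temp tables from the
sketch in the previous message, one empirical check is to print the executed plan
of the join and see whether a shuffle still appears above each cached scan:

```
// Continuation of the earlier sketch: if an Exchange operator still shows up
// above the in-memory table scans in the printed plan, the join is still
// re-shuffling the data despite the DISTRIBUTE BY preprocessing.
val joined = hiveContext.sql(
  "SELECT a.id, b.value FROM a_by_id a JOIN b_by_id b ON a.id = b.id")
println(joined.queryExecution.executedPlan)
```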



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350p20424.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Is Spark 1.1.0 incompatible with Hive?

2014-10-27 Thread Nitin kak
Yes, I added all the Hive jars present in the Cloudera distribution of Hadoop.
I added them because I was getting ClassNotFoundException for many required
classes (one example stack trace below), so someone in the community suggested
including the Hive jars:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf
    at org.apache.spark.sql.hive.api.java.JavaHiveContext.<init>(JavaHiveContext.scala:30)
    at HiveContextExample.main(HiveContextExample.java:57)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf

On Mon, Oct 27, 2014 at 1:57 PM, Michael Armbrust mich...@databricks.com
wrote:

 No such method error almost always means you are mixing different versions
 of the same library on the classpath.  In this case it looks like you have
 more than one version of guava.  Have you added anything to the classpath?

 On Mon, Oct 27, 2014 at 8:36 AM, nitinkak001 nitinkak...@gmail.com
 wrote:

 I am working on running the following Hive query from Spark:

 SELECT * FROM spark_poc.table_name DISTRIBUTE BY GEO_REGION, GEO_COUNTRY
 SORT BY IP_ADDRESS, COOKIE_ID

 Ran into java.lang.NoSuchMethodError:
 com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
 (complete stack trace at the bottom). Found a few mentions of this issue in
 the user list. It seems (from the thread linked below) that there is a Guava
 version incompatibility between Spark 1.1.0 and Hive which is probably fixed
 in 1.2.0:

 http://apache-spark-user-list.1001560.n3.nabble.com/Hive-From-Spark-td10110.html#a12671

 So, I wanted to confirm: is Spark SQL 1.1.0 incompatible with Hive, or is
 there a workaround for this?



 /Exception in thread Driver java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

 org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)
 Caused by: java.lang.NoSuchMethodError:

 com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
 at
 org.apache.spark.util.collection.OpenHashSet.org
 $apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
 at

 org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165)
 at

 org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102)
 at

 org.apache.spark.util.SizeEstimator$$anonfun$visitArray$2.apply$mcVI$sp(SizeEstimator.scala:214)
 at
 scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at
 org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:210)
 at

 org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:169)
 at

 org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
 at
 org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
 at

 org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
 at

 org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
 at

 org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
 at
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
 at
 org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
 at
 org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
 at
 org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:750)
 at
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:601)
 at
 org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:872)
 at

 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:79)
 at

 org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:68)
 at

 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
 at

 

Re: Is Spark 1.1.0 incompatible with Hive?

2014-10-27 Thread Nitin kak
I am now on CDH 5.2 which has the Hive module packaged in it.

On Mon, Oct 27, 2014 at 2:17 PM, Michael Armbrust mich...@databricks.com
wrote:

 Which version of CDH are you using?  I believe that hive is not correctly
 packaged in 5.1, but should work in 5.2.  Another option that people use is
 to deploy the plain Apache version of Spark on CDH Yarn.

 On Mon, Oct 27, 2014 at 11:10 AM, Nitin kak nitinkak...@gmail.com wrote:

 Yes, I added all the Hive jars present in Cloudera distribution of
 Hadoop. I added them because I was getting ClassNotFoundException for many
 required classes(one example stack trace below). So, someone on the
 community suggested to include the hive jars:

 *Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/hadoop/hive/conf/HiveConf*
 *at
 org.apache.spark.sql.hive.api.java.JavaHiveContext.init(JavaHiveContext.scala:30)*
 *at HiveContextExample.main(HiveContextExample.java:57)*
 *at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)*
 *at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)*
 *at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)*
 *at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)*
 *Caused by: java.lang.ClassNotFoundException:
 org.apache.hadoop.hive.conf.HiveConf*

 On Mon, Oct 27, 2014 at 1:57 PM, Michael Armbrust mich...@databricks.com
  wrote:

 No such method error almost always means you are mixing different
 versions of the same library on the classpath.  In this case it looks like
 you have more than one version of guava.  Have you added anything to the
 classpath?

 On Mon, Oct 27, 2014 at 8:36 AM, nitinkak001 nitinkak...@gmail.com
 wrote:

 I am working on running the following hive query from spark.

 /SELECT * FROM spark_poc.table_name DISTRIBUTE BY GEO_REGION,
 GEO_COUNTRY
 SORT BY IP_ADDRESS, COOKIE_ID/

 Ran into /java.lang.NoSuchMethodError:

 com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;/
 (complete stack trace at the bottom). Found a few mentions of this
 issue in
 the user list. It seems(from the below thread link) that there is a
 Guava
 version incompatibility between Spark 1.1.0 and Hive which is probably
 fixed
 in 1.2.0.

 /
 http://apache-spark-user-list.1001560.n3.nabble.com/Hive-From-Spark-td10110.html#a12671/

 *So, wanted to confirm, is Spark SQL 1.1.0 incompatible with Hive or is
 there a workaround to this?*



 /Exception in thread Driver
 java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

 org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)
 Caused by: java.lang.NoSuchMethodError:

 com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
 at
 org.apache.spark.util.collection.OpenHashSet.org
 $apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
 at

 org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165)
 at

 org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102)
 at

 org.apache.spark.util.SizeEstimator$$anonfun$visitArray$2.apply$mcVI$sp(SizeEstimator.scala:214)
 at
 scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at
 org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:210)
 at

 org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:169)
 at

 org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
 at
 org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
 at

 org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
 at

 org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
 at

 org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
 at
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
 at
 org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
 at
 org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
 at
 org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:750)
 at

 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:601

Re: Is Spark 1.1.0 incompatible with Hive?

2014-10-27 Thread Nitin kak
Somehow it worked by placing all the jars (except guava) from the Hive lib
directory after --jars. I had initially tried placing the jars under another
temporary folder and pointing the executor and driver extraClassPath to that
directory, but that didn't work.

On Mon, Oct 27, 2014 at 2:21 PM, Nitin kak nitinkak...@gmail.com wrote:

 I am now on CDH 5.2 which has the Hive module packaged in it.

 On Mon, Oct 27, 2014 at 2:17 PM, Michael Armbrust mich...@databricks.com
 wrote:

 Which version of CDH are you using?  I believe that hive is not correctly
 packaged in 5.1, but should work in 5.2.  Another option that people use is
 to deploy the plain Apache version of Spark on CDH Yarn.

 On Mon, Oct 27, 2014 at 11:10 AM, Nitin kak nitinkak...@gmail.com
 wrote:

 Yes, I added all the Hive jars present in Cloudera distribution of
 Hadoop. I added them because I was getting ClassNotFoundException for many
 required classes(one example stack trace below). So, someone on the
 community suggested to include the hive jars:

 *Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/hadoop/hive/conf/HiveConf*
 *at
 org.apache.spark.sql.hive.api.java.JavaHiveContext.init(JavaHiveContext.scala:30)*
 *at HiveContextExample.main(HiveContextExample.java:57)*
 *at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)*
 *at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)*
 *at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)*
 *at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)*
 *Caused by: java.lang.ClassNotFoundException:
 org.apache.hadoop.hive.conf.HiveConf*

 On Mon, Oct 27, 2014 at 1:57 PM, Michael Armbrust 
 mich...@databricks.com wrote:

 No such method error almost always means you are mixing different
 versions of the same library on the classpath.  In this case it looks like
 you have more than one version of guava.  Have you added anything to the
 classpath?

 On Mon, Oct 27, 2014 at 8:36 AM, nitinkak001 nitinkak...@gmail.com
 wrote:

 I am working on running the following hive query from spark.

 /SELECT * FROM spark_poc.table_name DISTRIBUTE BY GEO_REGION,
 GEO_COUNTRY
 SORT BY IP_ADDRESS, COOKIE_ID/

 Ran into /java.lang.NoSuchMethodError:

 com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;/
 (complete stack trace at the bottom). Found a few mentions of this
 issue in
 the user list. It seems(from the below thread link) that there is a
 Guava
 version incompatibility between Spark 1.1.0 and Hive which is probably
 fixed
 in 1.2.0.

 /
 http://apache-spark-user-list.1001560.n3.nabble.com/Hive-From-Spark-td10110.html#a12671/

 *So, wanted to confirm, is Spark SQL 1.1.0 incompatible with Hive or is
 there a workaround to this?*



 /Exception in thread Driver
 java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

 org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)
 Caused by: java.lang.NoSuchMethodError:

 com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;
 at
 org.apache.spark.util.collection.OpenHashSet.org
 $apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261)
 at

 org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165)
 at

 org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102)
 at

 org.apache.spark.util.SizeEstimator$$anonfun$visitArray$2.apply$mcVI$sp(SizeEstimator.scala:214)
 at
 scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at

 org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:210)
 at

 org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:169)
 at

 org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
 at
 org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
 at

 org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
 at

 org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
 at

 org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
 at

 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236