Re: Does spark read the same file twice, if two stages are using the same DataFrame?
I do not think InMemoryFileIndex means it is caching the data. The caches get shown as InMemoryTableScan. InMemoryFileIndex is just for partition discovery and partition pruning. Any read will always show up as a scan from InMemoryFileIndex. It is not cached data. It is a cached file index. Please correct my understanding if I am wrong Even the following code shows a scan from an InMemoryFileIndex ``` df1 = spark.read.csv("./df1.csv", header=True, schema = schema) df1.explain(mode = "extended") ``` output: ``` == Parsed Logical Plan == Relation [index#50,0#51] csv == Analyzed Logical Plan == index: string, 0: string Relation [index#50,0#51] csv == Optimized Logical Plan == Relation [index#50,0#51] csv == Physical Plan == FileScan csv [index#50,0#51] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/nitin/work/df1.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct ``` On Mon, May 8, 2023 at 1:07 AM Mich Talebzadeh wrote: > When I run this job in local mode spark-submit --master local[4] > > with > > spark = SparkSession.builder \ > .appName("tests") \ > .enableHiveSupport() \ > .getOrCreate() > spark.conf.set("spark.sql.adaptive.enabled", "true") > df3.explain(extended=True) > > and no caching > > I see this plan > > == Parsed Logical Plan == > 'Join UsingJoin(Inner, [index]) > :- Relation [index#0,0#1] csv > +- Aggregate [index#11], [index#11, avg(cast(0#12 as double)) AS avg(0)#7] >+- Relation [index#11,0#12] csv > > == Analyzed Logical Plan == > index: string, 0: string, avg(0): double > Project [index#0, 0#1, avg(0)#7] > +- Join Inner, (index#0 = index#11) >:- Relation [index#0,0#1] csv >+- Aggregate [index#11], [index#11, avg(cast(0#12 as double)) AS > avg(0)#7] > +- Relation [index#11,0#12] csv > > == Optimized Logical Plan == > Project [index#0, 0#1, avg(0)#7] > +- Join Inner, (index#0 = index#11) >:- Filter isnotnull(index#0) >: +- Relation [index#0,0#1] csv >+- Aggregate [index#11], [index#11, avg(cast(0#12 as double)) AS > avg(0)#7] > +- Filter isnotnull(index#11) > +- Relation [index#11,0#12] csv > > == Physical Plan == > AdaptiveSparkPlan isFinalPlan=false > +- Project [index#0, 0#1, avg(0)#7] >+- BroadcastHashJoin [index#0], [index#11], Inner, BuildRight, false > :- Filter isnotnull(index#0) > : +- FileScan csv [index#0,0#1] Batched: false, DataFilters: > [isnotnull(index#0)], Format: CSV, Location: InMemoryFileIndex(1 > paths)[hdfs://rhes75:9000/tmp/df1.csv], PartitionFilters: [], > PushedFilters: [IsNotNull(index)], ReadSchema: > struct > +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, > string, true]),false), [plan_id=174] > +- HashAggregate(keys=[index#11], functions=[avg(cast(0#12 as > double))], output=[index#11, avg(0)#7]) > +- Exchange hashpartitioning(index#11, 200), > ENSURE_REQUIREMENTS, [plan_id=171] >+- HashAggregate(keys=[index#11], > functions=[partial_avg(cast(0#12 as double))], output=[index#11, sum#28, > count#29L]) > +- Filter isnotnull(index#11) > +- FileScan csv [index#11,0#12] Batched: false, > DataFilters: [isnotnull(index#11)], Format: CSV, Location: > InMemoryFileIndex(1 paths)[hdfs://rhes75:9000/tmp/df1.csv], > PartitionFilters: [], PushedFilters: [IsNotNull(index)], ReadSchema: > struct > > > so two in memory file scans for the csv file. So it caches the data > already given the small result set. Do you see this? 
> > HTH > > > Mich Talebzadeh, > Lead Solutions Architect/Engineering Lead > Palantir Technologies Limited > London > United Kingdom > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Sun, 7 May 2023 at 17:48, Nitin Siwach wrote: > >> Thank you for the help Mich :) >> >> I have not started with a pandas DF. I have used pandas to create a dummy >> .csv which I dump on the disk that I intend to use to showcase my pain >> point. Providing pandas code was to ensure an end-to-end runnable example >> is provided and the effort on anyone trying to help me out is minimized >&
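To make the distinction above concrete, here is a minimal sketch (same made-up ./df1.csv path and schema as in the example): a plain read only ever shows a FileScan whose Location is an InMemoryFileIndex (a cached file listing, not cached data), while cache() plus an action makes the scan show up as an InMemoryTableScan, which is what actual cached data looks like.
```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField("index", StringType(), True),
                     StructField("0", StringType(), True)])

df1 = spark.read.csv("./df1.csv", header=True, schema=schema)
df1.explain()        # plain FileScan; Location is an InMemoryFileIndex (cached file listing, not data)

df1.cache()
df1.count()          # materialises the cache
df1.explain()        # the scan should now appear as an InMemoryTableScan (actual cached data)
```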
Re: Does spark read the same file twice, if two stages are using the same DataFrame?
Thank you for the help Mich :) I have not started with a pandas DF. I have used pandas to create a dummy .csv which I dump on the disk that I intend to use to showcase my pain point. Providing pandas code was to ensure an end-to-end runnable example is provided and the effort on anyone trying to help me out is minimized I don't think Spark validating the file existence qualifies as an action according to Spark parlance. Sure there would be an analysis exception in case the file is not found as per the location provided, however, if you provided a schema and a valid path then no job would show up on the spark UI validating (IMO) that no action has been taken. (1 Action necessarily equals at least one job). If you don't provide the schema then a job is triggered (an action) to infer the schema for subsequent logical planning. Since I am just demonstrating my lack of understanding I have chosen local mode. Otherwise, I do use google buckets to host all the data This being said I think my question is something entirely different. It is that calling one action (df3.count()) is reading the same csv twice. I do not understand that. So far, I always thought that data should be persisted only in case a DAG subset is to be reused by several actions. On Sun, May 7, 2023 at 9:47 PM Mich Talebzadeh wrote: > You have started with panda DF which won't scale outside of the driver > itself. > > Let us put that aside. > df1.to_csv("./df1.csv",index_label = "index") ## write the dataframe to > the underlying file system > > starting with spark > > df1 = spark.read.csv("./df1.csv", header=True, schema = schema) ## read > the dataframe from the underlying file system > > That is your first action because spark needs to validate that file > (exiss) and the schema. What will happen if that file does not exist > > csvlocation="/tmp/df1.csv" > csvlocation2="/tmp/df5.csv" > df1= pd.DataFrame(np.arange(1_000).reshape(-1,1)) > df1.index = np.random.choice(range(10),size=1000) > df1.to_csv(csvlocation,index_label = "index") > Schema = StructType([StructField('index', StringType(), True), > StructField('0', StringType(), True)]) > > df1 = spark.read.csv(*csvlocation2*, header=True, schema = > Schema).cache() ## incorrect location > > df2 = df1.groupby("index").agg(F.mean("0")) > df3 = df1.join(df2,on='index') > df3.show() > #df3.explain() > df3.count() > > > error > > pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] > Path does not exist: hdfs://rhes75:9000/tmp/df5.csv. > > In a distributed env, that csv file has to be available to all spark > workers. Either you copy that file to all worker nodes or you put in HDFS > or S3 or gs:// locations to be available to all. > > It does not even get to df3.count() > > HTH > > Mich Talebzadeh, > Lead Solutions Architect/Engineering Lead > Palantir Technologies Limited > London > United Kingdom > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Sun, 7 May 2023 at 15:53, Nitin Siwach wrote: > >> Thank you for your response, Sir. 
>> >> My understanding is that the final ```df3.count()``` is the only action >> in the code I have attached. In fact, I tried running the rest of the code >> (commenting out just the final df3.count()) and, as I expected, no >> computations were triggered >> >> On Sun, 7 May, 2023, 20:16 Mich Talebzadeh, >> wrote: >> >>> >>> ...However, In my case here I am calling just one action. .. >>> >>> ok, which line in your code is called one action? >>> >>> >>> Mich Talebzadeh, >>> Lead Solutions Architect/Engineering Lead >>> Palantir Technologies Limited >>> London >>> United Kingdom >>> >>> >>>view my Linkedin profile >>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> >>> >>> >>> https://en.everybodywiki.com/Mich_Talebzadeh >>> >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >&g
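A small sketch of the point about schema inference made above (same made-up path and schema): with an explicit schema, spark.read.csv only does lightweight path validation and no job should show up in the UI; asking Spark to infer the schema forces it to scan the file, which does appear as a job even though no DataFrame action has been called.
```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField("index", StringType(), True),
                     StructField("0", StringType(), True)])

with_schema = spark.read.csv("./df1.csv", header=True, schema=schema)      # no job expected in the UI
inferred    = spark.read.csv("./df1.csv", header=True, inferSchema=True)   # triggers a schema-inference job
```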
Re: Does spark read the same file twice, if two stages are using the same DataFrame?
Thank you for your response, Sir. My understanding is that the final ```df3.count()``` is the only action in the code I have attached. In fact, I tried running the rest of the code (commenting out just the final df3.count()) and, as I expected, no computations were triggered On Sun, 7 May, 2023, 20:16 Mich Talebzadeh, wrote: > > ...However, In my case here I am calling just one action. .. > > ok, which line in your code is called one action? > > > Mich Talebzadeh, > Lead Solutions Architect/Engineering Lead > Palantir Technologies Limited > London > United Kingdom > > >view my Linkedin profile > <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> > > > https://en.everybodywiki.com/Mich_Talebzadeh > > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > > On Sun, 7 May 2023 at 14:13, Nitin Siwach wrote: > >> @Vikas Kumar >> I am sorry but I thought that you had answered the other question that I >> had raised to the same email address yesterday. It was around the SQL tab >> in web UI and the output of .explain showing different plans. >> >> I get how using .cache I can ensure that the data from a particular >> checkpoint is reused and the computations do not happen again. >> >> However, In my case here I am calling just one action. Within the purview >> of one action Spark should not rerun the overlapping parts of the DAG. I do >> not understand why the file scan is happening several times. I can easily >> mitigate the issue by using window functions and creating all the columns >> in one go without having to use several joins later on. That being said >> this particular behavior is what I am trying ot understand. The golden rule >> "The DAG overlaps wont run several times for one action" seems not to be >> apocryphal. If you can shed some light on this matter I would appreciate it >> >> @weiruanl...@gmail.com My datasets are very >> small as you can see in the sample examples that I am creating as the first >> part of the code >> >> Really appreciate you guys helping me out with this :) >> >> On Sun, May 7, 2023 at 12:23 PM Winston Lai >> wrote: >> >>> When your memory is not sufficient to keep the cached data for your jobs >>> in two different stages, it might be read twice because Spark might have to >>> clear the previous cache for other jobs. In those cases, a spill may >>> triggered when Spark write your data from memory to disk. >>> >>> One way to to check is to read Spark UI. When Spark cache the data, you >>> will see a little green dot connected to the blue rectangle in the Spark >>> UI. If you see this green dot twice on your two stages, likely Spark spill >>> the data after your first job and read it again in the second run. You can >>> also confirm it in other metrics from Spark UI. >>> >>> That is my personal understanding based on what I have read and seen on >>> my job runs. If there is any mistake, be free to correct me. >>> >>> Thank You & Best Regards >>> Winston Lai >>> -- >>> *From:* Nitin Siwach >>> *Sent:* Sunday, May 7, 2023 12:22:32 PM >>> *To:* Vikas Kumar >>> *Cc:* User >>> *Subject:* Re: Does spark read the same file twice, if two stages are >>> using the same DataFrame? >>> >>> Thank you tons, Vikas :). 
That makes so much sense now >>> >>> I'm in learning phase and was just browsing through various concepts of >>> spark with self made small examples. >>> >>> It didn't make sense to me that the two physical plans should be >>> different. But, now I understand what you're saying. >>> >>> Again, thank you for helping me out >>> >>> On Sun, 7 May, 2023, 07:48 Vikas Kumar, wrote: >>> >>> >>> Spark came up with a plan but that may or may not be optimal plan given >>> the system settings. >>> If you do df1.cache() , i am guessing spark will not read df1 twice. >>> >>> Btw, Why do you have adaptive enabled to be false? >>> >>> On Sat, May 6, 2023, 1:46 PM Nitin Siwach wrote: >>> >>> I hope this email finds you well :) >>> >>> The following code re
Re: Does spark read the same file twice, if two stages are using the same DataFrame?
@Vikas Kumar I am sorry, but I thought that you had answered the other question that I had raised to the same email address yesterday. It was around the SQL tab in the web UI and the output of .explain showing different plans. I get how using .cache I can ensure that the data from a particular checkpoint is reused and the computations do not happen again. However, in my case here I am calling just one action. Within the purview of one action, Spark should not rerun the overlapping parts of the DAG. I do not understand why the file scan is happening several times. I can easily mitigate the issue by using window functions and creating all the columns in one go without having to use several joins later on. That being said, this particular behavior is what I am trying to understand. The golden rule "the overlapping parts of the DAG won't run several times for one action" seems to be apocryphal. If you can shed some light on this matter I would appreciate it. @weiruanl...@gmail.com My datasets are very small, as you can see in the sample examples that I create in the first part of the code. Really appreciate you guys helping me out with this :) On Sun, May 7, 2023 at 12:23 PM Winston Lai wrote: > When your memory is not sufficient to keep the cached data for your jobs > in two different stages, it might be read twice because Spark might have to > clear the previous cache for other jobs. In those cases, a spill may be > triggered when Spark writes your data from memory to disk. > > One way to check is to read the Spark UI. When Spark caches the data, you > will see a little green dot connected to the blue rectangle in the Spark > UI. If you see this green dot twice on your two stages, likely Spark spilled > the data after your first job and read it again in the second run. You can > also confirm it in other metrics from the Spark UI. > > That is my personal understanding based on what I have read and seen on my > job runs. If there is any mistake, feel free to correct me. > > Thank You & Best Regards > Winston Lai > -- > *From:* Nitin Siwach > *Sent:* Sunday, May 7, 2023 12:22:32 PM > *To:* Vikas Kumar > *Cc:* User > *Subject:* Re: Does spark read the same file twice, if two stages are > using the same DataFrame? > > Thank you tons, Vikas :). That makes so much sense now > > I'm in the learning phase and was just browsing through various concepts of > Spark with self-made small examples. > > It didn't make sense to me that the two physical plans should be > different. But, now I understand what you're saying. > > Again, thank you for helping me out > > On Sun, 7 May, 2023, 07:48 Vikas Kumar, wrote: > > > Spark came up with a plan, but that may or may not be the optimal plan given > the system settings. > If you do df1.cache(), I am guessing Spark will not read df1 twice. > > Btw, why do you have adaptive execution set to false?
> > On Sat, May 6, 2023, 1:46 PM Nitin Siwach wrote: > > I hope this email finds you well :) > > The following code reads the same csv twice even though only one action is > called > > End to end runnable example: > ``` > import pandas as pd > import numpy as np > > df1= pd.DataFrame(np.arange(1_000).reshape(-1,1)) > df1.index = np.random.choice(range(10),size=1000) > df1.to_csv("./df1.csv",index_label = "index") > > > > from pyspark.sql import SparkSession > from pyspark.sql import functions as F > from pyspark.sql.types import StructType, StringType, StructField > > spark = > SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","-1").\ > config("spark.sql.adaptive.enabled","false").getOrCreate() > > schema = StructType([StructField('index', StringType(), True), > StructField('0', StringType(), True)]) > > df1 = spark.read.csv("./df1.csv", header=True, schema = schema) > > df2 = df1.groupby("index").agg(F.mean("0")) > df3 = df1.join(df2,on='index') > > df3.explain() > df3.count() > ``` > > The sql tab in the web UI shows the following: > > [image: screencapture-localhost-4040-SQL-execution-2023-05-06-19_48_41.png] > > As you can see, the df1 file is read twice. Is this the expected > behaviour? Why is that happening? I have just one action so the same part > of the pipeline should not run multiple times. > > I have read the answer [here][1] > <https://stackoverflow.com/questions/37894099/does-spark-read-the-same-file-twice-if-two-stages-are-using-the-same-rdd>. > The question is almost the same it is just that in that question the RDDs > are used and I am using dataframe in pyspark API. In th
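For completeness, a sketch of the cached variant Vikas suggests, using the same made-up file and schema as the example above: with df1 persisted before it is reused by both branches of the join, the CSV should only be scanned once. Whether Spark ought to reuse the scan within a single action even without the cache is the open question in this thread.
```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField('index', StringType(), True),
                     StructField('0', StringType(), True)])

df1 = spark.read.csv("./df1.csv", header=True, schema=schema).cache()   # persist before reuse
df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2, on='index')
df3.count()   # one action; the aggregate branch reads df1 from the cache, not the CSV
```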
What is DataFilters and while joining why is the filter isnotnull[joinKey] applied twice
Pyspark version:3.1.3 *Question 1: *What is DataFilters in spark physical plan? How is it different from PushedFilters? *Question 2:* When joining two datasets, Why is the filter isnotnull applied twice on the joining key column? In the physical plan, it is once applied as a PushedFilter and then explicitly applied right after it. Why is that so? code: import os import pandas as pd, numpy as np import pyspark spark=pyspark.sql.SparkSession.builder.getOrCreate() save_loc = "gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/ " df1 = spark.createDataFrame(pd.DataFrame({'a':np.random.choice([1,2,None],size = 1000, p = [0.47,0.48,0.05]), 'b': np.random.random(1000)})) df2 = spark.createDataFrame(pd.DataFrame({'a':np.random.choice([1,2,None],size = 1000, p = [0.47,0.48,0.05]), 'b': np.random.random(1000)})) df1.write.parquet(os.path.join(save_loc,"dfl_key_int")) df2.write.parquet(os.path.join(save_loc,"dfr_key_int")) dfl_int = spark.read.parquet(os.path.join(save_loc,"dfl_key_int")) dfr_int = spark.read.parquet(os.path.join(save_loc,"dfr_key_int")) dfl_int.join(dfr_int,on='a',how='inner').explain() output: == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Project [a#23L, b#24, b#28] +- BroadcastHashJoin [a#23L], [a#27L], Inner, BuildRight, false :- Filter isnotnull(a#23L) : +- FileScan parquet [a#23L,b#24] Batched: true, DataFilters: [isnotnull(a#23L)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfl_key_int], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75] +- Filter isnotnull(a#27L) +- FileScan parquet [a#27L,b#28] Batched: true, DataFilters: [isnotnull(a#27L)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfr_key_int], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct -- Regards, Nitin
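For what it's worth: DataFilters in that plan are the predicates on data (non-partition) columns that the scan knows about, while PushedFilters are the subset of those that could be translated into data-source-level filters (Parquet can use them to skip row groups via statistics). Because the pushed filters are best-effort and are not guaranteed to drop every non-matching row, Spark re-applies the same isnotnull predicate as an explicit Filter above the scan, which is why it appears twice. A small hedged sketch to see the two lists diverge (the path and columns are made up):
```
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/tmp/some_parquet")     # hypothetical dataset with columns a and b

df.filter(F.col("a").isNotNull()).explain()      # translatable: expected in both DataFilters and PushedFilters
df.filter(F.col("b") * 2 > 1).explain()          # not translatable: expected only in DataFilters / the Filter node
```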
bucketBy in pyspark not retaining partition information
I am reading two datasets that I saved to disk with the ```bucketBy``` option on the same key and with the same number of buckets. When I read them back and join them, they should not result in a shuffle. But that isn't what I am seeing. *The following code demonstrates the alleged behavior:* from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold", "-1").getOrCreate() import random data1 = [(i,random.randint(1,5),random.randint(1,5)) for t in range(2) for i in range(5)] data2 = [(i,random.randint(1,5),random.randint(1,5)) for t in range(2) for i in range(5)] df1 = spark.createDataFrame(data1, schema = 'a int,b int,c int') df2 = spark.createDataFrame(data2, schema = 'a int,b int,c int') parquet_path1 = './bucket_test_parquet1' parquet_path2 = './bucket_test_parquet2' df1.write.bucketBy(5,"a").format("parquet").saveAsTable('df',path=parquet_path1,mode='overwrite') df2.write.bucketBy(5,"a").format("parquet").saveAsTable('df',path=parquet_path2,mode='overwrite') read_parquet1 = spark.read.format("parquet").load(parquet_path1,header=True) read_parquet1.createOrReplaceTempView("read_parquet1") read_parquet1 = spark.sql("SELECT * from read_parquet1") read_parquet2 = spark.read.format("parquet").load(parquet_path2,header=True) read_parquet2.createOrReplaceTempView("read_parquet2") read_parquet2 = spark.sql("SELECT * from read_parquet2") read_parquet1.join(read_parquet2,on='a').explain() *The output that I am getting is* == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Project [a#24, b#25, c#26, b#34, c#35] +- SortMergeJoin [a#24], [a#33], Inner :- Sort [a#24 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(a#24, 200), ENSURE_REQUIREMENTS, [id=#61] : +- Filter isnotnull(a#24) :+- FileScan parquet [a#24,b#25,c#26] Batched: true, DataFilters: [isnotnull(a#24)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/nitin/pymonsoon/bucket_test_parquet1], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct +- Sort [a#33 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(a#33, 200), ENSURE_REQUIREMENTS, [id=#62] +- Filter isnotnull(a#33) +- FileScan parquet [a#33,b#34,c#35] Batched: true, DataFilters: [isnotnull(a#33)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/nitin/pymonsoon/bucket_test_parquet2], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct *which clearly has hashpartitioning going on. Kindly help me clarify the utility of ```bucketBy```.*
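A possible explanation, sketched under the assumption that the bucketing metadata only lives in the metastore entry that saveAsTable creates and is not stored in the parquet files themselves: spark.read.format("parquet").load(path) therefore comes back as a plain, unbucketed scan, and the join has to shuffle. Reading the saved tables back with spark.table(...) should let the planner see the buckets. Note also that both saveAsTable calls above use the same table name 'df', so the second overwrites the first; the sketch below uses distinct, hypothetical table names and reuses spark, df1, df2 and the two paths from the snippet above.
```
df1.write.bucketBy(5, "a").format("parquet").saveAsTable("bucket_df1", path=parquet_path1, mode="overwrite")
df2.write.bucketBy(5, "a").format("parquet").saveAsTable("bucket_df2", path=parquet_path2, mode="overwrite")

t1 = spark.table("bucket_df1")   # table read: bucket spec comes from the metastore
t2 = spark.table("bucket_df2")
t1.join(t2, on="a").explain()    # with matching bucket specs, no Exchange is expected (a Sort may remain)
```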
understanding iterator of series to iterator of series pandasUDF
I understand pandasUDF as follows: 1. There are multiple partitions per worker 2. Multiple arrow batches are converted per partition 3. These are sent to the python process 4. In the case of Series to Series, the pandasUDF is applied to each arrow batch one after the other? **(So, is it that (a) - the vectorisation is at the arrow-batch level, but each batch, in turn, is processed sequentially by the worker? Or is it that (b) - the arrow batches are combined after all have arrived and then the pandasUDF is applied to the whole?)** I think it is (b), i.e. the arrow batches are combined. I have given my reasoning below. Given this understanding and blackbishop's answer, I have the following further questions: *How exactly do the Iterator versions of pandasUDFs work?* 1. If there is some expensive initialization, then why can we not do that in the case of a series to series pandasUDF as well? In the case of iterator of series to iterator of series, the initialization is done once and is shared across all the workers and used for all the arrow batches. Why can the same process not be followed for a series to series pandasUDF? initialize --> share to workers --> once all the arrow batches are combined on a worker, apply? 2. I can see that we might want to separate out the execution of i/o and of python code on arrow batches, so that while one batch is being read in, the pandasUDF is being run on the previous batch. (Why is this not done in the case of series to series? **This is why I think all the arrow batches are combined before running them through the pandasUDF. Because otherwise the same i/o parallelization benefits are available for a series to series pandasUDF as well.**) One more question: 1. Since the output is an Iterator of Series, where is the vectorisation then? Is it that the pandasUDF is run on an entire arrow batch and then the result is emitted row by row? Or is the pandasUDF processing the arrow batches row by row and then emitting the result? (This loses vectorisation as I see it.)
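A hedged sketch of the Iterator of Series to Iterator of Series form, to make the "expensive initialization" point concrete (the scaling logic below stands in for something genuinely expensive, like loading a model): the function is called once with an iterator over that task's Arrow batches, so the setup runs once per task rather than once per batch, and each batch is still handed over as a whole pandas Series, so the per-batch work stays vectorised.
```
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def scale(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    factor = 2.0                  # stand-in for an expensive one-time initialization
    for batch in batches:         # each `batch` is one Arrow batch as a pandas Series
        yield batch * factor      # vectorised work on the whole batch

# hypothetical usage: df.select(scale("some_numeric_column"))
```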
Re: Read hdfs files in spark streaming
Hi Deepak, Please let us know - how you managed it ? Thanks, NJ On Mon, Jun 10, 2019 at 4:42 PM Deepak Sharma wrote: > Thanks All. > I managed to get this working. > Marking this thread as closed. > > On Mon, Jun 10, 2019 at 4:14 PM Deepak Sharma > wrote: > >> This is the project requirement , where paths are being streamed in kafka >> topic. >> Seems it's not possible using spark structured streaming. >> >> >> On Mon, Jun 10, 2019 at 3:59 PM Shyam P wrote: >> >>> Hi Deepak, >>> Why are you getting paths from kafka topic? any specific reason to do >>> so ? >>> >>> Regards, >>> Shyam >>> >>> On Mon, Jun 10, 2019 at 10:44 AM Deepak Sharma >>> wrote: >>> The context is different here. The file path are coming as messages in kafka topic. Spark streaming (structured) consumes form this topic. Now it have to get the value from the message , thus the path to file. read the json stored at the file location into another df. Thanks Deepak On Sun, Jun 9, 2019 at 11:03 PM vaquar khan wrote: > Hi Deepak, > > You can use textFileStream. > > https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html > > Plz start using stackoverflow to ask question to other ppl so get > benefits of answer > > > Regards, > Vaquar khan > > On Sun, Jun 9, 2019, 8:08 AM Deepak Sharma > wrote: > >> I am using spark streaming application to read from kafka. >> The value coming from kafka message is path to hdfs file. >> I am using spark 2.x , spark.read.stream. >> What is the best way to read this path in spark streaming and then >> read the json stored at the hdfs path , may be using spark.read.json , >> into >> a df inside the spark streaming app. >> Thanks a lot in advance >> >> -- >> Thanks >> Deepak >> > -- Thanks Deepak www.bigdatabig.com www.keosha.net >>> >> >> -- >> Thanks >> Deepak >> www.bigdatabig.com >> www.keosha.net >> > > > -- > Thanks > Deepak > www.bigdatabig.com > www.keosha.net >
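For readers landing on this thread, one common way to handle it (sketched with a made-up topic, broker and sink; not necessarily what Deepak ended up doing) is to consume the paths with structured streaming and use foreachBatch to turn each micro-batch of paths into an ordinary batch read of the JSON files they point to:
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

paths = (spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker:9092")
         .option("subscribe", "hdfs-paths")
         .load()
         .select(col("value").cast("string").alias("path")))

def process_batch(batch_df, batch_id):
    file_paths = [r.path for r in batch_df.collect()]        # small list of HDFS paths per micro-batch
    if file_paths:
        json_df = spark.read.json(file_paths)                 # plain batch read of the referenced files
        json_df.write.mode("append").parquet("/tmp/output")   # hypothetical sink

paths.writeStream.foreachBatch(process_batch).start().awaitTermination()
```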
Re: MLib : Non Linear Optimization
Yes, we are using primarily these two algorithms: 1. Interior point trust-region line-search algorithm 2. Active-set trust-region line-search algorithm We are performing optimizations with constraints & thresholds etc. We are primarily using Lindo / SAS modules but want to get away from SAS due to the cost; it would be really good to have these algorithms in Spark ML. Let me know if you need any more info, I can share some snippets if required. Thanks, Nitin On Thu, Sep 8, 2016 at 2:08 PM, Robin East <robin.e...@xense.co.uk> wrote: > Do you have any particular algorithms in mind? If you state the most > common algorithms you use then it might stimulate the appropriate comments. > > > > > On 8 Sep 2016, at 05:04, nsareen <nsar...@gmail.com> wrote: > > > > Any answer to this question group ? > > > > > > > > -- > > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MLib-Non-Linear-Optimization-tp27645p27676.html > > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > > > - > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > > >
Re: Populating tables using hive and spark
Hi Furcy, If I execute the command "ANALYZE TABLE TEST_ORC COMPUTE STATISTICS" before checking the count from hive, Hive returns the correct count albeit it does not spawn a map-reduce job for computing the count. I'm running a HDP 2.4 Cluster with Hive 1.2.1.2.4 and Spark 1.6.1 If others can concur we can go ahead and report it as a bug. Regards, Nitin On Mon, Aug 22, 2016 at 4:15 PM, Furcy Pin <furcy@flaminem.com> wrote: > Hi Nitin, > > I confirm that there is something odd here. > > I did the following test : > > create table test_orc (id int, name string, dept string) stored as ORC; > insert into table test_orc values (1, 'abc', 'xyz'); > insert into table test_orc values (2, 'def', 'xyz'); > insert into table test_orc values (3, 'pqr', 'xyz'); > insert into table test_orc values (4, 'ghi', 'xyz'); > > > I ended up with 4 files on hdfs: > > 00_0 > 00_0_copy_1 > 00_0_copy_2 > 00_0_copy_3 > > > Then I renamed 00_0_copy_2 to part-0, and I still got COUNT(*) = 4 > with hive. > So this is not a file name issue. > > I then removed one of the files, and I got this : > > > SELECT COUNT(1) FROM test_orc ; > +--+--+ > | _c0 | > +--+--+ > | 4| > +--+--+ > > > SELECT * FROM test_orc ; > +--+++--+ > | test_orc.id | test_orc.name | test_orc.dept | > +--+++--+ > | 1| abc| xyz| > | 2| def| xyz| > | 4| ghi| xyz| > +--+++--+ > 3 rows selected (0.162 seconds) > > So, my guess is that when Hive inserts data, it must keep somewhere in the > metastore the number of rows in the table. > However, if the files are modified by someone else than Hive itself, > (either manually or with Spark), you end up with an inconsistency. > > So I guess we can call it a bug: > > Hive should detect that the files changed and invalidate its > pre-calculated count. > Optionally, Spark should be nice with Hive and update the the count when > inserting. > > I don't know if this bug has already been reported, and I tested on Hive > 1.1.0, so perhaps it is already solved in later releases. > > Regards, > > Furcy > > > On Mon, Aug 22, 2016 at 9:34 AM, Nitin Kumar <nk94.nitinku...@gmail.com> > wrote: > >> Hi! >> >> I've noticed that hive has problems in registering new data records if >> the same table is written to using both the hive terminal and spark sql. 
>> The problem is demonstrated through the commands listed below >> >> >> hive> use default; >> hive> create table test_orc (id int, name string, dept string) stored as >> ORC; >> hive> insert into table test_orc values (1, 'abc', 'xyz'); >> hive> insert into table test_orc values (2, 'def', 'xyz'); >> hive> select count(*) from test_orc; >> OK >> 2 >> hive> select distinct(name) from test_orc; >> OK >> abc >> def >> >> *** files in hdfs path in warehouse for the created table *** >> >> >> >> >> >>> data_points = [(3, 'pqr', 'xyz'), (4, 'ghi', 'xyz')] >> >>> column_names = ['identity_id', 'emp_name', 'dept_name'] >> >>> data_df = sqlContext.createDataFrame(data_points, column_names) >> >>> data_df.show() >> >> +---++-+ >> |identity_id|emp_name|dept_name| >> +---++-+ >> | 3| pqr| xyz| >> | 4| ghi| xyz| >> +---++-+ >> >> >>> data_df.registerTempTable('temp_table') >> >>> sqlContext.sql('insert into table default.test_orc select * from >> temp_table') >> >> *** files in hdfs path in warehouse for the created table *** >> >> >> hive> select count(*) from test_orc; (Does not launch map-reduce job) >> OK >> 2 >> hive> select distinct(name) from test_orc; (Launches map-reduce job) >> abc >> def >> ghi >> pqr >> >> hive> create table test_orc_new like test_orc stored as ORC; >> hive> insert into table test_orc_new select * from test_orc; >> hive> select count(*) from test_orc_new; >> OK >> 4 >> == >> >> Even if I restart the hive services I cannot get the proper count output >> from hive. This problem only occurs if the table is written to using both >> hive and spark. If only spark is used to insert records into the table >> multiple times, the count query in the hive terminal works perfectly fine. >> >> This problem occurs for tables stored with different storage formats as >> well (textFile etc.) >> >> Is this because of the different naming conventions used by hive and >> spark to write records to hdfs? Or maybe it is not a recommended practice >> to write tables using different services? >> >> Your thoughts and comments on this matter would be highly appreciated! >> >> Thanks! >> Nitin >> >> >> >
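A sketch of the workaround pattern, under the assumption (consistent with what is observed above) that Hive answers COUNT(*) from row-count statistics stored in the metastore when hive.compute.query.using.stats is enabled, and that inserts done through Spark do not refresh those statistics. It reuses sqlContext and temp_table from the example above; the Hive-side statements are shown as comments.
```
# Spark side: the insert itself is unchanged
sqlContext.sql("insert into table default.test_orc select * from temp_table")

# Hive side afterwards, either refresh the statistics (confirmed earlier in this thread):
#   hive> ANALYZE TABLE test_orc COMPUTE STATISTICS;
# or stop Hive from answering COUNT(*) from possibly stale stats (untested suggestion):
#   hive> set hive.compute.query.using.stats=false;
```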
Populating tables using hive and spark
Hi! I've noticed that hive has problems in registering new data records if the same table is written to using both the hive terminal and spark sql. The problem is demonstrated through the commands listed below hive> use default; hive> create table test_orc (id int, name string, dept string) stored as ORC; hive> insert into table test_orc values (1, 'abc', 'xyz'); hive> insert into table test_orc values (2, 'def', 'xyz'); hive> select count(*) from test_orc; OK 2 hive> select distinct(name) from test_orc; OK abc def *** files in hdfs path in warehouse for the created table *** >>> data_points = [(3, 'pqr', 'xyz'), (4, 'ghi', 'xyz')] >>> column_names = ['identity_id', 'emp_name', 'dept_name'] >>> data_df = sqlContext.createDataFrame(data_points, column_names) >>> data_df.show() +---++-+ |identity_id|emp_name|dept_name| +---++-+ | 3| pqr| xyz| | 4| ghi| xyz| +---++-+ >>> data_df.registerTempTable('temp_table') >>> sqlContext.sql('insert into table default.test_orc select * from temp_table') *** files in hdfs path in warehouse for the created table *** hive> select count(*) from test_orc; (Does not launch map-reduce job) OK 2 hive> select distinct(name) from test_orc; (Launches map-reduce job) abc def ghi pqr hive> create table test_orc_new like test_orc stored as ORC; hive> insert into table test_orc_new select * from test_orc; hive> select count(*) from test_orc_new; OK 4 == Even if I restart the hive services I cannot get the proper count output from hive. This problem only occurs if the table is written to using both hive and spark. If only spark is used to insert records into the table multiple times, the count query in the hive terminal works perfectly fine. This problem occurs for tables stored with different storage formats as well (textFile etc.) Is this because of the different naming conventions used by hive and spark to write records to hdfs? Or maybe it is not a recommended practice to write tables using different services? Your thoughts and comments on this matter would be highly appreciated! Thanks! Nitin
Re: Apache Spark : spark.eventLog.dir on Windows Environment
Hi Akhil, I don't have HADOOP_HOME or HADOOP_CONF_DIR, or even winutils.exe. What's the configuration required for this? From where can I get winutils.exe? Thanks and Regards, Nitin Kalra On Tue, Jul 21, 2015 at 1:30 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Do you have HADOOP_HOME, HADOOP_CONF_DIR and hadoop's winutils.exe in the environment? Thanks Best Regards On Mon, Jul 20, 2015 at 5:45 PM, nitinkalra2000 nitinkalra2...@gmail.com wrote: Hi All, I am working on Spark 1.4 in a Windows environment. I have to set the eventLog directory so that I can reopen the Spark UI after the application has finished. But I am not able to set eventLog.dir; it gives an error on Windows. The configuration is: <entry key="spark.eventLog.enabled" value="true" /> <entry key="spark.eventLog.dir" value="file:///c:/sparklogs" /> The exception I get: java.io.IOException: Cannot run program cygpath: CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.hadoop.util.Shell.runCommand(Shell.java:206) I have also tried installing Cygwin but the error doesn't go away. Can anybody give any advice on it? I have posted the same question on Stackoverflow as well: http://stackoverflow.com/questions/31468716/apache-spark-spark-eventlog-dir-on-windows-environment Thanks Nitin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-spark-eventLog-dir-on-Windows-Environment-tp23913.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
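For anyone hitting the same thing, a hedged sketch of the usual Windows setup (paths are made up, and which winutils.exe build matches a given Hadoop version is something to verify yourself, since it is a community-provided binary rather than part of the Spark download): point HADOOP_HOME at a directory containing bin\winutils.exe before the JVM starts, and give the event log an existing local directory.
```
import os
os.environ["HADOOP_HOME"] = "C:\\hadoop"    # hypothetical; must contain bin\winutils.exe

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("eventlog-on-windows")
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.dir", "file:///C:/sparklogs"))   # create C:\sparklogs beforehand

sc = SparkContext(conf=conf)
```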
Re: Does HiveContext connect to HiveServer2?
Hi Marcelo, The issue does not happen while connecting to the hive metstore, that works fine. It seems that HiveContext only uses Hive CLI to execute the queries while HiveServer2 does not support it. I dont think you can specify any configuration in hive-site.xml which can make it connect to HiveServer2. It becomes a blocking issue in case of Sentry where HiveServer2 does the translation of authenticated users to hive user (which is the only user that can access directories under hive/warehouse when Sentry is ON). The HiveContext is able to access the metastore and then tries to access the files under hive warehouse directory where i fails with permission error: *org.apache.hadoop.security.AccessControlException: Permission denied: user=kakn, access=READ_EXECUTE, inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t* On Tue, Mar 24, 2015 at 1:43 PM, Marcelo Vanzin van...@cloudera.com wrote: spark-submit --files /path/to/hive-site.xml On Tue, Mar 24, 2015 at 10:31 AM, Udit Mehta ume...@groupon.com wrote: Another question related to this, how can we propagate the hive-site.xml to all workers when running in the yarn cluster mode? On Tue, Mar 24, 2015 at 10:09 AM, Marcelo Vanzin van...@cloudera.com wrote: It does neither. If you provide a Hive configuration to Spark, HiveContext will connect to your metastore server, otherwise it will create its own metastore in the working directory (IIRC). On Tue, Mar 24, 2015 at 8:58 AM, nitinkak001 nitinkak...@gmail.com wrote: I am wondering if HiveContext connects to HiveServer2 or does it work though Hive CLI. The reason I am asking is because Cloudera has deprecated Hive CLI. If the connection is through HiverServer2, is there a way to specify user credentials? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-HiveContext-connect-to-HiveServer2-tp22200.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
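To make Marcelo's point concrete, a small sketch of what "provide a Hive configuration to Spark" amounts to when done programmatically (the host and table are made up; normally this comes from hive-site.xml). Note that this connects HiveContext to the metastore service only; the table data is then read straight from HDFS as the submitting user, without going through HiveServer2, which is why HiveServer2-side controls such as Sentry's user translation never apply.
```
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(conf=SparkConf().setAppName("hivecontext-metastore-example"))
hc = HiveContext(sc)
hc.setConf("hive.metastore.uris", "thrift://metastore-host:9083")   # hypothetical metastore host

hc.sql("SELECT count(*) FROM default.some_table").show()            # hypothetical table
```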
Re: Hive query execution from Spark(through HiveContext) failing with Apache Sentry
Any response to this guys? On Fri, Jun 19, 2015 at 2:34 PM, Nitin kak nitinkak...@gmail.com wrote: Any other suggestions guys? On Wed, Jun 17, 2015 at 7:54 PM, Nitin kak nitinkak...@gmail.com wrote: With Sentry, only hive user has the permission for read/write/execute on the subdirectories of warehouse. All the users get translated to hive when interacting with hiveserver2. But i think HiveContext is bypassing hiveserver2. On Wednesday, June 17, 2015, ayan guha guha.a...@gmail.com wrote: Try to grant read execute access through sentry. On 18 Jun 2015 05:47, Nitin kak nitinkak...@gmail.com wrote: I am trying to run a hive query from Spark code using HiveContext object. It was running fine earlier but since the Apache Sentry has been set installed the process is failing with this exception : *org.apache.hadoop.security.AccessControlException: Permission denied: user=kakn, access=READ_EXECUTE, inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t* I have pasted the full stack trace at the end of this post. My username kakn is a registered user with Sentry. I know that Spark takes all the configurations from hive-site.xml to execute the hql, so I added a few Sentry specific properties but seem to have no effect. I have attached the hive-site.xml *property* *namehive.security.authorization.task.factory/name* * valueorg.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl/value* * /property* * property* *namehive.metastore.pre.event.listeners/name* * valueorg.apache.sentry.binding.metastore.MetastoreAuthzBinding/value* *descriptionlist of comma seperated listeners for metastore events./description* * /property* * property* *namehive.warehouse.subdir.inherit.perms/name* *valuetrue/value* * /property* *org.apache.hadoop.security.AccessControlException: Permission denied: user=kakn, access=READ_EXECUTE, inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t* * at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)* * at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)* * at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:151)* * at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6194)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4793)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4755)* * at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:800)* * at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getListing(AuthorizationProviderProxyClientProtocol.java:310)* * at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:606)* * at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)* * at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)* * at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)* * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)* * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)* * at java.security.AccessController.doPrivileged(Native Method)* * at javax.security.auth.Subject.doAs(Subject.java:415)* * at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)* * at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)* * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)* * at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)* * at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)* * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)* * at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)* * at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)* * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1895)* * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1876)* * at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654
Re: Hive query execution from Spark(through HiveContext) failing with Apache Sentry
Any other suggestions guys? On Wed, Jun 17, 2015 at 7:54 PM, Nitin kak nitinkak...@gmail.com wrote: With Sentry, only hive user has the permission for read/write/execute on the subdirectories of warehouse. All the users get translated to hive when interacting with hiveserver2. But i think HiveContext is bypassing hiveserver2. On Wednesday, June 17, 2015, ayan guha guha.a...@gmail.com wrote: Try to grant read execute access through sentry. On 18 Jun 2015 05:47, Nitin kak nitinkak...@gmail.com wrote: I am trying to run a hive query from Spark code using HiveContext object. It was running fine earlier but since the Apache Sentry has been set installed the process is failing with this exception : *org.apache.hadoop.security.AccessControlException: Permission denied: user=kakn, access=READ_EXECUTE, inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t* I have pasted the full stack trace at the end of this post. My username kakn is a registered user with Sentry. I know that Spark takes all the configurations from hive-site.xml to execute the hql, so I added a few Sentry specific properties but seem to have no effect. I have attached the hive-site.xml *property* *namehive.security.authorization.task.factory/name* * valueorg.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl/value* * /property* * property* *namehive.metastore.pre.event.listeners/name* * valueorg.apache.sentry.binding.metastore.MetastoreAuthzBinding/value* *descriptionlist of comma seperated listeners for metastore events./description* * /property* * property* *namehive.warehouse.subdir.inherit.perms/name* *valuetrue/value* * /property* *org.apache.hadoop.security.AccessControlException: Permission denied: user=kakn, access=READ_EXECUTE, inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t* * at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)* * at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)* * at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:151)* * at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6194)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4793)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4755)* * at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:800)* * at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getListing(AuthorizationProviderProxyClientProtocol.java:310)* * at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:606)* * at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)* * at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)* * at 
org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)* * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)* * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)* * at java.security.AccessController.doPrivileged(Native Method)* * at javax.security.auth.Subject.doAs(Subject.java:415)* * at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)* * at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)* * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)* * at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)* * at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)* * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)* * at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)* * at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)* * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1895)* * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1876)* * at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)* * at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:104
Hive query execution from Spark(through HiveContext) failing with Apache Sentry
I am trying to run a hive query from Spark code using HiveContext object. It was running fine earlier but since the Apache Sentry has been set installed the process is failing with this exception : *org.apache.hadoop.security.AccessControlException: Permission denied: user=kakn, access=READ_EXECUTE, inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t* I have pasted the full stack trace at the end of this post. My username kakn is a registered user with Sentry. I know that Spark takes all the configurations from hive-site.xml to execute the hql, so I added a few Sentry specific properties but seem to have no effect. I have attached the hive-site.xml *property* *namehive.security.authorization.task.factory/name* * valueorg.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl/value* * /property* * property* *namehive.metastore.pre.event.listeners/name* * valueorg.apache.sentry.binding.metastore.MetastoreAuthzBinding/value* *descriptionlist of comma seperated listeners for metastore events./description* * /property* * property* *namehive.warehouse.subdir.inherit.perms/name* *valuetrue/value* * /property* *org.apache.hadoop.security.AccessControlException: Permission denied: user=kakn, access=READ_EXECUTE, inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t* * at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)* * at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)* * at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:151)* * at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6194)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4793)* * at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4755)* * at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:800)* * at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getListing(AuthorizationProviderProxyClientProtocol.java:310)* * at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:606)* * at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)* * at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)* * at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)* * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)* * at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)* * at java.security.AccessController.doPrivileged(Native Method)* * at javax.security.auth.Subject.doAs(Subject.java:415)* * at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)* * at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)* * at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
Method)* * at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)* * at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)* * at java.lang.reflect.Constructor.newInstance(Constructor.java:526)* * at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)* * at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)* * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1895)* * at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1876)* * at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)* * at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:104)* * at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:716)* * at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)* * at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)* * at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:712)* * at org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetTypes.scala:440)* * at org.apache.spark.sql.parquet.ParquetTypesConverter$.readSchemaFromFile(ParquetTypes.scala:477)* * at
Re: Hive query execution from Spark(through HiveContext) failing with Apache Sentry
With Sentry, only the hive user has permission to read/write/execute on the subdirectories of the warehouse. All users get translated to hive when interacting with HiveServer2, but I think HiveContext bypasses HiveServer2. On Wednesday, June 17, 2015, ayan guha guha.a...@gmail.com wrote: Try to grant read execute access through sentry. On 18 Jun 2015 05:47, Nitin kak nitinkak...@gmail.com wrote: I am trying to run a hive query from Spark code using a HiveContext object. It was running fine earlier, but since Apache Sentry was installed the process has been failing with this exception:

org.apache.hadoop.security.AccessControlException: Permission denied: user=kakn, access=READ_EXECUTE, inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t

I have pasted the full stack trace at the end of this post. My username kakn is a registered user with Sentry. I know that Spark takes all its configuration from hive-site.xml to execute the hql, so I added a few Sentry-specific properties, but they seem to have no effect. I have attached the hive-site.xml:

  <property>
    <name>hive.security.authorization.task.factory</name>
    <value>org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl</value>
  </property>
  <property>
    <name>hive.metastore.pre.event.listeners</name>
    <value>org.apache.sentry.binding.metastore.MetastoreAuthzBinding</value>
    <description>list of comma separated listeners for metastore events.</description>
  </property>
  <property>
    <name>hive.warehouse.subdir.inherit.perms</name>
    <value>true</value>
  </property>

org.apache.hadoop.security.AccessControlException: Permission denied: user=kakn, access=READ_EXECUTE, inode=/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in:hive:hive:drwxrwx--t
  at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
  at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
  at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:151)
  at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6287)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6269)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPathAccess(FSNamesystem.java:6194)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListingInt(FSNamesystem.java:4793)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getListing(FSNamesystem.java:4755)
  at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getListing(NameNodeRpcServer.java:800)
  at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getListing(AuthorizationProviderProxyClientProtocol.java:310)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getListing(ClientNamenodeProtocolServerSideTranslatorPB.java:606)
  at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:415)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
  at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
  at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
  at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1895)
  at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1876)
  at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:654)
  at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:104)
  at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall
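The stack trace above is a plain HDFS directory listing rejected by the NameNode for the submitting user, which matches the point about HiveContext bypassing HiveServer2. As a quick illustration (not from the thread, and assuming only the standard Hadoop FileSystem API on the cluster classpath), the same failure can be reproduced with a direct FileSystem call made as the job user; the path is the one from the exception:

```scala
// Minimal sketch: HiveContext scans the warehouse directories directly as the
// submitting user (kakn here), so if HDFS permissions only allow hive:hive,
// the listing fails exactly as in the stack trace above.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val table = new Path(
  "/user/hive/warehouse/rt_freewheel_mastering.db/digital_profile_cluster_in")

// Running this as kakn reproduces the AccessControlException; running it as the
// hive user (or after granting HDFS-level access to the user's group) succeeds.
fs.listStatus(table).foreach(status => println(status.getPath))
```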
Re: HiveContext test, Spark Context did not initialize after waiting 10000ms
That is a much better solution than how I resolved it. I got around it by placing comma separated jar paths for all the hive related jars in --jars clause. I will try your solution. Thanks for sharing it. On Tue, May 26, 2015 at 4:14 AM, Mohammad Islam misla...@yahoo.com wrote: I got a similar problem. I'm not sure if your problem is already resolved. For the record, I solved this type of error by calling sc..setMaster( yarn-cluster); If you find the solution, please let us know. Regards, Mohammad On Friday, March 6, 2015 2:47 PM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to run a Hive query from Spark using HiveContext. Here is the code / val conf = new SparkConf().setAppName(HiveSparkIntegrationTest) conf.set(spark.executor.extraClassPath, /opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/hive/lib); conf.set(spark.driver.extraClassPath, /opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/hive/lib); conf.set(spark.yarn.am.waitTime, 30L) val sc = new SparkContext(conf) val sqlContext = new HiveContext(sc) def inputRDD = sqlContext.sql(describe spark_poc.src_digital_profile_user); inputRDD.collect().foreach { println } println(inputRDD.schema.getClass.getName) / Getting this exception. Any clues? The weird part is if I try to do the same thing but in Java instead of Scala, it runs fine. /Exception in thread Driver java.lang.NullPointerException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162) 15/03/06 17:39:32 ERROR yarn.ApplicationMaster: SparkContext did not initialize after waiting for 1 ms. Please check earlier log output for errors. Failing the application. Exception in thread main java.lang.NullPointerException at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:218) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:110) at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:434) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:53) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:52) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:52) at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:433) at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) 15/03/06 17:39:32 INFO yarn.ApplicationMaster: AppMaster received a signal./ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-test-Spark-Context-did-not-initialize-after-waiting-1ms-tp21953.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
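For reference, here is a cleaned-up sketch of the snippet quoted above, with the string literals restored and the explicit master setting Mohammad describes. The parcel paths are the thread's; the waitTime value is an assumption (the original "30L" is ambiguous), so treat this as illustrative rather than verified:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
  .setAppName("HiveSparkIntegrationTest")
  .setMaster("yarn-cluster")  // the fix described above: set the master explicitly
  .set("spark.executor.extraClassPath",
       "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/hive/lib")
  .set("spark.driver.extraClassPath",
       "/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/hive/lib")
  .set("spark.yarn.am.waitTime", "100000")  // assumed value in ms, larger than the 10000 ms in the error

val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)

val inputRDD = sqlContext.sql("describe spark_poc.src_digital_profile_user")
inputRDD.collect().foreach(println)
println(inputRDD.schema.getClass.getName)
```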
Re: how to clean shuffle write each iteration
Shuffle write will be cleaned if it is not referenced by any object directly/indirectly. There is a garbage collector written inside spark which periodically checks for weak references to RDDs/shuffle write/broadcast and deletes them. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-clean-shuffle-write-each-iteration-tp21886p21889.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
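As a small illustration of the mechanism described above (a sketch, not taken from the thread): shuffle files are tracked on the driver through weak references to the RDD that produced them, so dropping the last strong reference and letting a driver-side GC run makes them eligible for Spark's periodic cleanup.

```scala
// Minimal sketch of letting one iteration's shuffle write become eligible for
// cleanup; the cleaner runs asynchronously, so deletion is not immediate.
var shuffled = sc.parallelize(1 to 1000000)
  .map(i => (i % 100, 1))
  .reduceByKey(_ + _)      // produces a shuffle write
shuffled.count()

shuffled = null            // drop the last strong reference to the shuffled RDD
System.gc()                // a driver-side GC lets the weak reference be collected,
                           // after which the cleaner can remove the shuffle files
```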
Re: SQLContext.applySchema strictness
AFAIK, this is the expected behavior. You have to make sure that the schema matches the row. It won't give any error when you apply the schema as it doesn't validate the nature of data. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQLContext-applySchema-strictness-tp21650p21653.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
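A minimal sketch (using the Spark 1.2-era applySchema API) of the behaviour described above: the schema is accepted without validation, and a mismatch only surfaces when the data is actually read.

```scala
import org.apache.spark.sql._

// Rows whose second field is a String, deliberately declared as an Integer below.
val rows = sc.parallelize(Seq(Row("alice", "not-a-number")))

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))   // does not match the data

val schemaRDD = sqlContext.applySchema(rows, schema)    // no error here
schemaRDD.registerTempTable("people")
// The mismatch only shows up when an action reads the column:
// sqlContext.sql("SELECT age FROM people").collect()
```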
Re: Spark SQL - Point lookup optimisation in SchemaRDD?
I was able to resolve this use case (thanks Cheng Lian), where I wanted to launch an executor on just a specific partition while also getting the batch pruning optimisations of Spark SQL, by doing the following:

val query = sql("SELECT * FROM cachedTable WHERE key = 1")
val plannedRDD = query.queryExecution.toRdd
val prunedRDD = PartitionPruningRDD.create(plannedRDD, _ == 3)
prunedRDD.collect()

Thanks a lot Cheng for suggesting the approach of doing things the other way round. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Point-lookup-optimisation-in-SchemaRDD-tp21555p21613.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL - Point lookup optimisation in SchemaRDD?
Hi All, I have a use case where I have cached my SchemaRDD and I want to launch executors on just the partition which I know of (the prime use case of PartitionPruningRDD). I tried something like the following:

val partitionIdx = 2
val schemaRdd = hiveContext.table("myTable") // myTable is cached in memory
val partitionPrunedRDD = new PartitionPruningRDD(schemaRdd, _ == partitionIdx)
val partitionSchemaRDD = hiveContext.applySchema(partitionPrunedRDD, schemaRdd.schema)
partitionSchemaRDD.registerTempTable("myTablePartition2")
hiveContext.hql("select * from myTablePartition2 where id=10001")

If I do this, where I expect my executor to run the query in 500 ms, it runs in 3000-4000 ms. I think this is happening because I applied the schema again and lost the queryExecution plan. But if I do partitionSchemaRDD.cache as well, then I get the 500 ms performance, although in that case the same partition/data gets cached twice. My question is: can we create a PartitionPruningCachedSchemaRDD-like class which can prune the partitions of InMemoryColumnarTableScan's RDD[CachedBatch] and launch an executor on just the selected partition(s)? Thanks -Nitin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Point-lookup-optimisation-in-SchemaRDD-tp21555.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Driver Host under Yarn
Are you running in yarn-cluster or yarn-client mode? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Driver-Host-under-Yarn-tp21536p21556.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark (yarn-client mode) Hangs in final stages of Collect or Reduce
Have you checked the corresponding executor logs as well? I think information provided by you here is less to actually understand your issue. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-yarn-client-mode-Hangs-in-final-stages-of-Collect-or-Reduce-tp21551p21557.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Where can I find logs set inside RDD processing functions?
The yarn log aggregation is enabled and the logs which I get through yarn logs -applicationId your_application_id are no different than what I get through logs in Yarn Application tracking URL. They still dont have the above logs. On Fri, Feb 6, 2015 at 3:36 PM, Petar Zecevic petar.zece...@gmail.com wrote: You can enable YARN log aggregation (yarn.log-aggregation-enable to true) and execute command yarn logs -applicationId your_application_id after your application finishes. Or you can look at them directly in HDFS in /tmp/logs/user/logs/ applicationid/hostname On 6.2.2015. 19:50, nitinkak001 wrote: I am trying to debug my mapPartitionsFunction. Here is the code. There are two ways I am trying to log using log.info() or println(). I am running in yarn-cluster mode. While I can see the logs from driver code, I am not able to see logs from map, mapPartition functions in the Application Tracking URL. Where can I find the logs? /var outputRDD = partitionedRDD.mapPartitions(p = { val outputList = new ArrayList[scala.Tuple3[Long, Long, Int]] p.map({ case(key, value) = { log.info(Inside map) println(Inside map); for(i - 0 until outputTuples.size()){ val outputRecord = outputTuples.get(i) if(outputRecord != null){ outputList.add(outputRecord.getCurrRecordProfileID(), outputRecord.getWindowRecordProfileID, outputRecord.getScore()) } } } }) outputList.iterator() })/ Here is my log4j.properties /log4j.rootCategory=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Settings to quiet third party logs that are too verbose log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO/ -- View this message in context: http://apache-spark-user-list. 1001560.n3.nabble.com/Where-can-I-find-logs-set-inside- RDD-processing-functions-tp21537.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
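A trimmed, compilable version of the pattern in the quoted question (variable names are the thread's; surrounding types are assumed): anything logged or printed inside map/mapPartitions runs on the executors, so it ends up in the executor containers' stdout/stderr, which is what yarn logs -applicationId aggregates, not in the driver output.

```scala
// Sketch only: partitionedRDD is assumed to be an RDD[(String, String)] as in the thread.
val outputRDD = partitionedRDD.mapPartitions { iter =>
  iter.map { case (key, value) =>
    println(s"Inside map: $key")   // written to the executor's stdout, not the driver's
    (key, value)
  }
}
outputRDD.count()   // an action is needed before any executor-side logging happens
```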
Re: Where can I find logs set inside RDD processing functions?
yarn.nodemanager.remote-app-log-dir is set to /tmp/logs On Fri, Feb 6, 2015 at 4:14 PM, Ted Yu yuzhih...@gmail.com wrote: To add to What Petar said, when YARN log aggregation is enabled, consider specifying yarn.nodemanager.remote-app-log-dir which is where aggregated logs are saved. Cheers On Fri, Feb 6, 2015 at 12:36 PM, Petar Zecevic petar.zece...@gmail.com wrote: You can enable YARN log aggregation (yarn.log-aggregation-enable to true) and execute command yarn logs -applicationId your_application_id after your application finishes. Or you can look at them directly in HDFS in /tmp/logs/user/logs/ applicationid/hostname On 6.2.2015. 19:50, nitinkak001 wrote: I am trying to debug my mapPartitionsFunction. Here is the code. There are two ways I am trying to log using log.info() or println(). I am running in yarn-cluster mode. While I can see the logs from driver code, I am not able to see logs from map, mapPartition functions in the Application Tracking URL. Where can I find the logs? /var outputRDD = partitionedRDD.mapPartitions(p = { val outputList = new ArrayList[scala.Tuple3[Long, Long, Int]] p.map({ case(key, value) = { log.info(Inside map) println(Inside map); for(i - 0 until outputTuples.size()){ val outputRecord = outputTuples.get(i) if(outputRecord != null){ outputList.add(outputRecord. getCurrRecordProfileID(), outputRecord.getWindowRecordProfileID, outputRecord.getScore()) } } } }) outputList.iterator() })/ Here is my log4j.properties /log4j.rootCategory=INFO, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Settings to quiet third party logs that are too verbose log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$ SparkILoopInterpreter=INFO/ -- View this message in context: http://apache-spark-user-list. 1001560.n3.nabble.com/Where-can-I-find-logs-set-inside- RDD-processing-functions-tp21537.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Sort based shuffle not working properly?
This is an exerpt from the Design document of the implementation of Sort based shuffle.. I am thinking I might be wrong in my perception of sort based shuffle. Dont completely understand it though. *Motivation* A sortbased shuffle can be more scalable than Spark’s current hashbased one because it doesn’t require writing a separate file for each reduce task from each mapper. Instead, we write a single sorted file and serve ranges of it to different reducers. In jobs with a lot of reduce tasks (say 10,000+), this saves significant memory for compression and serialization buffers and results in more sequential disk I/O. *Implementation* To perform a sortbased shuffle, each map task will produce one or more output files sorted by a key’s partition ID, then mergesort them to yield a single output file. Because it’s only necessary to group the keys together into partitions, we won’t bother to also sort them within each partition On Tue, Feb 3, 2015 at 5:41 PM, Nitin kak nitinkak...@gmail.com wrote: I thought thats what sort based shuffled did, sort the keys going to the same partition. I have tried (c1, c2) as (Int, Int) tuple as well. I don't think that ordering of c2 type is the problem here. On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote: Hm, I don't think the sort partitioner is going to cause the result to be ordered by c1,c2 if you only partitioned on c1. I mean, it's not even guaranteed that the type of c2 has an ordering, right? On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to implement secondary sort in spark as we do in map-reduce. Here is my data(tab separated, without c1, c2, c2). c1c2 c3 1 2 4 1 3 6 2 4 7 2 6 8 3 5 5 3 1 8 3 2 0 To do secondary sort, I create paried RDD as /((c1 + ,+ c2), row)/ and then use a custom partitioner to partition only on c1. I have set /spark.shuffle.manager = SORT/ so the keys per partition are sorted. For the key 3 I am expecting to get (3, 1) (3, 2) (3, 5) but still getting the original order 3,5 3,1 3,2 Here is the custom partitioner code: /class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner { def numPartitions = p def getPartition(key: Any) = { key.asInstanceOf[String].split(,)(0).toInt } }/ and driver code, please tell me what I am doing wrong /val conf = new SparkConf().setAppName(MapInheritanceExample) conf.set(spark.shuffle.manager, SORT); val sc = new SparkContext(conf) val pF = sc.textFile(inputFile) val log = LogFactory.getLog(MapFunctionTest) val partitionedRDD = pF.map { x = var arr = x.split(\t); (arr(0)+,+arr(1), null) }.partitionBy(new StraightPartitioner(10)) var outputRDD = partitionedRDD.mapPartitions(p = { p.map({ case(o, n) = { o } }) })/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org Sort-basedshuffledesign.pdf Description: Adobe PDF document - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Sort based shuffle not working properly?
I thought thats what sort based shuffled did, sort the keys going to the same partition. I have tried (c1, c2) as (Int, Int) tuple as well. I don't think that ordering of c2 type is the problem here. On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote: Hm, I don't think the sort partitioner is going to cause the result to be ordered by c1,c2 if you only partitioned on c1. I mean, it's not even guaranteed that the type of c2 has an ordering, right? On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com wrote: I am trying to implement secondary sort in spark as we do in map-reduce. Here is my data(tab separated, without c1, c2, c2). c1c2 c3 1 2 4 1 3 6 2 4 7 2 6 8 3 5 5 3 1 8 3 2 0 To do secondary sort, I create paried RDD as /((c1 + ,+ c2), row)/ and then use a custom partitioner to partition only on c1. I have set /spark.shuffle.manager = SORT/ so the keys per partition are sorted. For the key 3 I am expecting to get (3, 1) (3, 2) (3, 5) but still getting the original order 3,5 3,1 3,2 Here is the custom partitioner code: /class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner { def numPartitions = p def getPartition(key: Any) = { key.asInstanceOf[String].split(,)(0).toInt } }/ and driver code, please tell me what I am doing wrong /val conf = new SparkConf().setAppName(MapInheritanceExample) conf.set(spark.shuffle.manager, SORT); val sc = new SparkContext(conf) val pF = sc.textFile(inputFile) val log = LogFactory.getLog(MapFunctionTest) val partitionedRDD = pF.map { x = var arr = x.split(\t); (arr(0)+,+arr(1), null) }.partitionBy(new StraightPartitioner(10)) var outputRDD = partitionedRDD.mapPartitions(p = { p.map({ case(o, n) = { o } }) })/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
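Not from this thread, but for completeness: since Spark 1.2 the usual way to get a secondary sort without relying on the shuffle manager's internal ordering is repartitionAndSortWithinPartitions, with a composite key and a partitioner that only looks at the first component. A rough sketch under those assumptions (column types and partition count are illustrative):

```scala
import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._

// Partition only on c1, while the composite (c1, c2) key drives the sort.
class C1Partitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case (c1: Int, _) => math.abs(c1.hashCode) % numPartitions
  }
}

val pairs = sc.textFile(inputFile)
  .map(_.split("\t"))
  .map(arr => ((arr(0).toInt, arr(1).toInt), arr.mkString("\t")))

// Each partition now holds all rows for its c1 values, sorted by (c1, c2).
val secondarySorted = pairs.repartitionAndSortWithinPartitions(new C1Partitioner(10))
```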
Re: Running beyond memory limits in ConnectedComponents
Replying to all Is this Overhead memory allocation used for any specific purpose. For example, will it be any different if I do *--executor-memory 22G *with overhead set to 0%(hypothetically) vs *--executor-memory 20G* and overhead memory to default(9%) which eventually brings the total memory asked by Spark to approximately 22G. On Thu, Jan 15, 2015 at 12:54 PM, Nitin kak nitinkak...@gmail.com wrote: Is this Overhead memory allocation used for any specific purpose. For example, will it be any different if I do *--executor-memory 22G *with overhead set to 0%(hypothetically) vs *--executor-memory 20G* and overhead memory to default(9%) which eventually brings the total memory asked by Spark to approximately 22G. On Thu, Jan 15, 2015 at 12:10 PM, Sean Owen so...@cloudera.com wrote: This is a YARN setting. It just controls how much any container can reserve, including Spark executors. That is not the problem. You need Spark to ask for more memory from YARN, on top of the memory that is requested by --executor-memory. Your output indicates the default of 7% is too little. For example you can ask for 20GB for executors and ask for 2GB of overhead. Spark will ask for 22GB from YARN. (Of course, YARN needs to be set to allow containers of at least 22GB!) On Thu, Jan 15, 2015 at 4:31 PM, Nitin kak nitinkak...@gmail.com wrote: Thanks for sticking to this thread. I am guessing what memory my app requests and what Yarn requests on my part should be same and is determined by the value of *--executor-memory* which I had set to *20G*. Or can the two values be different? I checked in Yarn configurations(below), so I think that fits well into the memory overhead limits. Container Memory Maximum yarn.scheduler.maximum-allocation-mb MiBGiB Reset to the default value: 64 GiB http://10.1.1.49:7180/cmf/services/108/config# Override Instances http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue= The largest amount of physical memory, in MiB, that can be requested for a container. On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote: Those settings aren't relevant, I think. You're concerned with what your app requests, and what Spark requests of YARN on your behalf. (Of course, you can't request more than what your cluster allows for a YARN container for example, but that doesn't seem to be what is happening here.) You do not want to omit --executor-memory if you need large executor memory heaps, since then you just request the default and that is evidently not enough memory for your app. Look at http://spark.apache.org/docs/latest/running-on-yarn.html and spark.yarn.executor.memoryOverhead By default it's 7% of your 20G or about 1.4G. You might set this higher to 2G to give more overhead. See the --config property=value syntax documented in http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote: Thanks Sean. I guess Cloudera Manager has parameters executor_total_max_heapsize and worker_max_heapsize which point to the parameters you mentioned above. How much should that cushon between the jvm heap size and yarn memory limit be? I tried setting jvm memory to 20g and yarn to 24g, but it gave the same error as above. 
Then, I removed the --executor-memory clause spark-submit --class ConnectedComponentsTest --master yarn-cluster --num-executors 7 --executor-cores 1 target/scala-2.10/connectedcomponentstest_2.10-1.0.jar That is not giving GC, Out of memory exception 15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x362d65d4, /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Object.clone(Native Method) at akka.util.CompactByteString$.apply(ByteString.scala:410) at akka.util.ByteString$.apply(ByteString.scala:22) at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45) at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57) at akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43) at akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443
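To make the arithmetic above concrete, here is a sketch of the settings Sean describes (the values are the thread's example, not a recommendation). The YARN container request is the executor heap plus the overhead, so both together must fit under yarn.scheduler.maximum-allocation-mb:

```scala
import org.apache.spark.SparkConf

// 20 GiB heap + 2 GiB overhead => YARN is asked for roughly a 22 GiB container.
// The overhead covers off-heap/JVM/native memory and is not added to the heap,
// so it is not interchangeable with simply asking for a 22 GiB heap.
val conf = new SparkConf()
  .setAppName("ConnectedComponentsTest")
  .set("spark.executor.memory", "20g")
  .set("spark.yarn.executor.memoryOverhead", "2048")   // MiB; default is a percentage of executor memory
```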
Re: Running beyond memory limits in ConnectedComponents
I am sorry for the formatting error, the value for *yarn.scheduler.maximum-allocation-mb = 28G* On Thu, Jan 15, 2015 at 11:31 AM, Nitin kak nitinkak...@gmail.com wrote: Thanks for sticking to this thread. I am guessing what memory my app requests and what Yarn requests on my part should be same and is determined by the value of *--executor-memory* which I had set to *20G*. Or can the two values be different? I checked in Yarn configurations(below), so I think that fits well into the memory overhead limits. Container Memory Maximum yarn.scheduler.maximum-allocation-mb MiBGiB Reset to the default value: 64 GiB http://10.1.1.49:7180/cmf/services/108/config# Override Instances http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue= The largest amount of physical memory, in MiB, that can be requested for a container. On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote: Those settings aren't relevant, I think. You're concerned with what your app requests, and what Spark requests of YARN on your behalf. (Of course, you can't request more than what your cluster allows for a YARN container for example, but that doesn't seem to be what is happening here.) You do not want to omit --executor-memory if you need large executor memory heaps, since then you just request the default and that is evidently not enough memory for your app. Look at http://spark.apache.org/docs/latest/running-on-yarn.html and spark.yarn.executor.memoryOverhead By default it's 7% of your 20G or about 1.4G. You might set this higher to 2G to give more overhead. See the --config property=value syntax documented in http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote: Thanks Sean. I guess Cloudera Manager has parameters executor_total_max_heapsize and worker_max_heapsize which point to the parameters you mentioned above. How much should that cushon between the jvm heap size and yarn memory limit be? I tried setting jvm memory to 20g and yarn to 24g, but it gave the same error as above. 
Then, I removed the --executor-memory clause spark-submit --class ConnectedComponentsTest --master yarn-cluster --num-executors 7 --executor-cores 1 target/scala-2.10/connectedcomponentstest_2.10-1.0.jar That is not giving GC, Out of memory exception 15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x362d65d4, /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Object.clone(Native Method) at akka.util.CompactByteString$.apply(ByteString.scala:410) at akka.util.ByteString$.apply(ByteString.scala:22) at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45) at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57) at akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43) at akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread SparkListenerBus java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168) at scala.collection.mutable.ListBuffer.$plus
Re: Running beyond memory limits in ConnectedComponents
Thanks for sticking to this thread. I am guessing what memory my app requests and what Yarn requests on my part should be same and is determined by the value of *--executor-memory* which I had set to *20G*. Or can the two values be different? I checked in Yarn configurations(below), so I think that fits well into the memory overhead limits. Container Memory Maximum yarn.scheduler.maximum-allocation-mb MiBGiB Reset to the default value: 64 GiB http://10.1.1.49:7180/cmf/services/108/config# Override Instances http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue= The largest amount of physical memory, in MiB, that can be requested for a container. On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote: Those settings aren't relevant, I think. You're concerned with what your app requests, and what Spark requests of YARN on your behalf. (Of course, you can't request more than what your cluster allows for a YARN container for example, but that doesn't seem to be what is happening here.) You do not want to omit --executor-memory if you need large executor memory heaps, since then you just request the default and that is evidently not enough memory for your app. Look at http://spark.apache.org/docs/latest/running-on-yarn.html and spark.yarn.executor.memoryOverhead By default it's 7% of your 20G or about 1.4G. You might set this higher to 2G to give more overhead. See the --config property=value syntax documented in http://spark.apache.org/docs/latest/submitting-applications.html On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote: Thanks Sean. I guess Cloudera Manager has parameters executor_total_max_heapsize and worker_max_heapsize which point to the parameters you mentioned above. How much should that cushon between the jvm heap size and yarn memory limit be? I tried setting jvm memory to 20g and yarn to 24g, but it gave the same error as above. 
Then, I removed the --executor-memory clause spark-submit --class ConnectedComponentsTest --master yarn-cluster --num-executors 7 --executor-cores 1 target/scala-2.10/connectedcomponentstest_2.10-1.0.jar That is not giving GC, Out of memory exception 15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x362d65d4, /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Object.clone(Native Method) at akka.util.CompactByteString$.apply(ByteString.scala:410) at akka.util.ByteString$.apply(ByteString.scala:22) at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45) at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57) at akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43) at akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread SparkListenerBus java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168) at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply
Re: Running beyond memory limits in ConnectedComponents
Thanks Sean. I guess Cloudera Manager has parameters executor_total_max_heapsize and worker_max_heapsize which point to the parameters you mentioned above. How much should that cushon between the jvm heap size and yarn memory limit be? I tried setting jvm memory to 20g and yarn to 24g, but it gave the same error as above. Then, I removed the --executor-memory clause *spark-submit --class ConnectedComponentsTest --master yarn-cluster --num-executors 7 --executor-cores 1 target/scala-2.10/connectedcomponentstest_2.10-1.0.jar* That is not giving GC, Out of memory exception 15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x362d65d4, /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION: java.lang.OutOfMemoryError: GC overhead limit exceeded) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.lang.Object.clone(Native Method) at akka.util.CompactByteString$.apply(ByteString.scala:410) at akka.util.ByteString$.apply(ByteString.scala:22) at akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45) at akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57) at akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43) at akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462) at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443) at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread SparkListenerBus java.lang.OutOfMemoryError: GC overhead limit exceeded at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168) at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.json4s.JsonDSL$class.seq2jvalue(JsonDSL.scala:68) at org.json4s.JsonDSL$.seq2jvalue(JsonDSL.scala:61) at org.apache.spark.util.JsonProtocol$$anonfun$jobStartToJson$3.apply(JsonProtocol.scala:127) at org.apache.spark.util.JsonProtocol$$anonfun$jobStartToJson$3.apply(JsonProtocol.scala:127) at org.json4s.JsonDSL$class.pair2jvalue(JsonDSL.scala:79) at 
org.json4s.JsonDSL$.pair2jvalue(JsonDSL.scala:61) at org.apache.spark.util.JsonProtocol$.jobStartToJson(JsonProtocol.scala:127) at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:59) at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:92) at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:118) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$3.apply(SparkListenerBus.scala:50) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$3.apply(SparkListenerBus.scala:50) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:83) at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:81) at
Re: Spark 1.2 Release Date
Soon enough :) http://apache-spark-developers-list.1001551.n3.nabble.com/RESULT-VOTE-Release-Apache-Spark-1-2-0-RC2-td9815.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Release-Date-tp20765p20766.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark-sql with join terribly slow.
This might be because Spark SQL first shuffles both tables involved in the join, using the join condition as the key. I had a specific join use case where I always join on a particular column (id) and have an optimisation lined up for that, in which I cache the data partitioned on the join key (id) and avoid the shuffle by passing the partition information to the in-memory caching. See https://issues.apache.org/jira/browse/SPARK-4849 Thanks -Nitin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-with-join-terribly-slow-tp20751p20756.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
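One way (not mentioned in the thread) to confirm that the shuffle described above is what costs you time is to print the physical plan and look for the Exchange operators above both join inputs; the table and column names below are placeholders:

```scala
// Sketch: an equi-join between two un-co-partitioned tables plans an Exchange
// (hash partitioning on the join key) above each side's scan.
val joined = sqlContext.sql(
  "SELECT a.id, b.value FROM table_a a JOIN table_b b ON a.id = b.id")
println(joined.queryExecution.executedPlan)
```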
Re: SchemaRDD partition on specific column values?
Hi Michael, I have opened following JIRA for the same :- https://issues.apache.org/jira/browse/SPARK-4849 I am having a look at the code to see what can be done and then we can have a discussion over the approach. Let me know if you have any comments/suggestions. Thanks -Nitin On Sun, Dec 14, 2014 at 2:53 PM, Michael Armbrust mich...@databricks.com wrote: I'm happy to discuss what it would take to make sure we can propagate this information correctly. Please open a JIRA (and mention me in it). Regarding including it in 1.2.1, it depends on how invasive the change ends up being, but it is certainly possible. On Thu, Dec 11, 2014 at 3:55 AM, nitin nitin2go...@gmail.com wrote: Can we take this as a performance improvement task in Spark-1.2.1? I can help contribute for this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350p20623.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards Nitin Goyal
Re: SchemaRDD partition on specific column values?
Can we take this as a performance improvement task in Spark-1.2.1? I can help contribute for this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350p20623.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: registerTempTable: Table not found
Looks like this issue has been fixed very recently and should be available in next RC :- http://apache-spark-developers-list.1001551.n3.nabble.com/CREATE-TABLE-AS-SELECT-does-not-work-with-temp-tables-in-1-2-0-td9662.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/registerTempTable-Table-not-found-tp20592p20593.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: PhysicalRDD problem?
Hi Michael, I think I have found the exact problem in my case. I see that we have written something like following in Analyzer.scala :- // TODO: pass this in as a parameter. val fixedPoint = FixedPoint(100) and Batch(Resolution, fixedPoint, ResolveReferences :: ResolveRelations :: ResolveSortReferences :: NewRelationInstances :: ImplicitGenerate :: StarExpansion :: ResolveFunctions :: GlobalAggregates :: UnresolvedHavingClauseAttributes :: TrimGroupingAliases :: typeCoercionRules ++ extendedRules : _*), Perhaps in my case, it reaches the 100 iterations and break out of while loop in RuleExecutor.scala and thus, doesn't resolve all the attributes. Exception in my logs :- 14/12/10 04:45:28 INFO HiveContext$$anon$4: Max iterations (100) reached for batch Resolution 14/12/10 04:45:28 ERROR [Sql]: Servlet.service() for servlet [Sql] in context with path [] threw exception [Servlet execution threw an exception] with root cause org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'T1.SP AS SP#6566,'T1.DOWN_BYTESHTTPSUBCR AS DOWN_BYTESHTTPSUBCR#6567, tree: 'Project ['T1.SP AS SP#6566,'T1.DOWN_BYTESHTTPSUBCR AS DOWN_BYTESHTTPSUBCR#6567] ... ... ... at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:80) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411) at org.apache.spark.sql.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:86) at org.apache.spark.sql.CacheManager$class.writeLock(CacheManager.scala:67) at org.apache.spark.sql.CacheManager$class.cacheQuery(CacheManager.scala:85) at org.apache.spark.sql.SQLContext.cacheQuery(SQLContext.scala:50) at org.apache.spark.sql.SchemaRDD.cache(SchemaRDD.scala:490) I think the solution here is to have the FixedPoint constructor argument as configurable/parameterized (also written as TODO). Do we have a plan to do this in 1.2 release? Or I can take this up as a task for myself if you want (since this is very crucial for our release). 
Thanks -Nitin On Wed, Dec 10, 2014 at 1:06 AM, Michael Armbrust mich...@databricks.com wrote: val newSchemaRDD = sqlContext.applySchema(existingSchemaRDD, existingSchemaRDD.schema) This line is throwing away the logical information about existingSchemaRDD and thus Spark SQL can't know how to push down projections or predicates past this operator. Can you describe more the problems that you see if you don't do this reapplication of the schema. -- Regards Nitin Goyal
Re: PhysicalRDD problem?
I see that somebody had already raised a PR for this but it hasn't been merged. https://issues.apache.org/jira/browse/SPARK-4339 Can we merge this in next 1.2 RC? Thanks -Nitin On Wed, Dec 10, 2014 at 11:50 AM, Nitin Goyal nitin2go...@gmail.com wrote: Hi Michael, I think I have found the exact problem in my case. I see that we have written something like following in Analyzer.scala :- // TODO: pass this in as a parameter. val fixedPoint = FixedPoint(100) and Batch(Resolution, fixedPoint, ResolveReferences :: ResolveRelations :: ResolveSortReferences :: NewRelationInstances :: ImplicitGenerate :: StarExpansion :: ResolveFunctions :: GlobalAggregates :: UnresolvedHavingClauseAttributes :: TrimGroupingAliases :: typeCoercionRules ++ extendedRules : _*), Perhaps in my case, it reaches the 100 iterations and break out of while loop in RuleExecutor.scala and thus, doesn't resolve all the attributes. Exception in my logs :- 14/12/10 04:45:28 INFO HiveContext$$anon$4: Max iterations (100) reached for batch Resolution 14/12/10 04:45:28 ERROR [Sql]: Servlet.service() for servlet [Sql] in context with path [] threw exception [Servlet execution threw an exception] with root cause org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'T1.SP AS SP#6566,'T1.DOWN_BYTESHTTPSUBCR AS DOWN_BYTESHTTPSUBCR#6567, tree: 'Project ['T1.SP AS SP#6566,'T1.DOWN_BYTESHTTPSUBCR AS DOWN_BYTESHTTPSUBCR#6567] ... ... ... at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:80) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) at scala.collection.immutable.List.foreach(List.scala:318) at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411) at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411) at org.apache.spark.sql.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:86) at org.apache.spark.sql.CacheManager$class.writeLock(CacheManager.scala:67) at org.apache.spark.sql.CacheManager$class.cacheQuery(CacheManager.scala:85) at org.apache.spark.sql.SQLContext.cacheQuery(SQLContext.scala:50) at org.apache.spark.sql.SchemaRDD.cache(SchemaRDD.scala:490) I think the solution here is to have the FixedPoint constructor argument as configurable/parameterized (also written as TODO). Do we have a plan to do this in 1.2 release? 
Or I can take this up as a task for myself if you want (since this is very crucial for our release). Thanks -Nitin On Wed, Dec 10, 2014 at 1:06 AM, Michael Armbrust mich...@databricks.com wrote: val newSchemaRDD = sqlContext.applySchema(existingSchemaRDD, existingSchemaRDD.schema) This line is throwing away the logical information about existingSchemaRDD and thus Spark SQL can't know how to push down projections or predicates past this operator. Can you describe more the problems that you see if you don't do this reapplication of the schema. -- Regards Nitin Goyal -- Regards Nitin Goyal
PhysicalRDD problem?
Hi All, I am facing following problem on Spark-1.2 rc1 where I get Treenode exception (unresolved attributes) :- https://issues.apache.org/jira/browse/SPARK-2063 To avoid this, I do something following :- val newSchemaRDD = sqlContext.applySchema(existingSchemaRDD, existingSchemaRDD.schema) It seems to work with above code but I see that PROJECTIONS and PREDICATES aren't pushed down in my InMemoryTabularScan(spark.sql.inMemoryColumnarStorage.partitionPruning = true) and get a performance hit. When I see the logical plan while debugging, I see something like :- ... Filter(col1#38=2,col2#39=3) PhysicalRDD[...] InMemoryTabularScan[col1,col2,col3,col4,col5(InMEmoryRelation...)] while I expect it to be :- ... PhysicalRDD[...] InMemoryTabularScan[col1#38=2,col2#39=3,col3,col4,col5(InMEmoryRelation...)] Predicates and Projections do get pushed down when I don't create new RDD by applying schema again and using the existing schema RDD further(in case of simple queries) but then for complex queries, I get TreenodeException (Unresolved Attributes) as I mentioned. Let me know if you need any more info around my problem. Thanks in Advance -Nitin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PhysicalRDD-problem-tp20589.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SchemaRDD partition on specific column values?
Hi All, I want to hash partition (and then cache) a schema RDD in way that partitions are based on hash of the values of a column (ID column in my case). e.g. if my table has ID column with values as 1,2,3,4,5,6,7,8,9 and spark.sql.shuffle.partitions is configured as 3, then there should be 3 partitions and say for ID=1, all the tuples should be present in one particular partition. My actual use case is that I always get a query in which I have to join 2 cached tables on ID column, so it first partitions both tables on ID and then apply JOIN and I want to avoid the partitioning based on ID by preprocessing it (and then cache it). Thanks in Advance -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-partition-on-specific-column-values-tp20350.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SchemaRDD partition on specific column values?
With some quick googling, I learnt that we can use DISTRIBUTE BY column_name in Hive QL to distribute data based on a column's values. My question now is: if I use DISTRIBUTE BY id, will there be any performance improvement? Will I be able to avoid the data movement in the shuffle (the Exchange before the JOIN step) and improve overall performance?
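A hedged sketch of the experiment (table and column names are illustrative; whether the Exchange before the join actually disappears should be checked in the physical plan, since the 1.1/1.2 planner may not recognise the pre-existing distribution of a cached table and shuffle anyway):

```scala
// Materialise both tables redistributed by id, cache them, then join and
// inspect the physical plan to see whether an Exchange still precedes the join.
val t1ById = hiveContext.sql("SELECT * FROM t1 DISTRIBUTE BY id")
t1ById.registerTempTable("t1_by_id")
hiveContext.cacheTable("t1_by_id")

val t2ById = hiveContext.sql("SELECT * FROM t2 DISTRIBUTE BY id")
t2ById.registerTempTable("t2_by_id")
hiveContext.cacheTable("t2_by_id")

val joined = hiveContext.sql(
  "SELECT a.id, a.val1, b.val2 FROM t1_by_id a JOIN t2_by_id b ON a.id = b.id")
println(joined.queryExecution.executedPlan)   // look for Exchange nodes
```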
Re: Is Spark 1.1.0 incompatible with Hive?
Yes, I added all the Hive jars present in the Cloudera distribution of Hadoop. I added them because I was getting ClassNotFoundException for many required classes (one example stack trace below), so someone in the community suggested including the Hive jars:

*Exception in thread main java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf*
*at org.apache.spark.sql.hive.api.java.JavaHiveContext.init(JavaHiveContext.scala:30)*
*at HiveContextExample.main(HiveContextExample.java:57)*
*at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606)*
*at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)*
*at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)*
*at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)*
*Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf*

On Mon, Oct 27, 2014 at 1:57 PM, Michael Armbrust mich...@databricks.com wrote:

No such method error almost always means you are mixing different versions of the same library on the classpath. In this case it looks like you have more than one version of guava. Have you added anything to the classpath?

On Mon, Oct 27, 2014 at 8:36 AM, nitinkak001 nitinkak...@gmail.com wrote:

I am working on running the following Hive query from Spark:

/SELECT * FROM spark_poc.table_name DISTRIBUTE BY GEO_REGION, GEO_COUNTRY SORT BY IP_ADDRESS, COOKIE_ID/

Ran into /java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode;/ (complete stack trace at the bottom). Found a few mentions of this issue in the user list. It seems (from the thread linked below) that there is a Guava version incompatibility between Spark 1.1.0 and Hive which is probably fixed in 1.2.0.
/ http://apache-spark-user-list.1001560.n3.nabble.com/Hive-From-Spark-td10110.html#a12671/ *So, wanted to confirm, is Spark SQL 1.1.0 incompatible with Hive or is there a workaround to this?* /Exception in thread Driver java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162) Caused by: java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; at org.apache.spark.util.collection.OpenHashSet.org $apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102) at org.apache.spark.util.SizeEstimator$$anonfun$visitArray$2.apply$mcVI$sp(SizeEstimator.scala:214) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:210) at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:169) at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161) at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155) at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78) at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70) at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236) at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126) at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:750) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:601) at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:872) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:79) at org.apache.spark.broadcast.TorrentBroadcast.init(TorrentBroadcast.scala:68) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36) at
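One quick way to narrow down a library clash like this (an illustrative snippet, not from the original thread) is to ask the JVM which jar the conflicting Guava class was actually loaded from, both on the driver and inside a task:

```scala
// Prints the jar that com.google.common.hash.HashFunction was loaded from.
// Running this on the driver and inside a map task shows whether an older
// Guava (without hashInt) is shadowing the version Spark 1.1 expects.
val location = classOf[com.google.common.hash.HashFunction]
  .getProtectionDomain.getCodeSource.getLocation
println(s"Guava loaded from: $location")
```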
Re: Is Spark 1.1.0 incompatible with Hive?
I am now on CDH 5.2 which has the Hive module packaged in it.

On Mon, Oct 27, 2014 at 2:17 PM, Michael Armbrust mich...@databricks.com wrote:

Which version of CDH are you using? I believe that hive is not correctly packaged in 5.1, but should work in 5.2. Another option that people use is to deploy the plain Apache version of Spark on CDH Yarn.
Re: Is Spark 1.1.0 incompatible with Hive?
Somehow it worked by passing all the jars (except guava) from the Hive lib directory after --jars. I had initially tried placing the jars under another temporary folder and pointing the executor and driver extraClassPath settings at that directory, but that didn't work.
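For example, something along these lines (the CDH path, jar names and application class are illustrative, and guava is deliberately left off the list to avoid the version clash described earlier):

```
spark-submit --master yarn-cluster \
  --class HiveContextExample \
  --jars /opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec.jar,\
/opt/cloudera/parcels/CDH/lib/hive/lib/hive-metastore.jar,\
/opt/cloudera/parcels/CDH/lib/hive/lib/hive-common.jar \
  my-app.jar
```

Passing the jars via --jars makes them available to both the driver and the executors, which would explain why it succeeded where setting extraClassPath on its own did not.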