Spark DataSet class is not truly private[sql]

2020-03-04 Thread Nirav Patel
I see Spark dataset is defined as: class Dataset[T] private[sql]( @transient val sparkSession: SparkSession, @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution, encoder: Encoder[T]) However it has public constructors which allows DataSet to

Spark 2.4 and Hive 2.3 - Performance issue with concurrent hive DDL queries

2020-01-30 Thread Nirav Patel
h how hive handles connection from spark client? I know the workaround could be to run my application in cluster in which case queries will be submitted by different client machines (worker nodes) but we really just want to use spark in local mode. Thanks, Nirav

Stage or Tasks level logs missing

2019-02-13 Thread Nirav Patel
level logs. If I missed something, let me know. Thanks, Nirav

Spark 2.2.1 - Operation not allowed: alter table replace columns

2018-12-17 Thread Nirav Patel
I see that a similar issue is fixed for the `ALTER TABLE table_name ADD COLUMNS(..)` stmt: https://issues.apache.org/jira/browse/SPARK-19261 Is it also fixed for `REPLACE COLUMNS` in any subsequent version? Thanks

Re: CSV parser - is there a way to find malformed csv record

2018-10-09 Thread Nirav Patel
Thanks Shuporno. That mode worked. I found a couple of records with quotes inside quotes which needed to be escaped. On Tue, Oct 9, 2018 at 1:40 PM Taylor Cox wrote: > Hey Nirav, > > Here’s an idea: > > Suppose your file.csv has N records, one for eac

CSV parser - is there a way to find malformed csv record

2018-10-08 Thread Nirav Patel
I am getting `RuntimeException: Malformed CSV record` while parsing a csv record and attaching a schema at the same time. Most likely there are additional commas or json data in some field which are not escaped properly. Is there a way the CSV parser can tell me which record is malformed? This is what I am

Re: CSV parser - how to parse column containing json data

2018-10-02 Thread Nirav Patel
.withColumn("_c3", from_json(col("_c3_signals"),json_schema)) > > > > *From: *Nirav Patel > *Date: *Thursday, August 30, 2018 at 7:19 PM > *To: *spark users > *Subject: *CSV parser - how to parse column containing json data > > > > Is the
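A minimal sketch of the from_json approach mentioned in this reply; 'spark' is assumed to be an existing SparkSession, and the schema fields, column name and path are illustrative:

    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types._

    // Hypothetical schema for the JSON payload carried in one CSV column
    val jsonSchema = new StructType()
      .add("x", StringType)
      .add("y", BooleanType)
      .add("z", IntegerType)

    val raw = spark.read.csv("/path/to/file.csv")  // assumed path; quotes inside the JSON must be escaped in the file
    val parsed = raw.withColumn("_c3", from_json(col("_c3"), jsonSchema))  // _c3 assumed to hold the JSON string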

CSV parser - how to parse column containing json data

2018-08-30 Thread Nirav Patel
Is there a way to parse a csv file with some column in the middle containing a json data structure? "a",102,"c","{"x":"xx","y":false,"z":123}","d","e",102.2 Thanks, Nirav

csv reader performance with multiline option

2018-08-18 Thread Nirav Patel
Does enabling the 'multiLine' option impact performance? How? Would it read the entire file with just one thread? Thanks
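A minimal sketch of enabling the option (path assumed, 'spark' is an existing SparkSession). With multiLine set, each CSV file is typically parsed as a whole rather than split by line, so a single large file tends to be read by one task and parallelism comes from having many input files:

    val df = spark.read
      .option("header", "true")
      .option("multiLine", "true")
      .csv("/data/input.csv")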

Re: Insert into dynamic partitioned hive/parquet table throws error - Partition spec contains non-partition columns

2018-08-07 Thread Nirav Patel
FYI, it works with static partitioning spark.sql("insert overwrite table mytable PARTITION(P1=1085, P2=164590861) select c1, c2,..cn, P1, P2 from updateTable") On Thu, Aug 2, 2018 at 5:01 PM, Nirav Patel wrote: > I am trying to insert overwrite multiple partitions into existing

Updating dynamic partitioned hive table throws error - Partition spec contains non-partition columns

2018-08-07 Thread nirav
I am using spark 2.2.1 and hive2.1. I am trying to insert overwrite multiple partitions into existing partitioned hive/parquet table. Table was created using sparkSession. I have a table 'mytable' with partitions P1 and P2. I have following set on sparkSession object:

Insert into dynamic partitioned hive/parquet table throws error - Partition spec contains non-partition columns

2018-08-02 Thread Nirav Patel
I am trying to insert overwrite multiple partitions into existing partitioned hive/parquet table. Table was created using sparkSession. I have a table 'mytable' with partitions P1 and P2. I have following set on sparkSession object: .config("hive.exec.dynamic.partition", true)
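A sketch of the dynamic-partition insert being attempted here, assuming Hive support is enabled and 'df' already holds the new data; table and column names are the illustrative ones from this thread:

    val spark = org.apache.spark.sql.SparkSession.builder()
      .enableHiveSupport()
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .getOrCreate()

    // Partition columns must come last in the SELECT for dynamic partitioning
    df.createOrReplaceTempView("updateTable")
    spark.sql(
      """INSERT OVERWRITE TABLE mytable PARTITION (P1, P2)
        |SELECT c1, c2, P1, P2 FROM updateTable""".stripMargin)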

Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-02 Thread Nirav Patel
by: org.apache.hadoop.hive.ql.metadata.Table$ValidationFailureSemanticException: Partition spec {p1=, p2=, P1=1085, P2=164590861} contains non-partition columns On Thu, Aug 2, 2018 at 11:37 AM, Nirav Patel wrote: > Thanks Koert. I'll check that out when we can update to 2.3 > > Meanwhile, I am trying hive sql (INSERT

Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-02 Thread Nirav Patel
seudo code; yet to try df.createOrReplaceTempView("updateTable") df.rdd.groupBy(P1, P2).map { (key, Iterable[Row]) => spark.sql("INSERT OVERWRITE TABLE stats PARTITION(P1 = key._1, P2 = key._2) SELECT * from updateTable where P1 = key._1 and P2 = key._2") } Regards,

Overwrite only specific partition with hive dynamic partitioning

2018-08-01 Thread Nirav Patel
Hi, I have a hive partition table created using sparkSession. I would like to insert/overwrite Dataframe data into a specific set of partitions without losing any other partition. In each run I have to update a set of partitions, not just one. e.g. I have dataframe with bid=1, bid=2, bid=3 in first

Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-01 Thread Nirav Patel
Hi Peay, Have you found a better solution yet? I am having the same issue. The following says it works with spark 2.1 onward but only when you use sqlContext and not Dataframe https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-2e2b818a007a Thanks, Nirav On Mon, Oct 2, 2017 at 4:37 AM

Re: Multiple joins with same Dataframe throws AnalysisException: resolved attribute(s)

2018-07-19 Thread Nirav Patel
Corrected subject line. It's a missing attribute error, not an ambiguous reference error. On Thu, Jul 19, 2018 at 2:11 PM, Nirav Patel wrote: > I am getting attribute missing error after joining dataframe 'df2' twice . > > Exception in thread "main" org.apache.spark.sql.AnalysisEx

Multiple joins with same Dataframe throws Ambiguous reference error

2018-07-19 Thread Nirav Patel
I am getting attribute missing error after joining dataframe 'df2' twice . Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) *fid#49 *missing from value#14,value#126,mgrId#15,name#16,d31#109,df2Id#125,df2Id#47,d4#130,d3#129,df1Id#13,name#128, *fId#127* in

Re: Dataframe from partitioned parquet table missing partition columns from schema

2018-07-17 Thread Nirav Patel
Just found out that I need the following option while reading: .option("basePath", "hdfs://localhost:9000/ptest/") https://stackoverflow.com/questions/43192940/why-is-partition-key-column-missing-from-dataframe On Tue, Jul 17, 2018 at 3:48 PM, Nirav Patel wrote: >
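A short sketch of the basePath option in use; the table root is the one from this thread and the partition subdirectory is assumed:

    // Read a single partition directory while keeping the partition columns in
    // the schema, by pointing basePath at the table root
    val df = spark.read
      .option("basePath", "hdfs://localhost:9000/ptest/")
      .parquet("hdfs://localhost:9000/ptest/p1=1085")  // assumed partition layout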

Dataframe from partitioned parquet table missing partition columns from schema

2018-07-17 Thread Nirav Patel
I created a hive table with parquet storage using sparkSql. Now in the hive CLI, when I do describe and select, I can see partition columns both as regular columns and as partition columns. However, if I try to do the same in sparkSql (Dataframe) I don't see the partition columns. I need to do projection

Dataset - withColumn and withColumnRenamed that accept Column type

2018-07-13 Thread Nirav Patel
Is there a version of withColumn or withColumnRenamed that accepts Column instead of String? That way I can specify an FQN in case there are duplicate column names. I can drop a column based on a Column type argument, so why can't I rename one based on the same type of argument? The use case is, I have
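As far as I know there is no Column-typed overload of withColumnRenamed; one possible workaround is to select with the fully-qualified Column and alias it. A sketch with assumed frame and column names:

    val joined = df1.join(df2, df1("id") === df2("id"))
    val renamed = joined.select(
      df1("id").alias("df1_id"),     // disambiguate by qualifying through the parent frame
      df2("id").alias("df2_id"),
      df1("value"),
      df2("value").alias("value2"))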

Re: How to avoid duplicate column names after join with multiple conditions

2018-07-12 Thread Nirav Patel
. They are both the same, just pick one. On Thu, Jul 12, 2018 at 9:38 AM, Prem Sure wrote: > Hi Nirav, did you try > .drop(df1(a) after join > > Thanks, > Prem > > On Thu, Jul 12, 2018 at 9:50 PM Nirav Patel wrote: > >> Hi Vamshi, >> >> That api is very res

Re: How to avoid duplicate column names after join with multiple conditions

2018-07-12 Thread Nirav Patel
, Vamshi Talla wrote: > Nirav, > > Spark does not create a duplicate column when you use the below join > expression, as an array of column(s) like below but that requires the > column name to be same in both the data frames. > > Example: *df1.join(df2, [‘a’])* >

Dataframe multiple joins with same dataframe not able to resolve correct join columns

2018-07-11 Thread Nirav Patel
I am trying to join df1 with df2 and then join the result again with df2; df2 is the common dataframe. val df3 = df1 .join(*df2*, df1("PARTICIPANT_ID") === df2("PARTICIPANT_ID") and df1("BUSINESS_ID") === df2("BUSINESS_ID")) .drop(df1("BUSINESS_ID")) //dropping
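One way to make the second join resolvable, sketched with the column names from this thread (the exact keys depend on the real schema), is to give each use of df2 its own alias so the analyzer can tell the two copies apart:

    import org.apache.spark.sql.functions.col

    val df2a = df2.as("a")
    val df2b = df2.as("b")

    val step1 = df1.join(df2a,
        df1("PARTICIPANT_ID") === col("a.PARTICIPANT_ID") &&
        df1("BUSINESS_ID") === col("a.BUSINESS_ID"))
      .drop(df1("BUSINESS_ID"))

    // second join against a differently-aliased copy of the same dataframe
    val step2 = step1.join(df2b, col("a.PARTICIPANT_ID") === col("b.PARTICIPANT_ID"))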

How Kryo serializer allocates buffer in Spark

2018-07-10 Thread nirav
I am getting the following error in a spark task. The default max value is 64mb! The doc says it should be large enough to store the largest object in my application. I don't think I have any object that is bigger than 64mb. So what do these values (spark.kryoserializer.buffer, spark.kryoserializer.buffer.max)
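For reference, a sketch of the two settings being asked about (values are illustrative, not recommendations): buffer is the initial per-core serialization buffer, and buffer.max is the ceiling it can grow to for a single serialized object.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer", "64k")      // initial buffer size
      .set("spark.kryoserializer.buffer.max", "256m") // raise if the overflow error reports a larger required size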

Dataframe joins - UnsupportedOperationException: Unimplemented type: IntegerType

2018-07-09 Thread Nirav Patel
I am getting the following error after performing a join between 2 dataframes. It happens on the call to the .show() method. I assume it's an issue with an incompatible type, but it's been really hard to identify which column of which dataframe has that incompatibility. Any pointers? 11:06:10.304 13700 [Executor

How to avoid duplicate column names after join with multiple conditions

2018-07-02 Thread Nirav Patel
The expr is `df1(a) === df2(a) and df1(b) === df2(c)`. How do I avoid the duplicate column 'a' in the result? I don't see any api that combines both. Rename manually?
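A minimal sketch of the drop-one-side approach suggested in the replies above, using the column names from the question:

    val joined = df1.join(df2, df1("a") === df2("a") && df1("b") === df2("c"))
      .drop(df2("a"))   // both sides are equal on the join key, so keep only df1's copy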

Spark sql creating managed table with location converts it to external table

2018-06-22 Thread Nirav Patel
http://www.gatorsmile.io/table-types-in-spark-external-or-managed/ "We do not allow users to create a MANAGED table with the users supplied LOCATION." Is this supposed to get resolved in 2.2 ? Thanks

Fwd: pyspark: APID is coming as null

2018-04-12 Thread nirav nishith
L. ITS line 12 regexp_replace(COALESCE(get_json_object(t1.pl_unbase, '$.audience.apid'),\ get_json_object(t1.pl_unbase, '$.audience.or.apid') ), '\\[|\\"|\\]','') as apid,\ How does get_json_object work in spark? Regards Nirav

Re: Sharing spark executor pool across multiple long running spark applications

2018-02-10 Thread Nirav Patel
remove executors from within the job, like when we know how > many we would need, so the executors could join other jobs. > > On Tue, Feb 6, 2018 at 3:00 PM, Nirav Patel <npa...@xactlycorp.com> wrote: > >> Currently sparkContext and it's executor pool is not shareable. Ea

Sharing spark executor pool across multiple long running spark applications

2018-02-06 Thread Nirav Patel
Currently sparkContext and its executor pool are not shareable. Each sparkContext gets its own executor pool for the entire life of an application. So what are the best ways to share cluster resources across multiple long running spark applications? The only one I see is spark dynamic allocation but it has

Project tungsten phase2 - SIMD and columnar in-memory storage

2017-06-29 Thread Nirav Patel
I read following future optimizations in Tungsten on databricks site. https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html "There are also a handful of longer term possibilities for Tungsten. In particular, we plan to investigate compilation to LLVM or

Re: Spark standalone , client mode. How do I monitor?

2017-06-29 Thread Nirav Patel
you can use ganglia, ambari or nagios to monitor spark workers/masters. Spark executors are resilient. There are many proprietary software companies as well that just do hadoop application monitoring. On Tue, Jun 27, 2017 at 5:03 PM, anna stax wrote: > Hi all, > > I have

Re: Does spark support Apache Arrow

2017-06-29 Thread Nirav Patel
Kwon, Isn't that JIRA part of the integration with Arrow? As far as Arrow as an in-memory store goes, it probably conflicts with spark's own tungsten memory representation, right? Thanks Nir On Thu, May 19, 2016 at 8:03 PM, Hyukjin Kwon wrote: > FYI, there is a JIRA for this,

Re: Apache Arrow + Spark examples?

2017-06-29 Thread Nirav Patel
bump. I have the same question as Petr. SPARK-13534 seems to only solve the (de)serialization issue between rdd and python objects. However, can't Arrow be a standard for in-memory columnar representation? Maybe an alternative to spark's current in-memory store (k-v blocks or tungsten) Thanks Nir

Re: DataFrameWriter - Where to find list of Options applicable to particular format(datasource)

2017-03-14 Thread Nirav Patel
/jira/browse/SPARK-18579 > ). > > > > 2017-03-14 9:20 GMT+09:00 Nirav Patel <npa...@xactlycorp.com>: > >> Hi, >> >> Is there a document for each datasource (csv, tsv, parquet, json, avro) >> with available options ? I need to find one for csv to

Re: Monitoring ongoing Spark Job when run in Yarn Cluster mode

2017-03-13 Thread Nirav Patel
I think it would be on port 4040 by default on the node where the driver is running. You should be able to navigate to that via the Resource Manager's application master link, as in cluster mode both AM and driver run on the same node. On Mon, Mar 13, 2017 at 6:53 AM, Sourav Mazumder <

DataFrameWriter - Where to find list of Options applicable to particular format(datasource)

2017-03-13 Thread Nirav Patel
Hi, Is there a document for each datasource (csv, tsv, parquet, json, avro) with available options ? I need to find one for csv to "ignoreLeadingWhiteSpace" and "ignoreTrailingWhiteSpace" Thanks

DataframeWriter - How to change filename extension

2017-02-22 Thread Nirav Patel
Hi, I am writing a Dataframe as TSV using DataframeWriter as follows: myDF.write.mode("overwrite").option("sep","\t").csv("/out/path") Problem is all part files have a .csv extension instead of .tsv, as follows: part-r-00012-f9f06712-1648-4eb6-985b-8a9c79267eef.csv All the records are stored in TSV
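The csv writer always emits a .csv suffix; one workaround (a sketch, not a built-in option) is to rename the part files afterwards with the Hadoop FileSystem API. 'spark' is assumed to be the SparkSession of the job:

    import org.apache.hadoop.fs.{FileSystem, Path}

    val outDir = new Path("/out/path")
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    fs.listStatus(outDir)
      .map(_.getPath)
      .filter(_.getName.endsWith(".csv"))
      .foreach { p =>
        // swap the extension in place within the same output directory
        fs.rename(p, new Path(p.getParent, p.getName.stripSuffix(".csv") + ".tsv"))
      }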

Re: Dynamic Allocation not respecting spark.executor.cores

2017-01-04 Thread Nirav Patel
If this is not expected behavior then it should be logged as an issue. On Tue, Jan 3, 2017 at 2:51 PM, Nirav Patel <npa...@xactlycorp.com> wrote: > When enabling dynamic scheduling I see that all executors are using only 1 > core even if I specify "spark.executor.cores&

Dynamic scheduling not respecting spark.executor.cores

2017-01-03 Thread Nirav Patel
When enabling dynamic scheduling I see that all executors are using only 1 core even if I specify "spark.executor.cores" to 6. If dynamic scheduling is disabled then each executor will have 6 cores. I have tested this against spark 1.5. I wonder if this is the same behavior with 2.x as well.

Re: Spark SQL UDF - passing map as a UDF parameter

2016-11-15 Thread Nirav Patel
rdd = sc.makeRDD(1 to 3).map(i => (i, 0)) > map(rdd.collect.flatMap(x => x._1 :: x._2 :: Nil).map(lit _): _*) > > // maropu > > On Tue, Nov 15, 2016 at 9:33 AM, Nirav Patel <npa...@xactlycorp.com> > wrote: > >> I am trying to use following API from Functions to

Spark SQL UDF - passing map as a UDF parameter

2016-11-14 Thread Nirav Patel
I am trying to use the following API from functions to convert a map into a column so I can pass it to a UDF. map(cols: Column *): Column
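A small sketch of building a literal map column with functions.map and handing it to a UDF; keys, values and column names are made up for illustration:

    import org.apache.spark.sql.functions.{col, lit, map, udf}

    // alternating key, value columns produce a MapType column
    val lookup = map(lit("a"), lit(1), lit("b"), lit(2))

    val resolve = udf((m: Map[String, Int], key: String) => m.getOrElse(key, -1))

    val result = df.withColumn("code", resolve(lookup, col("category")))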

spark ml - ngram - how to preserve single word (1-gram)

2016-11-08 Thread Nirav Patel
Is it possible to preserve single token while using n-gram feature transformer? e.g. Array("Hi", "I", "heard", "about", "Spark") Becomes Array("Hi", "i", "heard", "about", "Spark", "Hi i", "I heard", "heard about", "about Spark") Currently if I want to do it I will have to manually transform
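NGram alone drops the 1-grams, so one manual approach (a sketch; column names assumed) is to run NGram for the 2-grams and then append the original tokens back:

    import org.apache.spark.ml.feature.NGram
    import org.apache.spark.sql.functions.{col, udf}

    val bigram = new NGram().setN(2).setInputCol("tokens").setOutputCol("bigrams")
    val withBigrams = bigram.transform(tokenized)   // 'tokenized' has an array column "tokens"

    // concatenate the unigram and bigram arrays into one term list
    val combine = udf((a: Seq[String], b: Seq[String]) => a ++ b)
    val combined = withBigrams.withColumn("terms", combine(col("tokens"), col("bigrams")))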

Spark ML - Naive Bayes - how to select Threshold values

2016-11-07 Thread Nirav Patel
Few questions about `thresholds` parameter: This is what doc says "Param for Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p

Spark ML - Is it rule of thumb that all Estimators should only be Fit on Training data

2016-11-02 Thread Nirav Patel
It is very clear that for ML algorithms (classification, regression) the Estimator only fits on training data, but it's not very clear for other estimators like IDF for example. IDF is a feature transformation model, but having an IDF estimator and transformer makes it a little confusing that what

Re: Spark ML - CrossValidation - How to get Evaluation metrics of best model

2016-11-02 Thread Nirav Patel
before you use > CrossValidator, in order to get an unbiased estimate of the best model's > performance. > > On Tue, Nov 1, 2016 at 12:10 PM Nirav Patel <npa...@xactlycorp.com> wrote: > >> I am running classification model. with normal training-test split I can

Re: Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
aying same thing so thats a good thing :) > > On Wed, Nov 2, 2016 at 10:04 AM, Nirav Patel <npa...@xactlycorp.com> > wrote: > >> Hi Ayan, >> >> "classification algorithm will for sure need to Fit against new dataset >> to produce new model" I said th

Re: Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
f "model evaluation" work flow > typically in lower frequency than Re-Training process. > > On Wed, Nov 2, 2016 at 5:48 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > >> Hi Ayan, >> After deployment, we might re-train it every month. That is whole >&g

Re: Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
little elaborate setting you > may want to automate model evaluations, but that's a different story. > > Not sure if I could explain properly, please feel free to comment. > On 1 Nov 2016 22:54, "Nirav Patel" <npa...@xactlycorp.com> wrote: > >> Yes, I do apply NaiveBayes

Spark ML - CrossValidation - How to get Evaluation metrics of best model

2016-11-01 Thread Nirav Patel
I am running classification model. with normal training-test split I can check model accuracy and F1 score using MulticlassClassificationEvaluator. How can I do this with CrossValidation approach? Afaik, you Fit entire sample data in CrossValidator as you don't want to leave out any observation

Re: Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
del before passing it through the next step. > > > --- > Robin East > *Spark GraphX in Action* Michael Malak and Robin East > Manning Publications Co. > http://www.manning.com/books/spark-graphx-in-action > &g

Re: Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
t; overfit your model. > > --- > Robin East > *Spark GraphX in Action* Michael Malak and Robin East > Manning Publications Co. > http://www.manning.com/books/spark-graphx-in-action > > > > > > On 1 Nov 2016, at 10:15,

Spark ML - Is IDF model reusable

2016-11-01 Thread Nirav Patel
FYI, I do reuse IDF model while making prediction against new unlabeled data but not between training and test data while training a model. On Tue, Nov 1, 2016 at 3:10 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > I am using IDF estimator/model (TF-IDF) to convert text features into

Is IDF model reusable

2016-11-01 Thread Nirav Patel
I am using IDF estimator/model (TF-IDF) to convert text features into vectors. Currently, I fit IDF model on all sample data and then transform them. I read somewhere that I should split my data into training and test before fitting IDF model; Fit IDF only on training data and then use same
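A sketch of the split-then-fit flow being asked about (column names and split ratio are illustrative): fit the IDF estimator on the training split only, then reuse the resulting model to transform both splits.

    import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

    val Array(train, test) = labeled.randomSplit(Array(0.8, 0.2), seed = 42)

    val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
    val tf = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")

    val trainTf = tf.transform(tokenizer.transform(train))
    val idfModel = idf.fit(trainTf)                    // IDF statistics come from training data only
    val trainFeatures = idfModel.transform(trainTf)
    val testFeatures  = idfModel.transform(tf.transform(tokenizer.transform(test)))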

MulticlassClassificationEvaluator how weighted precision and weighted recall calculated

2016-10-03 Thread Nirav Patel
For example, with 3 classes would it be: weightedPrecision = (TP1 * w1 + TP2 * w2 + TP3 * w3) / ((TP1 * w1 + TP2 * w2 + TP3 * w3) + (FP1 * w1 + FP2 * w2 + FP3 * w3)) where TP1..TP3 are the TP counts for each class, and w1, w2, ... are the weights for each class based on their distribution in the sample data? and similar for

Re: Tutorial error - zeppelin 0.6.2 built with spark 2.0 and mapr

2016-09-26 Thread Nirav Patel
FYI, it works when I use MapR-configured Spark 2.0, i.e. export SPARK_HOME=/opt/mapr/spark/spark-2.0.0-bin-without-hadoop Thanks Nirav On Mon, Sep 26, 2016 at 3:45 PM, Nirav Patel <npa...@xactlycorp.com> wrote: > Hi, > > I built zeppelin 0.6 branch using spark 2.0 usin

Tutorial error - zeppelin 0.6.2 built with spark 2.0 and mapr

2016-09-26 Thread Nirav Patel
Hi, I built the zeppelin 0.6 branch with spark 2.0 using the following mvn command: mvn clean package -Pbuild-distr -Pmapr41 -Pyarn -Pspark-2.0 -Pscala-2.11 -DskipTests The build was successful. I only have the following set in zeppelin-conf.sh export HADOOP_HOME=/opt/mapr/hadoop/hadoop-2.5.1/ export

How PolynomialExpansion works

2016-09-16 Thread Nirav Patel
Doc says: Take a 2-variable feature vector as an example: (x, y), if we want to expand it with degree 2, then we get (x, x * x, y, x * y, y * y). I know polynomial expansion of (x+y)^2 = x^2 + 2xy + y^2 but can't relate it to above. Thanks
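A small sketch to make the mapping concrete: the transformer emits every monomial up to the chosen degree (minus the constant term), not the binomial expansion of (x + y)^2. 'spark' is assumed to be an existing SparkSession.

    import org.apache.spark.ml.feature.PolynomialExpansion
    import org.apache.spark.ml.linalg.Vectors

    val df = spark.createDataFrame(Seq(Tuple1(Vectors.dense(2.0, 3.0)))).toDF("features")

    val poly = new PolynomialExpansion()
      .setInputCol("features")
      .setOutputCol("polyFeatures")
      .setDegree(2)

    // For (x, y) = (2, 3) the output is (x, x*x, y, x*y, y*y) = [2.0, 4.0, 3.0, 6.0, 9.0]
    poly.transform(df).show(false)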

Re: Are RDD's ever persisted to disk?

2016-08-23 Thread Nirav
You can store it either in serialized form (byte array) or save it in a string format like tsv or csv. There are different RDD save APIs for that. Sent from my iPhone > On Aug 23, 2016, at 12:26 PM, kant kodali wrote: > > > ok now that I understand RDD can be stored to

Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-23 Thread Nirav Patel
So it was indeed my merge function. I created a new result object for every merge and it's working now. Thanks On Wed, Jun 22, 2016 at 3:46 PM, Nirav Patel <npa...@xactlycorp.com> wrote: > PS. In my reduceByKey operation I have two mutable object. What I do is > merge mutable2 i
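A sketch of the fix described here: have the merge function build a fresh result instead of mutating either input. MyObj and its fields are assumed for illustration.

    val reduced = pairRdd.reduceByKey { (left, right) =>
      // Do not mutate 'left' or 'right' in place; the same objects can be fed
      // into several merges, which is what produced the inconsistent aggregates.
      new MyObj(left.count + right.count, left.sum + right.sum)
    }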

Re: OOM on the driver after increasing partitions

2016-06-22 Thread Nirav Patel
e > maximum available limit. So the other options are > > 1) Separate the driver from master, i.e., run them on two separate nodes > 2) Increase the RAM capacity on the driver/master node. > > Regards, > Raghava. > > > On Wed, Jun 22, 2016 at 7:05 PM, Nirav Patel <

Re: OOM on the driver after increasing partitions

2016-06-22 Thread Nirav Patel
Yes, the driver keeps a fair amount of metadata to manage scheduling across all your executors. I assume with 64 nodes you have more executors as well. A simple way to test is to increase driver memory. On Wed, Jun 22, 2016 at 10:10 AM, Raghava Mutharaju < m.vijayaragh...@gmail.com> wrote: > It is an

Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Nirav Patel
? On Wed, Jun 22, 2016 at 11:52 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > Hi, > > I do not see any indication of errors or executor getting killed in spark > UI - jobs, stages, event timelines. No task failures. I also don't see any > errors in executor logs. > > Than

Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Nirav Patel
incorrect result, did you observe any error (on > workers) ? > > Cheers > > On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com> > wrote: > >> I have an RDD[String, MyObj] which is a result of Join + Map operation. >> It has no partiti

Re: FullOuterJoin on Spark

2016-06-22 Thread Nirav Patel
Can your domain list fit in the memory of one executor? If so you can use a broadcast join. You can always narrow down to an inner join and derive the rest from the original set if memory is an issue there. If you are just concerned about shuffle memory, then to reduce the amount of shuffle you can do the following: 1)
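A sketch of the broadcast-join idea for pair RDDs, assuming the smaller (domain) side really fits in memory; names are illustrative:

    // Collect the small side to the driver and broadcast it to every executor
    val domainMap = sc.broadcast(domainRdd.collectAsMap())

    // Map-side join: no shuffle of the large RDD (inner-join semantics shown)
    val joined = largeRdd.flatMap { case (key, value) =>
      domainMap.value.get(key).map(d => (key, (value, d)))
    }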

Re: spark job automatically killed without rhyme or reason

2016-06-22 Thread Nirav Patel
spark is a memory hogger and suicidal if you have a job processing a bigger dataset. however databricks claims that spark > 1.6 has optimizations related to memory footprint as well as processing. They are only available if you use dataframe or dataset. if you are using rdd you have to do a lot of

Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-21 Thread Nirav Patel
I have an RDD[String, MyObj] which is the result of a Join + Map operation. It has no partitioner info. I run reduceByKey without passing any Partitioner or partition counts. I observed that the output aggregation result for a given key is incorrect sometimes, like 1 out of 5 times. It looks like reduce

Spark dynamic allocation - efficiently request new resource

2016-06-07 Thread Nirav Patel
different number of CPU cores or memory available for all of its task. That way spark can process data skew with heavier executor by assigning more Memory or CPUs to new executors. Thanks Nirav

Re: Bulk loading Serialized RDD into Hbase throws KryoException - IndexOutOfBoundsException

2016-05-30 Thread Nirav Patel
exception. On Sun, May 29, 2016 at 11:26 PM, sjk <shijinkui...@163.com> wrote: > org.apache.hadoop.hbase.client.{Mutation, Put} > org.apache.hadoop.hbase.io.ImmutableBytesWritable > > if u used mutation, register the above class too > > On May 30, 2016, at 08:11, Nirav Pate
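A sketch of registering the HBase classes mentioned above through a custom registrator; the class and package names are assumed:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.hadoop.hbase.client.{Mutation, Put}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.spark.serializer.KryoRegistrator

    class HBaseKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[ImmutableBytesWritable])
        kryo.register(classOf[Put])
        kryo.register(classOf[Mutation])
      }
    }
    // wired in via spark.kryo.registrator=com.example.HBaseKryoRegistrator (name assumed)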

Re: Bulk loading Serialized RDD into Hbase throws KryoException - IndexOutOfBoundsException

2016-05-29 Thread Nirav Patel
29, 2016, at 4:58 PM, Nirav Patel <npa...@xactlycorp.com> wrote: > > I pasted code snipped for that method. > > here's full def: > > def writeRddToHBase2(hbaseRdd: RDD[(ImmutableBytesWritable, Put)], > tableName: String) { > > > hbase

Re: Bulk loading Serialized RDD into Hbase throws KryoException - IndexOutOfBoundsException

2016-05-29 Thread Nirav Patel
e.buffer", 16097152) val table = new HTable(hConf, tableName) //table.setWriteBufferSize(8388608) *itr.grouped(100).foreach(table.put(_)) * // << Exception happens at this point table.close() } } I am using hbase 0.98.12 mapr distribution. Th

Bulk loading Serialized RDD into Hbase throws KryoException - IndexOutOfBoundsException

2016-05-29 Thread Nirav Patel
Hi, I am getting the following Kryo deserialization error when trying to bulk load a cached RDD into Hbase. It works if I don't cache the RDD. I cache it with MEMORY_ONLY_SER. here's the code snippet: hbaseRdd.values.foreachPartition{ itr => val hConf = HBaseConfiguration.create()

Re: Spark UI doesn't give visibility on which stage job actually failed (due to lazy eval nature)

2016-05-25 Thread Nirav Patel
to transformation. Again my real point is to assess this as a requirement from the users' and stakeholders' perspective regardless of technical challenge. Thanks Nirav On Wed, May 25, 2016 at 8:04 PM, Mark Hamstra <m...@clearstorydata.com> wrote: > But when you talk about optimizing the DAG, it really doe

Re: Spark UI doesn't give visibility on which stage job actually failed (due to lazy eval nature)

2016-05-25 Thread Nirav Patel
n't be particularly easy. > > On Wed, May 25, 2016 at 5:28 PM, Nirav Patel <npa...@xactlycorp.com> > wrote: > >> It's great that spark scheduler does optimized DAG processing and only >> does lazy eval when some action is performed or shuffle dependency is >

Spark UI doesn't give visibility on which stage job actually failed (due to lazy eval nature)

2016-05-25 Thread Nirav Patel
It's great that the spark scheduler does optimized DAG processing and only does lazy eval when some action is performed or a shuffle dependency is encountered. Sometimes it goes further after the shuffle dep before executing anything, e.g. if there are map steps after the shuffle then it doesn't stop at the shuffle

Spark UI metrics - Task execution time and number of records processed

2016-05-18 Thread Nirav Patel
Hi, I have been noticing that for shuffled tasks (groupBy, Join) reducer tasks are not evenly loaded. Most of them (90%) finish super fast but there are some outliers that take much longer, as you can see from the "Max" value in the following metric. The metric is from a Join operation done on two RDDs. I

API to study key cardinality and distribution and other important statistics about data at certain stage

2016-05-13 Thread Nirav Patel
Hi, The problem is every time a job fails or performs poorly at certain stages you need to study your data distribution just before THAT stage. An overall look at the input data set doesn't help very much if you have so many transformations going on in the DAG. I always end up writing complicated typed code to run

How to take executor memory dump

2016-05-11 Thread Nirav Patel
Hi, I am hitting OutOfMemoryError issues with spark executors. It happens mainly during shuffle. Executors get killed with OutOfMemoryError. I have tried setting spark.executor.extraJavaOptions to take a memory dump but it's not happening. spark.executor.extraJavaOptions = "-XX:+UseCompressedOops
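A sketch of JVM options that usually produce a dump on executor OOM; the dump path is illustrative and must exist and be writable on every worker node:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions",
           "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/executor_dumps")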

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Nirav Patel
ing, consider increasing spark.driver.memory > > Cheers > > On Sun, May 8, 2016 at 9:14 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > >> Yes, I am using yarn client mode hence I specified am settings too. >> What you mean akka is moved out of picture? I am using s

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-08 Thread Nirav Patel
https://spark.apache.org/docs/latest/running-on-yarn.html > > In cluster mode, spark.yarn.am.memory is not effective. > > For Spark 2.0, akka is moved out of the picture. > FYI > >> On Sat, May 7, 2016 at 8:24 PM, Nirav Patel <npa...@xactlycorp.com> wrote: >>

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Nirav Patel
Jvm overhead based on num of executors , stages and tasks in your app. > Do you know your driver heap size and application structure ( num of stages > and tasks ) > > Ashish > > On Saturday, May 7, 2016, Nirav Patel <npa...@xactlycorp.com> wrote: > >> Right but this l

Re: How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Nirav Patel
happen in executor JVM NOT in driver JVM. Thanks On Sat, May 7, 2016 at 11:58 AM, Ted Yu <yuzhih...@gmail.com> wrote: > bq. at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) > > It was Akka which uses JavaSerializer > > Cheers > > On Sat, May 7, 20

How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Nirav Patel
Hi, I thought I was using kryo serializer for shuffle. I could verify it from spark UI - Environment tab that spark.serializer org.apache.spark.serializer.KryoSerializer spark.kryo.registrator com.myapp.spark.jobs.conf.SparkSerializerRegistrator But when I see following error in Driver logs it

Re: Spark 1.5.2 Shuffle Blocks - running out of memory

2016-05-06 Thread Nirav Patel
Is this a limit of spark shuffle block currently? On Tue, May 3, 2016 at 11:18 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > Hi, > > My spark application getting killed abruptly during a groupBy operation > where shuffle happens. All shuffle happens with PROCESS_LOCAL

Re: Missing data in Kafka Consumer

2016-05-06 Thread Nirav Shah
I had the same issue while using it with storm. Then I found that the number of storm spout instances should not be greater than the number of partitions; if you increase it beyond that, the numbers were not matching. Maybe you can check something similar for spark. Regards, Nirav On May 5, 2016 9:48 PM, "Jerry" <

Spark 1.5.2 Shuffle Blocks - running out of memory

2016-05-03 Thread Nirav Patel
Hi, My spark application is getting killed abruptly during a groupBy operation where shuffle happens. All shuffle happens with PROCESS_LOCAL locality. I see the following in driver logs. Shouldn't these logs be in the executors? Anyhow, it looks like ByteBuffer is running out of memory. What will be a workaround

Re: aggregateByKey - external combine function

2016-04-29 Thread Nirav Patel
Any thoughts? I can explain more on the problem, but basically the shuffle data doesn't seem to fit in reducer memory (32GB) and I am looking for ways to process it on disk+memory. Thanks On Thu, Apr 28, 2016 at 10:07 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > Hi, > > I t

aggregateByKey - external combine function

2016-04-28 Thread Nirav Patel
l need to merge key-values spread across different salt and it will come to memory issue at that point! Any pointer to resolve this? perhaps an external merge ? Thanks Nirav

GroupByKey Iterable[T] - Memory usage

2016-04-25 Thread Nirav Patel
Hi, Is the Iterable out of GroupByKey loaded fully into the memory of the reducer task, or can it also be on disk? Also, is there a way to evict it from memory once the reducer is done iterating it and wants to use the memory for something else. Thanks

SparkDriver throwing java.lang.OutOfMemoryError: Java heap space

2016-04-04 Thread Nirav Patel
hitting byte array limits of 2GB while serializing before shuffle? What is a good solution here? Thanks Nirav

Re: spark 1.5.2 - value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey, myData)]

2016-04-02 Thread Nirav Patel
In the second class I re-declared the following and the compile error went away. Your solution worked too. implicit val rowKeyOrdering = rowKeyOrd Thanks Nirav On Wed, Mar 30, 2016 at 7:36 PM, Ted Yu <yuzhih...@gmail.com> wrote: > Have you tried the following construct ? > > new OrderedRD

Multiple lookups; consolidate result and run further aggregations

2016-04-02 Thread Nirav Patel
ion will happen on driver side unless I do sparkContext.parallelize(seqVal). Is this correct? Also, what I am trying to do is an efficient multiple lookup. Another option is to broadcast the lookup keys and perform a join. Please advise. Thanks Nirav

Re: How to efficiently Scan (not filter nor lookup) part of Paird RDD or Ordered RDD

2016-04-02 Thread Nirav Patel
mally use is to zipWithIndex() and then use the filter >> operation. Filter is an O(m) operation where m is the size of your >> partition, not an O(N) operation. >> >> -Ilya Ganelin >> >> On Sat, Jan 23, 2016 at 5:48 AM, Nirav Patel <npa...@xactlycorp.com> >

spark 1.5.2 - value filterByRange is not a member of org.apache.spark.rdd.RDD[(myKey, myData)]

2016-03-30 Thread Nirav Patel
e implicit rowKeyOrdering is defined but not in second class. Please help me resolve this compile error. Thanks Nirav

Re: Compress individual RDD

2016-03-15 Thread Nirav Patel
compress only rdds with serialization enabled in the persistence > mode. So you could skip _SER modes for your other rdds. Not perfect but > something. > On 15-Mar-2016 4:33 pm, "Nirav Patel" <npa...@xactlycorp.com> wrote: > >> Hi, >> >> I see that there
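A sketch of the suggestion above: spark.rdd.compress only applies to serialized storage levels, so persist the RDDs you want compressed with a *_SER level and leave the others on plain MEMORY_ONLY (RDD names are illustrative).

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf().set("spark.rdd.compress", "true")

    bigRdd.persist(StorageLevel.MEMORY_ONLY_SER)   // compressed (serialized) in memory
    otherRdd.persist(StorageLevel.MEMORY_ONLY)     // deserialized, not compressed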

Compress individual RDD

2016-03-15 Thread Nirav Patel
Hi, I see that there's the following spark config to compress an RDD. My guess is it will compress all RDDs of a given SparkContext, right? If so, is there a way to instruct the spark context to only compress some RDDs and leave others uncompressed? Thanks spark.rdd.compress false Whether to compress

Re: Job fails at saveAsHadoopDataset stage due to Lost Executor due to reason unknown so far

2016-03-03 Thread Nirav Patel
ndeed was high. > You can use binary search to get to a reasonable value for caching. > > Thanks > > On Thu, Mar 3, 2016 at 7:52 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > >> Hi Ted, >> >> I'd say about 70th percentile keys have 2 columns each having

Re: Job fails at saveAsHadoopDataset stage due to Lost Executor due to reason unknown so far

2016-03-03 Thread Nirav Patel
cannot pre-aggregate. Thanks Nirav On Thu, Mar 3, 2016 at 8:16 PM, Ted Yu <yuzhih...@gmail.com> wrote: > bq. hConf.setBoolean("hbase.cluster.distributed", true) > > Not sure why the above is needed. If hbase-site.xml is on the classpath, > it should contain the above

Re: Job fails at saveAsHadoopDataset stage due to Lost Executor due to reason unknown so far

2016-03-03 Thread Nirav Patel
so why does 'saveAsHadoopDataset' incur so much memory pressure? Should I try to reduce the hbase caching value? On Wed, Mar 2, 2016 at 7:51 AM, Nirav Patel <npa...@xactlycorp.com> wrote: > Hi, > > I have a spark job that runs on yarn and keeps failing at the line where i do :
