Re: AnalysisException - Infer schema for the Parquet path

2020-05-10 Thread Nilesh Kuchekar
Hi Chetan,

  You can create a static parquet file in advance, and when you
create a DataFrame you can pass the locations of both files, with the
option mergeSchema set to true. This will always give you a DataFrame,
even if the original file is not present.
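
A minimal sketch of this approach; the placeholder file at
/data/placeholder.parquet (assumed to carry the expected schema) and the
incoming path are hypothetical:

    val df = spark.read
      .option("mergeSchema", "true")
      .parquet("/data/placeholder.parquet", "/data/incoming/batch.parquet")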

Kuchekar, Nilesh


On Sat, May 9, 2020 at 10:46 PM Mich Talebzadeh 
wrote:

> Have you tried catching the error when you are creating the dataframe?
>
> import scala.util.{Try, Success, Failure}
>
> val df = Try(spark.read.
>     format("com.databricks.spark.xml").
>     option("rootTag", "hierarchy").
>     option("rowTag", "sms_request").
>     load("/tmp/broadcast.xml")) match {
>   case Success(df) => df
>   case Failure(exception) => throw new Exception("foo")
> }
> HTH
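>
> For the dummy-DataFrame fallback Chetan describes, a variation of the
> same pattern (a sketch; the schema and path below are assumptions for
> illustration) that returns an empty DataFrame instead of throwing:
>
> import scala.util.{Try, Success, Failure}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StructType, StructField, StringType}
>
> // Hypothetical schema; substitute the real schema of your parquet data.
> val dummySchema = StructType(Seq(StructField("id", StringType)))
>
> val df = Try(spark.read.parquet("/tmp/some/parquet/path")) match {
>   case Success(d) => d
>   case Failure(_) =>
>     spark.createDataFrame(spark.sparkContext.emptyRDD[Row], dummySchema)
> }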
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 9 May 2020 at 22:51, Chetan Khatri 
> wrote:
>
>> Hi Spark Users,
>>
>> I have a spark job that reads a parquet path. The data at that path is
>> generated by other systems, and some of the parquet paths may not contain
>> any data, which is expected. Is there any way to read the parquet so
>> that, if no data is found, I can create a dummy dataframe and go ahead?
>>
>> One way is to check path exists like
>>
>> val conf = spark.sparkContext.hadoopConfiguration
>> val fs = org.apache.hadoop.fs.FileSystem.get(conf)
>> val currentAreaExists =
>>   fs.exists(new org.apache.hadoop.fs.Path(consumableCurrentArea))
>>
>> But I don't want to run this check for 300 parquet paths; whenever data
>> doesn't exist in a parquet path I just want to go with the dummy parquet /
>> custom DataFrame.
>>
>> AnalysisException: u'Unable to infer schema for Parquet. It must be
>> specified manually.;'
>>
>> Thanks
>>
>


Customize Partitioner for Datasets

2017-09-28 Thread Kuchekar
Hi,

 Is there a way to customize the partitioner for a Dataset to be the
Hive hash partitioner rather than the Murmur3 partitioner?
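
As far as I know, Datasets do not expose a public hook for swapping out
Murmur3, but at the RDD level you can plug in a custom Partitioner. A
sketch assuming Int keys, for which Hive's hash is the int value itself:

import org.apache.spark.Partitioner

// Hypothetical partitioner; extend getPartition for other key types.
class HiveIntHashPartitioner(override val numPartitions: Int)
    extends Partitioner {
  override def getPartition(key: Any): Int = {
    val h = key.asInstanceOf[Int]
    ((h % numPartitions) + numPartitions) % numPartitions // non-negative index
  }
}

// Usage on a pair RDD derived from the Dataset (names are illustrative):
// ds.rdd.map(row => (row.key, row)).partitionBy(new HiveIntHashPartitioner(8))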

Regards,
Kuchekar, Nilesh


Re: IOT in Spark

2017-05-18 Thread Kuchekar
Hi Gaurav,

 You might want to look at the Lambda Architecture with Spark.

https://www.youtube.com/watch?v=xHa7pA94DbA
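
For the ingestion side, a minimal sketch of the usual Kafka + Spark
Structured Streaming path; the broker address and the topic name
"sensor-events" are assumptions for illustration:

val sensorStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sensor-events")   // hypothetical topic fed by sensors
  .load()
  .selectExpr("CAST(value AS STRING) AS event")

sensorStream.writeStream
  .format("console")   // replace with your real sink
  .start()
  .awaitTermination()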


Regards,
Kuchekar, Nilesh

On Thu, May 18, 2017 at 8:58 PM, Gaurav1809 <gauravhpan...@gmail.com> wrote:

> Hello gurus,
>
> How exactly does it work in real-world scenarios when it comes to reading
> data from IoT devices (say, for example, sensors at the in/out gates of a
> huge mall)? Can we do it in Spark? Do we need to use any other tool/utility
> (Kafka?) to read data from those sensors and then process it in Spark?
> Please share your thoughts on this; it will give me a headstart on my work.
> I am completely unaware of the technology stack that can be used here, so
> any pointers will be very helpful. Thanks.
>
> -Gaurav
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/IOT-in-Spark-tp28698.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Spark UI shows Jobs are processing, but the files are already written to S3

2016-11-16 Thread Kuchekar
Hi,

 I am running a spark job that saves the computed data (massive data)
to S3. On the Spark UI I see that some jobs are still active, but there is
no activity in the logs. Also, on S3 all the data has been written (I
verified each bucket --> it has a _SUCCESS file).

Am I missing something?

Thanks.
Kuchekar, Nilesh


Re: Maintaining order of pair rdd

2016-07-26 Thread Kuchekar
Hi Janardhan,

   You could do something like this:

To maintain insertion order by key, first partition by key (so that each
key is located in the same partition), and after that you can do something
like this:

rdd.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y)

The idea is to create an ArrayBuffer per value (an ArrayBuffer maintains
insertion order).


A more elegant solution would be using zipWithIndex on the pair RDD and
then sorting by the index within each group:

rdd.zipWithIndex ==> This will give you values of the form ((x, y), index)

Now map it like this: (x, (y, index))

Now group by key, and then sort the values within each key by the index.
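
A minimal runnable sketch of that approach, assuming rdd: RDD[(String, Int)]:

val ordered = rdd
  .zipWithIndex()                              // ((key, value), index)
  .map { case ((k, v), idx) => (k, (v, idx)) } // (key, (value, index))
  .groupByKey()
  .mapValues(_.toSeq.sortBy(_._2).map(_._1))   // restore original order per key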


Thanks.
Kuchekar, Nilesh

On Tue, Jul 26, 2016 at 7:35 PM, janardhan shetty <janardhan...@gmail.com>
wrote:

> Let me provide step wise details:
>
> 1.
> I have an RDD  = {
> (ID2,18159) - *element 1  *
> (ID1,18159) - *element 2*
> (ID3,18159) - *element 3*
> (ID2,36318) - *element 4 *
> (ID1,36318) - *element 5*
> (ID3,36318)
> (ID2,54477)
> (ID1,54477)
> (ID3,54477)
> }
>
> 2. RDD.groupByKey().mapValues(v => v.toArray())
>
> Array(
> (ID1,Array(*18159*, 308703, 72636, 64544, 39244, 107937, *54477*, 145272,
> 100079, *36318*, 160992, 817, 89366, 150022, 19622, 44683, 58866, 162076,
> 45431, 100136)),
> (ID3,Array(100079, 19622, *18159*, 212064, 107937, 44683, 150022, 39244,
> 100136, 58866, 72636, 145272, 817, 89366, *54477*, *36318*, 308703,
> 160992, 45431, 162076)),
> (ID2,Array(308703, *54477*, 89366, 39244, 150022, 72636, 817, 58866,
> 44683, 19622, 160992, 107937, 100079, 100136, 145272, 64544, *18159*,
> 45431, *36318*, 162076))
> )
>
>
> whereas in Step 2 I need as below:
>
> Array(
> (ID1,Array(*18159*,*36318*, *54477,...*)),
> (ID3,Array(*18159*,*36318*, *54477, ...*)),
> (ID2,Array(*18159*,*36318*, *54477, ...*))
> )
>
> Does this help ?
>
> On Tue, Jul 26, 2016 at 2:25 AM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> Apologies janardhan, I always get confused on this.
>> OK, so you have a (key, val) RDD (val is irrelevant here)
>>
>> then you can do this
>> val reduced = myRDD.reduceByKey((first, second) => first  ++ second)
>>
>> val sorted = reduced.sortBy(tpl => tpl._1)
>>
>> hth
>>
>>
>>
>> On Tue, Jul 26, 2016 at 3:31 AM, janardhan shetty <janardhan...@gmail.com
>> > wrote:
>>
>>> groupBy is a shuffle operation, so the index is already lost in that
>>> process if I am not wrong, and I don't see a *sortWith* operation on RDD.
>>>
>>> Any suggestions or help ?
>>>
>>> On Mon, Jul 25, 2016 at 12:58 AM, Marco Mistroni <mmistr...@gmail.com>
>>> wrote:
>>>
>>>> Hi
>>>>  after you do a groupBy you should use a sortWith.
>>>> Basically, a groupBy reduces your structure to (anyone correct me if
>>>> I'm wrong) an RDD[(key, val)], which you can see as a tuple, so you could
>>>> use sortWith (or sortBy, I cannot remember which one) (tpl => tpl._1).
>>>> hth
>>>>
>>>> On Mon, Jul 25, 2016 at 1:21 AM, janardhan shetty <
>>>> janardhan...@gmail.com> wrote:
>>>>
>>>>> Thanks Marco. This solved the order problem. Had another question
>>>>> which is prefix to this.
>>>>>
>>>>> As you can see below ID2,ID1 and ID3 are in order and I need to
>>>>> maintain this index order as well. But when we do the groupByKey
>>>>> operation (*rdd.distinct.groupByKey().mapValues(v => v.toArray)*)
>>>>> everything is *jumbled*.
>>>>> Is there any way we can maintain this order as well ?
>>>>>
>>>>> scala> RDD.foreach(println)
>>>>> (ID2,18159)
>>>>> (ID1,18159)
>>>>> (ID3,18159)
>>>>>
>>>>> (ID2,18159)
>>>>> (ID1,18159)
>>>>> (ID3,18159)
>>>>>
>>>>> (ID2,36318)
>>>>> (ID1,36318)
>>>>> (ID3,36318)
>>>>>
>>>>> (ID2,54477)
>>>>> (ID1,54477)
>>>>> (ID3,54477)
>>>>>
>>>>> *Jumbled version : *
>>>>> Array(
>>>>> (ID1,Array(*18159*, 308703, 72636, 64544, 39244, 107937, *54477*,
>>>>> 145272, 100079, *36318*, 160992, 817, 89366, 150022, 19622, 44683,
>>>>> 58866, 162076, 45431, 100136)),
>>>>> (ID3,Array(100079, 19622, *18159*, 212064, 107937, 44683, 150022,
>>>>> 39244, 100136, 58866, 72636, 145272, 817

Re: Heavy Stage Concentration - Ends With Failure

2016-07-19 Thread Kuchekar
Hi,

Can you check whether the RDD is partitioned correctly, with the right
number of partitions (if you are manually setting the partition count)?
Try using a hash partitioner when reading the files.

One way to debug this is to compare the number of records each executor
processes against the others in the Stage tab of the Spark UI.
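
A minimal sketch of the repartitioning idea, assuming a pair RDD and a
hypothetical partition count of 120:

import org.apache.spark.HashPartitioner

val balanced = rdd.partitionBy(new HashPartitioner(120)) // spread keys evenly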

Kuchekar, Nilesh

On Tue, Jul 19, 2016 at 8:16 PM, Aaron Jackson <ajack...@pobox.com> wrote:

> Hi,
>
> I have a cluster with 15 nodes, of which 5 are HDFS nodes.  I kick off a
> job that creates some 120 stages.  Eventually, the active and pending
> stages reduce down to a small bottleneck, and it never fails: the 10 (or
> so) running tasks are always allocated to the same executor on the same
> host.
>
> Sooner or later, it runs out of memory ... or some other resource.  It
> falls over and then the tasks are reallocated to another executor.
>
> Why do we see such heavy concentration of tasks onto a single executor
> when other executors are free?  Were the tasks assigned to an executor when
> the job was decomposed into stages?
>


Re: Memory issues on spark

2016-02-17 Thread Kuchekar
Given your config, the cluster manager can only give you 2 executors.

An m3.2xlarge comes with 30 GB of memory. You have 3 m3.2xlarge nodes,
which means a total of 3 * 30 GB = 90 GB of executor memory. 15 GB for
each of 16 executors would require 16 * 15 GB = 240 GB, far more than is
available. Also check the number of cores you are setting per executor:
an m3.2xlarge has only 8 vCPUs, but you are requesting 16 cores.
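
A hedged example of a config that would fit 3 m3.2xlarge nodes (8 vCPUs,
30 GB each) while leaving headroom for YARN overhead; the exact values
are assumptions to tune for your workload:

export SPARK_EXECUTOR_INSTANCES=6   # 2 executors per node
export SPARK_EXECUTOR_CORES=4       # 2 x 4 = 8 vCPUs per node
export SPARK_EXECUTOR_MEMORY=10G    # 2 x 10G + overhead fits in 30 GB
export SPARK_DRIVER_MEMORY=8G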




Kuchekar, Nilesh

On Wed, Feb 17, 2016 at 8:02 PM, <arun.bong...@cognizant.com> wrote:

> Hi All,
>
> I have been facing memory issues in spark. I'm using spark-sql on AWS EMR.
> I have an around-50GB file in AWS S3. I want to read this file in a BI
> tool connected to spark-sql on the thrift-server over ODBC. I'm executing
> select * from table in the BI tool (QlikView, Tableau).
> I sometimes run into an OOM error and sometimes a LOST_EXECUTOR. I'm
> really confused.
> Spark runs fine for smaller data sets.
>
> I have 3 node EMR cluster with m3.2xlarge.
>
> I have set below conf on spark.
>
> export SPARK_EXECUTOR_INSTANCES=16
> export SPARK_EXECUTOR_CORES=16
> export SPARK_EXECUTOR_MEMORY=15G
> export SPARK_DRIVER_MEMORY=12G
> spark.kryoserializer.buffer.max 1024m
>
> Even after setting SPARK_EXECUTOR_INSTANCES as 16, only 2 executors come
> up.
>
> This has been a roadblock for a long time. Any help would be appreciated.
>
> Thanks
> Arun.
>
> This e-mail and any files transmitted with it are for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. If you are not the intended recipient(s), please reply to the
> sender and destroy all copies of the original message. Any unauthorized
> review, use, disclosure, dissemination, forwarding, printing or copying of
> this email, and/or any action taken in reliance on the contents of this
> e-mail is strictly prohibited and may be unlawful. Where permitted by
> applicable law, this e-mail and other e-mail communications sent to and
> from Cognizant e-mail addresses may be monitored.
>


Re: Spark executor Memory profiling

2016-02-10 Thread Kuchekar
Hi Nirav,

  I faced a similar issue with Yarn (Spark 1.5.2 on EMR), and the
following Spark conf helped me. You can set the values accordingly:

conf = (SparkConf()
        .set("spark.master", "yarn-client")
        .setAppName("HalfWay")
        .set("spark.driver.memory", "15G")
        .set("spark.yarn.am.memory", "15G"))

conf = (conf
        .set("spark.driver.maxResultSize", "10G")
        .set("spark.storage.memoryFraction", "0.6")
        .set("spark.shuffle.memoryFraction", "0.6")
        .set("spark.yarn.executor.memoryOverhead", "4000"))

conf = (conf
        .set("spark.executor.cores", "4")
        .set("spark.executor.memory", "15G")
        .set("spark.executor.instances", "6"))

It might also be possible to use reduceByKey in place of groupByKey; that
could help with the shuffling too.
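
A minimal sketch of the difference, assuming rdd: RDD[(String, Int)]:

// groupByKey ships every value across the network before aggregating:
val sums1 = rdd.groupByKey().mapValues(_.sum)

// reduceByKey combines values on each partition first, shuffling far less:
val sums2 = rdd.reduceByKey(_ + _)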


Kuchekar, Nilesh

On Wed, Feb 10, 2016 at 8:09 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> We have been trying to solve a memory issue with a spark job that processes
> 150GB of data (on disk). It does a groupBy operation; some of the executors
> will receive somewhere around 2-4M scala case objects to work with. We
> are using the following spark config:
>
> "executorInstances": "15",
>
>  "executorCores": "1", (we reduce it to one so single task gets all
> the executorMemory! at least that's the assumption here)
>
>  "executorMemory": "15000m",
>
>  "minPartitions": "2000",
>
>  "taskCpus": "1",
>
>  "executorMemoryOverhead": "1300",
>
>  "shuffleManager": "tungsten-sort",
>
>   "storageFraction": "0.4"
>
>
> This is a snippet of what we see in spark UI for a Job that fails.
>
> This is a *stage* of this job that fails.
>
> Stage Id:        5 (retry 15)
> Pool Name:       prod
> Description:     map at SparkDataJobs.scala:210
> Submitted:       2016/02/09 21:30:06
> Duration:        13 min
> Tasks:           130/389 succeeded (16 failed)
> Shuffle Read:    1982.6 MB
> Shuffle Write:   818.7 MB
> Failure Reason:  org.apache.spark.shuffle.FetchFailedException: Error
> in opening
> FileSegmentManagedBuffer{file=/tmp/hadoop/nm-local-dir/usercache/fasd/appcache/application_1454975800192_0447/blockmgr-abb77b52-9761-457a-b67d-42a15b975d76/0c/shuffle_0_39_0.data,
> offset=11421300, length=2353}
>
> This is one of the *task* attempts from the above stage that threw the OOM:
>
> Index:            2
> Task Id:          22361
> Attempt:          0
> Status:           FAILED
> Locality:         PROCESS_LOCAL
> Executor / Host:  38 / nd1.mycom.local
> Launch Time:      2016/02/09 22:10:42
> Duration:         5.2 min
> GC Time:          1.6 min
> Input:            7.4 MB / 375509 records
> Error:            java.lang.OutOfMemoryError: Java heap space
>
> java.lang.OutOfMemoryError: Java heap space
>   at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
>   at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
>   at 
> org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:159)
>   at 
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:203)
>   at 
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:202)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at 
> org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:202)
>   at 
> org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:186)
>   at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:54)
>   at 
> org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
>   at 
> org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
>   at 
> org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:3
>
>
> None of the above suggests that it exceeded the 15GB of memory that I
> initially allocated. So what am I missing here? What's eating my memory?
>
> We tried executorJavaOpts to get a heap dump, but it doesn't seem to work.
>
> -XX:-HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -3 %p'
> -XX:HeapDumpPath=/opt/cores/spark
>
> I don't see any cores being generated, nor can I find a heap dump
> anywhere in the logs.
>
> Also, how do I find the yarn container ID from the spark executor ID, so
> that I can investigate the yarn nodemanager and resourcemanager logs for
> that particular container?
>
> PS - The job does not do any caching of intermediate RDDs, as each RDD is
> used only once for the subsequent step. We use spark 1.5.2 over Yarn in
> yarn-client mode.
>
>
> Thanks
>