Re: java.lang.UnsupportedOperationException: Cannot evaluate expression: fun_nm(input[0, string, true])

2016-08-16 Thread Sumit Khanna
This is just the stack trace, but where are you calling the UDF?

Regards,
Sumit

On 16-Aug-2016 2:20 pm, "pseudo oduesp"  wrote:

> hi,
> i create new columns with a udf, and afterwards i try to filter on those columns:
> i get this error, why?
>
> : java.lang.UnsupportedOperationException: Cannot evaluate expression: fun_nm(input[0, string, true])
> at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:221)
> at org.apache.spark.sql.execution.python.PythonUDF.eval(PythonUDF.scala:27)
> at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:408)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$canFilterOutNull(Optimizer.scala:1234)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$55.apply(Optimizer.scala:1248)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$55.apply(Optimizer.scala:1248)
> at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
> at scala.collection.immutable.List.exists(List.scala:84)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.org$apache$spark$sql$catalyst$optimizer$EliminateOuterJoin$$buildNewJoinType(Optimizer.scala:1248)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$30.applyOrElse(Optimizer.scala:1264)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$$anonfun$apply$30.applyOrElse(Optimizer.scala:1262)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:279)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:278)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:284)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:268)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(Optimizer.scala:1262)
> at org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$.apply(Optimizer.scala:1225)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
> at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
> at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
> at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
> at

hdfs persist rollbacks when spark job is killed

2016-08-08 Thread Sumit Khanna
Hello,

the use case is as follows :

say I am inserting 200K rows via dataframe.write.format("parquet") etc.
(a basic write-to-hdfs command), but due to some reason or rhyme my job
gets killed mid-run, meaning let's say I was only able to insert 100K rows
when the job got killed.

The twist is that I might actually be upserting, and even in append-only cases
the delta change data being inserted / written in this run might span
various partitions.

Now what I am looking for is something to roll the changes back: the batch
insertion should be all or nothing, and even if it is partitioned, it must
be atomic per row / unit of insertion.
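
(For reference, one pattern that approximates this is to write each run into a fresh staging directory and only promote it once the job has fully succeeded; a rough sketch with placeholder paths, runId as a placeholder run identifier, and sc as the SparkContext:)

import org.apache.hadoop.fs.{FileSystem, Path}

// 1. write the whole batch to a staging directory; a killed job leaves only this dir behind
val staging = s"$basePath/_staging_$runId"
df.write.format("parquet").mode("overwrite").save(staging)

// 2. only after the write has fully succeeded, swap the staging dir into place
// note: the delete + rename pair itself is not one atomic operation, but readers
// never see a half-written batch, only the old or the new directory
val fs = FileSystem.get(sc.hadoopConfiguration)
val target = new Path(s"$basePath/current")
if (fs.exists(target)) fs.delete(target, true)
fs.rename(new Path(staging), target)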

Kindly help.

Thanks,
Sumit


silence the spark debug logs

2016-08-07 Thread Sumit Khanna
Hello,

I don't want to print all the Spark logs, just a few, e.g. the execution
plans. How do I silence the Spark debug output?
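
(One common approach, for reference: raise the Spark loggers to WARN and keep your own package verbose. A minimal sketch assuming the default log4j 1.x backend that Spark ships with, and a placeholder application package name:)

import org.apache.log4j.{Level, Logger}

// quiet Spark's and Hadoop's own chatter
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
// keep the application's logging (e.g. printed execution plans) at DEBUG
Logger.getLogger("com.mycompany.myjob").setLevel(Level.DEBUG)  // placeholder package

// or, coarser, on the SparkContext itself:
sc.setLogLevel("WARN")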

Thanks,
Sumit


Re: spark df schema to hive schema converter func

2016-08-06 Thread Sumit Khanna
w.r.t. https://issues.apache.org/jira/browse/SPARK-5236 : how do I also,
usually, convert something of type DecimalType to int / string / etc.?
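
E.g., something along these lines would do (a hypothetical helper plus a plain column cast, not an existing Spark API; a sketch against the Spark 1.6/2.x type classes):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// hypothetical mapping from a Catalyst DataType to a Hive column type string
def toHiveType(dt: DataType): String = dt match {
  case d: DecimalType => s"decimal(${d.precision},${d.scale})"
  case IntegerType    => "int"
  case LongType       => "bigint"
  case DoubleType     => "double"
  case StringType     => "string"
  case TimestampType  => "timestamp"
  case other          => other.simpleString  // fall back to Spark's own name
}

// and casting DecimalType columns to string (or int) before handing the df to Hive
def castDecimalsToString(df: DataFrame): DataFrame =
  df.schema.fields.foldLeft(df) { (acc, f) =>
    if (f.dataType.isInstanceOf[DecimalType]) acc.withColumn(f.name, col(f.name).cast(StringType))
    else acc
  }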

Thanks,

On Sun, Aug 7, 2016 at 10:33 AM, Sumit Khanna <sumit.kha...@askme.in> wrote:

> Hi,
>
> was wondering if we have something like that takes as an argument a spark
> df type e.g DecimalType(12,5) and converts it into the corresponding hive
> schema type. Double / Decimal / String ?
>
> Any ideas.
>
> Thanks,
>


spark df schema to hive schema converter func

2016-08-06 Thread Sumit Khanna
Hi,

was wondering if we have something that takes as an argument a Spark
dataframe type, e.g. DecimalType(12,5), and converts it into the corresponding
Hive schema type: double / decimal / string?

Any ideas?

Thanks,


Re: how to debug spark app?

2016-08-03 Thread Sumit Khanna
I'm not really sure of the best practices on this, but I either consult
localhost:4040/jobs/ etc.,
or, better, this:

// `debug` below is the application's own logging helper; adjust to your logging setup
import org.apache.spark.scheduler._

val customSparkListener: CustomSparkListener = new CustomSparkListener()
sc.addSparkListener(customSparkListener)

class CustomSparkListener extends SparkListener {
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
    debug(s"application ended at time : ${applicationEnd.time}")
  }
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    debug(s"[SPARK LISTENER DEBUGS] application start app attempt id : ${applicationStart.appAttemptId}")
    debug(s"[SPARK LISTENER DEBUGS] application start app id : ${applicationStart.appId}")
    debug(s"[SPARK LISTENER DEBUGS] application start app name : ${applicationStart.appName}")
    debug(s"[SPARK LISTENER DEBUGS] application start driver logs : ${applicationStart.driverLogs}")
    debug(s"[SPARK LISTENER DEBUGS] application start spark user : ${applicationStart.sparkUser}")
    debug(s"[SPARK LISTENER DEBUGS] application start time : ${applicationStart.time}")
  }
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    debug(s"[SPARK LISTENER DEBUGS] ${executorAdded.executorId}")
    debug(s"[SPARK LISTENER DEBUGS] ${executorAdded.executorInfo}")
    debug(s"[SPARK LISTENER DEBUGS] ${executorAdded.time}")
  }
  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
    debug(s"[SPARK LISTENER DEBUGS] the executor removed Id : ${executorRemoved.executorId}")
    debug(s"[SPARK LISTENER DEBUGS] the executor removed reason : ${executorRemoved.reason}")
    debug(s"[SPARK LISTENER DEBUGS] the executor removed at time : ${executorRemoved.time}")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    debug(s"[SPARK LISTENER DEBUGS] job End id : ${jobEnd.jobId}")
    debug(s"[SPARK LISTENER DEBUGS] job End job Result : ${jobEnd.jobResult}")
    debug(s"[SPARK LISTENER DEBUGS] job End time : ${jobEnd.time}")
  }
  override def onJobStart(jobStart: SparkListenerJobStart) {
    debug(s"[SPARK LISTENER DEBUGS] Job started with properties ${jobStart.properties}")
    debug(s"[SPARK LISTENER DEBUGS] Job started with time ${jobStart.time}")
    debug(s"[SPARK LISTENER DEBUGS] Job started with job id ${jobStart.jobId.toString}")
    debug(s"[SPARK LISTENER DEBUGS] Job started with stage ids ${jobStart.stageIds.toString()}")
    debug(s"[SPARK LISTENER DEBUGS] Job started with stages ${jobStart.stageInfos.size} : $jobStart")
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    debug(s"[SPARK LISTENER DEBUGS] Stage ${stageCompleted.stageInfo.stageId} completed with ${stageCompleted.stageInfo.numTasks} tasks.")
    debug(s"[SPARK LISTENER DEBUGS] Stage details : ${stageCompleted.stageInfo.details.toString}")
    debug(s"[SPARK LISTENER DEBUGS] Stage completion time : ${stageCompleted.stageInfo.completionTime}")
    debug(s"[SPARK LISTENER DEBUGS] Stage rddInfos : ${stageCompleted.stageInfo.rddInfos.toString()}")
  }
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    debug(s"[SPARK LISTENER DEBUGS] Stage properties : ${stageSubmitted.properties}")
    debug(s"[SPARK LISTENER DEBUGS] Stage rddInfos : ${stageSubmitted.stageInfo.rddInfos.toString()}")
    debug(s"[SPARK LISTENER DEBUGS] Stage submission Time : ${stageSubmitted.stageInfo.submissionTime}")
    debug(s"[SPARK LISTENER DEBUGS] Stage submission details : ${stageSubmitted.stageInfo.details.toString()}")
  }
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    debug(s"[SPARK LISTENER DEBUGS] task ended reason : ${taskEnd.reason}")
    debug(s"[SPARK LISTENER DEBUGS] task type : ${taskEnd.taskType}")
    debug(s"[SPARK LISTENER DEBUGS] task Metrics : ${taskEnd.taskMetrics}")
    debug(s"[SPARK LISTENER DEBUGS] task Info : ${taskEnd.taskInfo}")
    debug(s"[SPARK LISTENER DEBUGS] task stage Id : ${taskEnd.stageId}")
    debug(s"[SPARK LISTENER DEBUGS] task stage attempt Id : ${taskEnd.stageAttemptId}")
  }
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    debug(s"[SPARK LISTENER DEBUGS] stage Attempt id : ${taskStart.stageAttemptId}")
    debug(s"[SPARK LISTENER DEBUGS] stage Id : ${taskStart.stageId}")
    debug(s"[SPARK LISTENER DEBUGS] task Info : ${taskStart.taskInfo}")
  }
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = {
    debug(s"[SPARK LISTENER DEBUGS] the unpersist RDD id : ${unpersistRDD.rddId}")
  }
}

and then I usually check the logs. P.S.: I am running it as a jar.

Thanks,


On Thu, Aug 4, 2016 at 6:46 AM, Ted Yu  wrote:

> Have you looked at:
>
> https://spark.apache.org/docs/latest/running-on-yarn.html#debugging-your-application
>
> If you use Mesos:
>
> https://spark.apache.org/docs/latest/running-on-mesos.html#troubleshooting-and-debugging
>
> On Wed, Aug 3, 2016 at 6:13 PM, glen  

Re: multiple spark streaming contexts

2016-08-01 Thread Sumit Khanna
Hey Nikolay,

I know the approach, but it doesn't quite fit the bill for my use case,
where each topic needs to be logged / persisted to a separate hdfs
location.

I am looking for something where a streaming context pertains to one topic
and that topic only, and was wondering if I could have them all running in
parallel in one app / jar.
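
(For reference, the usual single-context alternative still gives per-topic persistence: one StreamingContext, one direct stream per topic, each written to its own HDFS path. The catch is that all topics then share one batch duration. A sketch, assuming the Kafka 0.8 direct API of Spark 1.6/2.0 and placeholder broker, topic, and path names:)

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(30))            // single batch duration for the app
val kafkaParams = Map("metadata.broker.list" -> brokers)   // `brokers` is a placeholder
val topics = Seq("topic_a", "topic_b")                     // placeholder topic names

topics.foreach { topic =>
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set(topic))
  // each topic gets its own output location
  stream.map(_._2).foreachRDD { rdd =>
    if (!rdd.isEmpty()) rdd.saveAsTextFile(s"$basePath/$topic/batch_${System.currentTimeMillis()}")
  }
}

ssc.start()
ssc.awaitTermination()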

Thanks,

On Mon, Aug 1, 2016 at 1:08 PM, Nikolay Zhebet <phpap...@gmail.com> wrote:

> Hi, if you want to read several Kafka topics in a spark-streaming job, you can
> set the topic names separated by commas and after that read all
> messages from all topics in one flow:
>
> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>
> val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
>
>
> After that you can use the ".filter" function to split your topics and
> iterate over the messages separately.
>
> val orders_paid = lines.filter(x => { x("table_name") == "kismia.orders_paid"})
>
> orders_paid.foreachRDD( rdd => { 
>
>
> Or you can use an if..else construction to split your messages by
> name inside foreachRDD:
>
> lines.foreachRDD((recrdd, time: Time) => {
>
>recrdd.foreachPartition(part => {
>
>   part.foreach(item_row => {
>
>  if (item_row("table_name") == "kismia.orders_paid") { ...} else if (...) {...}
>
> 
>
>
> 2016-08-01 9:39 GMT+03:00 Sumit Khanna <sumit.kha...@askme.in>:
>
>> Any ideas guys? What are the best practices for multiple streams to be
>> processed?
>> I could trace a few Stack overflow comments wherein they better recommend
>> a jar separate for each stream / use case. But that isn't pretty much what
>> I want, as in it's better if one / multiple spark streaming contexts can
>> all be handled well within a single jar.
>>
>> Guys please reply,
>>
>> Awaiting,
>>
>> Thanks,
>> Sumit
>>
>> On Mon, Aug 1, 2016 at 12:24 AM, Sumit Khanna <sumit.kha...@askme.in>
>> wrote:
>>
>>> Any ideas on this one guys ?
>>>
>>> I can do a sample run but can't be sure of imminent problems if any? How
>>> can I ensure different batchDuration etc etc in here, per StreamingContext.
>>>
>>> Thanks,
>>>
>>> On Sun, Jul 31, 2016 at 10:50 AM, Sumit Khanna <sumit.kha...@askme.in>
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> Was wondering if I could create multiple spark stream contexts in my
>>>> application (e.g instantiating a worker actor per topic and it has its own
>>>> streaming context its own batch duration everything).
>>>>
>>>> What are the caveats if any?
>>>> What are the best practices?
>>>>
>>>> Have googled half heartedly on the same but the air isn't pretty much
>>>> demystified yet. I could skim through something like
>>>>
>>>>
>>>> http://stackoverflow.com/questions/29612726/how-do-you-setup-multiple-spark-streaming-jobs-with-different-batch-durations
>>>>
>>>>
>>>> http://stackoverflow.com/questions/37006565/multiple-spark-streaming-contexts-on-one-worker
>>>>
>>>> Thanks in Advance!
>>>> Sumit
>>>>
>>>
>>>
>>
>


Re: multiple spark streaming contexts

2016-08-01 Thread Sumit Khanna
Any ideas, guys? What are the best practices for multiple streams to be
processed?
I could trace a few Stack Overflow comments that rather recommend a separate
jar for each stream / use case. But that isn't quite what I want; it would be
better if one or multiple Spark streaming contexts could all be handled
within a single jar.

Guys please reply,

Awaiting,

Thanks,
Sumit

On Mon, Aug 1, 2016 at 12:24 AM, Sumit Khanna <sumit.kha...@askme.in> wrote:

> Any ideas on this one guys ?
>
> I can do a sample run but can't be sure of imminent problems if any? How
> can I ensure different batchDuration etc etc in here, per StreamingContext.
>
> Thanks,
>
> On Sun, Jul 31, 2016 at 10:50 AM, Sumit Khanna <sumit.kha...@askme.in>
> wrote:
>
>> Hey,
>>
>> Was wondering if I could create multiple spark stream contexts in my
>> application (e.g instantiating a worker actor per topic and it has its own
>> streaming context its own batch duration everything).
>>
>> What are the caveats if any?
>> What are the best practices?
>>
>> Have googled half heartedly on the same but the air isn't pretty much
>> demystified yet. I could skim through something like
>>
>>
>> http://stackoverflow.com/questions/29612726/how-do-you-setup-multiple-spark-streaming-jobs-with-different-batch-durations
>>
>>
>> http://stackoverflow.com/questions/37006565/multiple-spark-streaming-contexts-on-one-worker
>>
>> Thanks in Advance!
>> Sumit
>>
>
>


Re: multiple spark streaming contexts

2016-07-31 Thread Sumit Khanna
Any ideas on this one, guys?

I can do a sample run, but I can't be sure of imminent problems, if any. How
can I ensure a different batchDuration etc. here, per StreamingContext?

Thanks,

On Sun, Jul 31, 2016 at 10:50 AM, Sumit Khanna <sumit.kha...@askme.in>
wrote:

> Hey,
>
> Was wondering if I could create multiple spark stream contexts in my
> application (e.g instantiating a worker actor per topic and it has its own
> streaming context its own batch duration everything).
>
> What are the caveats if any?
> What are the best practices?
>
> Have googled half heartedly on the same but the air isn't pretty much
> demystified yet. I could skim through something like
>
>
> http://stackoverflow.com/questions/29612726/how-do-you-setup-multiple-spark-streaming-jobs-with-different-batch-durations
>
>
> http://stackoverflow.com/questions/37006565/multiple-spark-streaming-contexts-on-one-worker
>
> Thanks in Advance!
> Sumit
>


multiple spark streaming contexts

2016-07-30 Thread Sumit Khanna
Hey,

Was wondering if I could create multiple Spark streaming contexts in my
application (e.g. instantiating a worker actor per topic, each with its own
streaming context, its own batch duration, everything).

What are the caveats, if any?
What are the best practices?

Have googled half-heartedly on this, but it isn't really demystified yet. I
could skim through something like

http://stackoverflow.com/questions/29612726/how-do-you-setup-multiple-spark-streaming-jobs-with-different-batch-durations

http://stackoverflow.com/questions/37006565/multiple-spark-streaming-contexts-on-one-worker

Thanks in Advance!
Sumit


Re: how to save spark files as parquets efficiently

2016-07-29 Thread Sumit Khanna
Great! Common sense is very uncommon.

On Fri, Jul 29, 2016 at 8:26 PM, Ewan Leith <ewan.le...@realitymine.com>
wrote:

> If you replace the df.write ….
>
>
>
> With
>
>
>
> df.count()
>
>
>
> in your code you’ll see how much time is taken to process the full
> execution plan without the write output.
>
>
>
> That code below looks perfectly normal for writing a parquet file yes,
> there shouldn’t be any tuning needed for “normal” performance.
>
>
>
> Thanks,
>
> Ewan
>
>
>
> *From:* Sumit Khanna [mailto:sumit.kha...@askme.in]
> *Sent:* 29 July 2016 13:41
> *To:* Gourav Sengupta <gourav.sengu...@gmail.com>
> *Cc:* user <user@spark.apache.org>
> *Subject:* Re: how to save spark files as parquets efficiently
>
>
>
> Hey Gourav,
>
>
>
> Well so I think that it is my execution plan that is at fault. So
> basically df.write as a spark job on localhost:4040/ well being an action
> will include the time taken for all the umpteen transformation on it right?
> All I wanted to know is "what apt env/config params are needed to something
> simple read a dataframe from parquet and save it back as another parquet
> (meaning vanilla load/store no transformation). Is it good enough to simply
> read. and write. in the very format mentioned in spark tutorial docs i.e
>
>
>
> *df.write.format("parquet").mode("overwrite").save(hdfspathTemp) *??
>
>
>
> Thanks,
>
>
>
> On Fri, Jul 29, 2016 at 4:22 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
> Hi,
>
>
> The default write format in SPARK is parquet. And I have never faced any
> issues writing over a billion records in SPARK. Are you using
> virtualization by any chance or an obsolete hard disk or Intel Celeron may
> be?
>
> Regards,
>
> Gourav Sengupta
>
>
>
> On Fri, Jul 29, 2016 at 7:27 AM, Sumit Khanna <sumit.kha...@askme.in>
> wrote:
>
> Hey,
>
>
>
> master=yarn
>
> mode=cluster
>
>
>
> spark.executor.memory=8g
>
> spark.rpc.netty.dispatcher.numThreads=2
>
>
>
> All the POC on a single node cluster. the biggest bottle neck being :
>
>
>
> 1.8 hrs to save 500k records as a parquet file/dir executing this command :
>
>
>
> *df.write.format("parquet").mode("overwrite").save(hdfspathTemp)*
>
>
>
> No doubt, the whole execution plan gets triggered on this write / save
> action. But is it the right command / set of params to save a dataframe?
>
>
>
> essentially I am doing an upsert by pulling in data from hdfs and then
> updating it with the delta changes of the current run. But not sure if
> write itself takes that much time or some optimization is needed for
> upsert. (I have that asked as another question altogether).
>
>
>
> Thanks,
>
> Sumit
>
>
>
>
>
>
>


Re: how to save spark files as parquets efficiently

2016-07-29 Thread Sumit Khanna
Hey Gourav,

Well, I think it is my execution plan that is at fault. df.write shows up as a
Spark job on localhost:4040/ and, being an action, will include the time taken
for all the umpteen transformations upstream of it, right? All I wanted to know
is what env/config params are apt for something simple: read a dataframe from
parquet and save it back as another parquet (vanilla load/store, no
transformation). Is it good enough to simply read and write in the very format
mentioned in the Spark tutorial docs, i.e.
df.write.format("parquet").mode("overwrite").save(hdfspathTemp) ??
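
(i.e. the vanilla baseline in question: a two-line sketch assuming a Spark 2.x SparkSession named spark and placeholder paths; on 1.x, sqlContext.read.parquet is the equivalent.)

val df = spark.read.parquet(inputPath)                         // plain load, no transformations
df.write.format("parquet").mode("overwrite").save(outputPath)  // plain store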

Thanks,

On Fri, Jul 29, 2016 at 4:22 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> The default write format in SPARK is parquet. And I have never faced any
> issues writing over a billion records in SPARK. Are you using
> virtualization by any chance or an obsolete hard disk or Intel Celeron may
> be?
>
> Regards,
> Gourav Sengupta
>
> On Fri, Jul 29, 2016 at 7:27 AM, Sumit Khanna <sumit.kha...@askme.in>
> wrote:
>
>> Hey,
>>
>> master=yarn
>> mode=cluster
>>
>> spark.executor.memory=8g
>> spark.rpc.netty.dispatcher.numThreads=2
>>
>> All the POC on a single node cluster. the biggest bottle neck being :
>>
>> 1.8 hrs to save 500k records as a parquet file/dir executing this command
>> :
>>
>> df.write.format("parquet").mode("overwrite").save(hdfspathTemp)
>>
>>
>> No doubt, the whole execution plan gets triggered on this write / save
>> action. But is it the right command / set of params to save a dataframe?
>>
>> essentially I am doing an upsert by pulling in data from hdfs and then
>> updating it with the delta changes of the current run. But not sure if
>> write itself takes that much time or some optimization is needed for
>> upsert. (I have that asked as another question altogether).
>>
>> Thanks,
>> Sumit
>>
>>
>


Re: how to save spark files as parquets efficiently

2016-07-29 Thread Sumit Khanna
Hey,

So I believe this is the right way to save the file, as in the optimization is
never in the write part but in the head / body of my execution plan, isn't it?

Thanks,

On Fri, Jul 29, 2016 at 11:57 AM, Sumit Khanna <sumit.kha...@askme.in>
wrote:

> Hey,
>
> master=yarn
> mode=cluster
>
> spark.executor.memory=8g
> spark.rpc.netty.dispatcher.numThreads=2
>
> All the POC on a single node cluster. the biggest bottle neck being :
>
> 1.8 hrs to save 500k records as a parquet file/dir executing this command :
>
> df.write.format("parquet").mode("overwrite").save(hdfspathTemp)
>
>
> No doubt, the whole execution plan gets triggered on this write / save
> action. But is it the right command / set of params to save a dataframe?
>
> essentially I am doing an upsert by pulling in data from hdfs and then
> updating it with the delta changes of the current run. But not sure if
> write itself takes that much time or some optimization is needed for
> upsert. (I have that asked as another question altogether).
>
> Thanks,
> Sumit
>
>


Re: correct / efficient manner to upsert / update in hdfs (via spark / in general)

2016-07-29 Thread Sumit Khanna
Hey Ayan,

A. Create a table TGT1 as (select key,info from delta UNION ALL select
key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
TGT. Not in can be written other variations using Outer Join
B. Assuming SRC and TGT have a timestamp,
  B.1. Select latest records from UNION ALL(SRC,TGT) using RANK()
OVER PARTITION BY (Key order by timestamp desc)
  B.2. Create TGT1 from B.1. Rename TGT1 to TGT2

Well, how we approached this was to broadcast the primary keys, which they say
is better because it is the smaller table (we make sure that our run frequency
is shrunk / enlarged based on traffic somehow), so much so that the cardinality
|unique delta primary keys| is indeed a small, broadcastable number. What
follows next is a filter function on each executor, which has all the keys to
be upserted against in memory (broadcast writes the keys into executor memory,
doesn't it?). That was the only optimization I could think of. With option A,
as well as B, there are likely to be huge shuffle costs (shuffleHashJoins), right?
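
(Roughly, the broadcast + NOT-IN-filter pattern described above; a sketch only, assuming a single key column named pk and placeholder dataframe names:)

import org.apache.spark.sql.functions.{col, udf}

// collect the (small) set of delta primary keys on the driver and broadcast it
val deltaKeys: Set[String] = delta_df.select("pk").rdd.map(_.getString(0)).collect().toSet
val bKeys = sc.broadcast(deltaKeys)

// NOT-IN filter evaluated locally on each executor against the broadcast set, so no shuffle
val notInDelta = udf((k: String) => !bKeys.value.contains(k))
val untouched = existing_df.filter(notInDelta(col("pk")))

// final state = new / changed rows + untouched old rows (schemas must line up for the union)
val upserted = delta_df.unionAll(untouched)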

1. if updates are fairly spread across keys, the scheme does not give much
benefit, as the number of partitions read ~= the total number of partitions.
2. This scheme often shows a long-tail problem (think 1 key changed in a
partition).

1. is beyond doubt true, because any key of mine, anywhere back in time /
partition space, may get updated in the next run. So is 2., as in we make the
entire partition pass through the filter only to update 1 or 2-3 affected keys.

With the current use case I do not think I can ensure that keys get partitioned
well and that the delta corresponds to just one partition; that would only
happen if I maintained date-wise partitions and some notion of recency held.
Let me see how HBase might efficiently tackle this classic upsert case.

Thanks,
Sumit

On Fri, Jul 29, 2016 at 3:22 PM, ayan guha <guha.a...@gmail.com> wrote:

> This is a classic case of hadoop vs DWH implementation.
>
> Source (Delta table): SRC. Target: TGT
>
> Requirement: Pure Upsert, ie just keep the latest information for each
> key.
>
> Options:
>
> A. Create a table TGT1 as (select key,info from delta UNION ALL select
> key,info from TGT where key not in (select key from SRC)). Rename TGT1 to
> TGT. Not in can be written other variations using Outer Join
> B. Assuming SRC and TGT have a timestamp,
>   B.1. Select latest records from UNION ALL(SRC,TGT) using RANK()
> OVER PARTITION BY (Key order by timestamp desc)
>   B.2. Create TGT1 from B.1. Rename TGT1 to TGT2
>
> Both options are costly. And essentially more effort can be introduced to
> write complex manipulations by partitioning data based on key and read only
> partitions which are "changed". 3 issues:
> 1.  if updates are fairly spread across keys, the scheme does not give much
> benefit, as the number of partitions read ~= the total number of partitions.
> 2.  This scheme often shows a long-tail problem (think 1 key changed in a
> partition).
>
> This may be good when partitioning is based on keys and keys increase
> monotonically. This adds the maintenance of adding more partitions, but does
> well to contain the number of partitions read.
>
> My advice: give HBase a shot. It gives UPSERT out of the box. If you want
> history, just add a timestamp to the key (in reverse). Computation engines
> easily support HBase.
>
> Best
> Ayan
>
> On Fri, Jul 29, 2016 at 5:03 PM, Sumit Khanna <sumit.kha...@askme.in>
> wrote:
>
>> Just a note, I had the delta_df keys for the filter as in NOT
>> INTERSECTION udf broadcasted to all the worker nodes. Which I think is an
>> efficient move enough.
>>
>> Thanks,
>>
>> On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna <sumit.kha...@askme.in>
>> wrote:
>>
>>> Hey,
>>>
>>> the very first run :
>>>
>>> glossary :
>>>
>>> delta_df := current run / execution changes dataframe.
>>>
>>> def deduplicate :
>>> apply windowing function and group by
>>>
>>> def partitionDataframe(delta_df) :
>>> get unique keys of that data frame and then return an array of data
>>> frames each containing just that very same key as the column.
>>> this will give the above dataframe partitoned as say by date column or
>>> gender column or age group column etc etc.
>>>
>>> 0. deduplicate(delta_df : delta_df [ with all unique primary  /
>>> deduplicating key column ]
>>> 1. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
>>> 2. write the dataframe to corresponding parent hdfs path + partiton dir_
>>>
>>> subsequent runs :
>>>
>>> for each partition :
>>&g

Re: correct / efficient manner to upsert / update in hdfs (via spark / in general)

2016-07-29 Thread Sumit Khanna
Just a note: I had the delta_df keys for the NOT-IN filter UDF broadcast to
all the worker nodes, which I think is an efficient enough move.

Thanks,

On Fri, Jul 29, 2016 at 12:19 PM, Sumit Khanna <sumit.kha...@askme.in>
wrote:

> Hey,
>
> the very first run :
>
> glossary :
>
> delta_df := current run / execution changes dataframe.
>
> def deduplicate :
> apply windowing function and group by
>
> def partitionDataframe(delta_df) :
> get unique keys of that data frame and then return an array of data frames
> each containing just that very same key as the column.
> this will give the above dataframe partitoned as say by date column or
> gender column or age group column etc etc.
>
> 0. deduplicate(delta_df : delta_df [ with all unique primary  /
> deduplicating key column ]
> 1. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
> 2. write the dataframe to corresponding parent hdfs path + partiton dir_
>
> subsequent runs :
>
> for each partition :
> 0. partitionDataframe(delta_df) : Array[delta_df(i to # partitons)]
> 1. load df from previous hdfs location of that partition
> 2. filter the above df(p) where p is the partiton no. such that keys not
> present in delta_df(p) of current run. i.e get df(p)[primary column] not in
> delta_df(p). done via a basic ! in UDF.
> 3. delta_df.unionAll(filtered df above).
> 4. persist the output of 3. as df.write.mode.format.
>
> Is this the right way of doing the upserts partiton wise?  all in all it
> is taking 2 hours for inserting / upserting 5ooK records in parquet format
> in some hdfs location where each location gets mapped to one partition.
>
> My spark conf specs are :
>
> yarn cluster mode. single node.
> spark.executor.memory 8g
> spark.rpc.netty.dispatcher.numThreads 2
>
> Thanks,
> Sumit
>
>
>


correct / efficient manner to upsert / update in hdfs (via spark / in general)

2016-07-29 Thread Sumit Khanna
Hey,

the very first run :

glossary :

delta_df := current run / execution changes dataframe.

def deduplicate :
apply windowing function and group by

def partitionDataframe(delta_df) :
get the unique keys of that data frame and then return an array of data frames,
each containing just that very same key as the column.
this will give the above dataframe partitioned, say, by date column or
gender column or age group column etc.

0. deduplicate(delta_df) : delta_df [ with all unique primary /
deduplicating key columns ]
1. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
2. write the dataframe to the corresponding parent hdfs path + partition dir_

subsequent runs :

for each partition :
0. partitionDataframe(delta_df) : Array[delta_df(i to # partitions)]
1. load df from the previous hdfs location of that partition
2. filter the above df(p), where p is the partition no., such that keys are not
present in delta_df(p) of the current run, i.e. get df(p)[primary column] not in
delta_df(p), done via a basic ! in a UDF.
3. delta_df.unionAll(filtered df above).
4. persist the output of 3. as df.write.mode.format.

Is this the right way of doing the upserts partition-wise? All in all it is
taking 2 hours for inserting / upserting 500K records in parquet format in
some hdfs location where each location gets mapped to one partition.

My spark conf specs are :

yarn cluster mode. single node.
spark.executor.memory 8g
spark.rpc.netty.dispatcher.numThreads 2

Thanks,
Sumit


how to save spark files as parquets efficiently

2016-07-29 Thread Sumit Khanna
Hey,

master=yarn
mode=cluster

spark.executor.memory=8g
spark.rpc.netty.dispatcher.numThreads=2

All the POC on a single node cluster. the biggest bottle neck being :

1.8 hrs to save 500k records as a parquet file/dir executing this command :

df.write.format("parquet").mode("overwrite").save(hdfspathTemp)


No doubt, the whole execution plan gets triggered on this write / save
action. But is it the right command / set of params to save a dataframe?
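
(One way to separate the plan cost from the write cost: cache the frame, force it with a count(), then time the write alone; a rough sketch:)

// force the full upstream plan without producing any output, then time the write by itself
def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(s"$label took ${(System.nanoTime() - start) / 1e9} s")
  result
}

df.cache()
timed("plan + count")(df.count())
timed("parquet write")(df.write.format("parquet").mode("overwrite").save(hdfspathTemp))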

essentially I am doing an upsert by pulling in data from hdfs and then
updating it with the delta changes of the current run. But not sure if
write itself takes that much time or some optimization is needed for
upsert. (I have that asked as another question altogether).

Thanks,
Sumit