Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-13 Thread Bijay Pathak
Hi Swetha,

One option is to use a Hive version that has the above issue fixed, which is
Hive 2.0 or Cloudera CDH Hive 1.2. One thing to remember is that it's not the
Hive you have installed that matters, but the Hive version Spark is using,
which in Spark 1.6 is Hive 1.2 as of now.

The workaround I used for this issue was to write the DataFrame directly
using the DataFrame write method and then create the Hive table on top of
that location; with this change my processing time went down from 4+ hrs to
just under 1 hr.


data_frame.write.partitionBy('idPartitioner','dtPartitoner').orc("path/to/final/location")

Note that the ORC format is supported with HiveContext only.
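
For reference, here is a minimal PySpark sketch of that workaround (Spark 1.6
with HiveContext; the source table, column names, and output path below are
placeholders):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="orc-partition-write")
hive_context = HiveContext(sc)  # ORC is only available through HiveContext

# Placeholder source; in practice this is the transformed DataFrame.
data_frame = hive_context.table("some_db.user_records")

# Write the data partitioned on the two partition columns.
(data_frame.write
    .partitionBy("idPartitioner", "dtPartitioner")
    .orc("/user/userId/userRecords"))

# Create the external table on top of the files just written and register
# the partition directories with the metastore (alternatively, add them with
# ALTER TABLE ... ADD PARTITION).
hive_context.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS users (userId STRING, userRecord STRING)
    PARTITIONED BY (idPartitioner STRING, dtPartitioner STRING)
    STORED AS ORC
    LOCATION '/user/userId/userRecords'
""")
hive_context.sql("MSCK REPAIR TABLE users")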

Thanks,
Bijay


On Mon, Jun 13, 2016 at 11:41 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> Hi Mich,
>
> Following is  a sample code snippet:
>
>
> val userDF =
> userRecsDF.toDF("idPartitioner", "dtPartitioner", "userId",
> "userRecord").persist()
> System.out.println(" userRecsDF.partitions.size" +
> userRecsDF.partitions.size)
>
> userDF.registerTempTable("userRecordsTemp")
>
> sqlContext.sql("SET hive.default.fileformat=Orc  ")
> sqlContext.sql("set hive.enforce.bucketing = true; ")
> sqlContext.sql("set hive.enforce.sorting = true; ")
> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
> STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
> dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' ")
> sqlContext.sql(
>   """ from userRecordsTemp ps   insert overwrite table users
> partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
> ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
> """.stripMargin)
>
>
> On Mon, Jun 13, 2016 at 10:57 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Bijay,
>>
>> If I am hitting this issue,
>> https://issues.apache.org/jira/browse/HIVE-11940, what needs to be done?
>> Is upgrading to a higher version of Hive the only solution?
>>
>> Thanks!
>>
>> On Mon, Jun 13, 2016 at 10:47 AM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Following is  a sample code snippet:
>>>
>>>
>>> val userDF = userRecsDF.toDF("idPartitioner", "dtPartitioner",
>>> "userId", "userRecord").persist()
>>> System.out.println(" userRecsDF.partitions.size" +
>>> userRecsDF.partitions.size)
>>>
>>> userDF.registerTempTable("userRecordsTemp")
>>>
>>> sqlContext.sql("SET hive.default.fileformat=Orc  ")
>>> sqlContext.sql("set hive.enforce.bucketing = true; ")
>>> sqlContext.sql("set hive.enforce.sorting = true; ")
>>> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS users (userId
>>> STRING, userRecord STRING) PARTITIONED BY (idPartitioner STRING,
>>> dtPartitioner STRING)   stored as ORC LOCATION '/user/userId/userRecords' "
>>> )
>>> sqlContext.sql(
>>>   """ from userRecordsTemp ps   insert overwrite table users
>>> partition(idPartitioner, dtPartitioner)  select ps.userId, ps.userRecord,
>>> ps.idPartitioner, ps.dtPartitioner CLUSTER BY idPartitioner, dtPartitioner
>>> """.stripMargin)
>>>
>>>
>>>
>>>
>>> On Fri, Jun 10, 2016 at 12:10 AM, Bijay Pathak <
>>> bijay.pat...@cloudwick.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Looks like you are hitting this:
>>>> https://issues.apache.org/jira/browse/HIVE-11940.
>>>>
>>>> Thanks,
>>>> Bijay
>>>>
>>>>
>>>>
>>>> On Thu, Jun 9, 2016 at 9:25 PM, Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Can you provide a code snippet of how you are populating the target
>>>>> table from the temp table?
>>>>>
>>>>>
>>>>> HTH
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn:
>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com

Re: How to insert data into 2000 partitions(directories) of ORC/parquet at a time using Spark SQL?

2016-06-10 Thread Bijay Pathak
Hello,

Looks like you are hitting this:
https://issues.apache.org/jira/browse/HIVE-11940.

Thanks,
Bijay



On Thu, Jun 9, 2016 at 9:25 PM, Mich Talebzadeh 
wrote:

> Can you provide a code snippet of how you are populating the target table
> from the temp table?
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 9 June 2016 at 23:43, swetha kasireddy 
> wrote:
>
>> No, I am reading the data from HDFS, transforming it, registering the
>> data in a temp table using registerTempTable, and then doing insert
>> overwrite using Spark SQL's HiveContext.
>>
>> On Thu, Jun 9, 2016 at 3:40 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> how are you doing the insert? from an existing table?
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 9 June 2016 at 21:16, Stephen Boesch  wrote:
>>>
 How many workers (/cpu cores) are assigned to this job?

 2016-06-09 13:01 GMT-07:00 SRK :

> Hi,
>
> How to insert data into 2000 partitions (directories) of ORC/Parquet at a
> time using Spark SQL? It does not seem to be performant when I try to
> insert into 2000 directories of Parquet/ORC using Spark SQL. Did anyone
> face this issue?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-insert-data-into-2000-partitions-directories-of-ORC-parquet-at-a-time-using-Spark-SQL-tp27132.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

>>>
>>
>


Re: Dataframe saves for a large set but throws OOM for a small dataset

2016-04-30 Thread Bijay Pathak
Sorry for the confusion, this was supposed to be an answer to another thread.

Bijay

On Sat, Apr 30, 2016 at 2:37 PM, Bijay Kumar Pathak 
wrote:

> Hi,
>
> I was facing the same issue on Spark 1.6. My data size was around 100 GB
> and I was writing into a partitioned Hive table.
>
> I was able to solve this issue by starting from 6 GB of memory and going up
> to 15 GB of memory per executor with an overhead of 2 GB, and by
> partitioning the DataFrame before doing the insert overwrite into the Hive
> table. From my experience Parquet puts a lot of memory pressure on the
> executors, so try increasing your executor memory.
>
> Here are the relevant JIRA tickets:
> https://issues.apache.org/jira/browse/SPARK-8890
> https://issues.apache.org/jira/browse/PARQUET-222
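>
> A minimal sketch of that approach (the memory figures, table and column
> names below are only illustrative):
>
> # Submit with larger executors, e.g.:
> #   spark-submit --executor-memory 12G \
> #     --conf spark.yarn.executor.memoryOverhead=2048 my_job.py
>
> # Repartition on the partition columns before the insert overwrite so each
> # task writes to fewer partition directories at once.
> repartitioned_df = data_frame.repartition(200, "id_partition", "dt_partition")
> repartitioned_df.registerTempTable("staging_tmp")
> hive_context.sql("""
>     INSERT OVERWRITE TABLE target_table PARTITION (id_partition, dt_partition)
>     SELECT * FROM staging_tmp
> """)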
>
> Thanks,
> Bijay
>
>
>
> On Sat, Apr 30, 2016 at 1:52 PM, Brandon White 
> wrote:
>
>> randomSplit instead of randomSample
>> On Apr 30, 2016 1:51 PM, "Brandon White"  wrote:
>>
>>> val df = globalDf
>>> val filteredDfs= filterExpressions.map { expr =>
>>>   val filteredDf = df.filter(expr)
>>>   val samples = filteredDf.randomSample([.7, .3])
>>>   (samples(0), samples(1))
>>> }
>>>
>>> val largeDfs = filteredDfs.map(_._1)
>>> val smallDfs = filteredDfs.map(_._2)
>>>
>>> val unionedLargeDfs = tailRecursiveUnionAll(largeDfs.tail, largeDfs.head)
>>> val unionedSmallDfs = tailRecursiveUnionAll(smallDfs.tail, smallDfs.head)
>>>
>>> unionedLargeDfs.write.parquet(output) // works fine
>>> unionedSmallDfs.write.parquet(output)  // breaks with OOM stack trace in
>>> first thread
>>>
>>> There is no skew here. I am using Spark 1.5.1 with 80 executors with 7g
>>> memory.
>>> On Apr 30, 2016 1:22 PM, "Ted Yu"  wrote:
>>>
 Can you provide a bit more information:

 Does the smaller dataset have skew ?

 Which release of Spark are you using ?

 How much memory did you specify ?

 Thanks

 On Sat, Apr 30, 2016 at 1:17 PM, Brandon White  wrote:

> Hello,
>
> I am writing two datasets. One dataset is 2x larger than the other.
> Both datasets are written to parquet the exact same way using
>
> df.write.mode("Overwrite").parquet(outputFolder)
>
> The smaller dataset OOMs while the larger dataset writes perfectly
> fine. Here is the stack trace: Any ideas what is going on here and how I
> can fix it?
>
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:2367)
> at
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
> at
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
> at
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
> at java.lang.StringBuilder.append(StringBuilder.java:132)
> at scala.StringContext.standardInterpolator(StringContext.scala:123)
> at scala.StringContext.s(StringContext.scala:90)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:947)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
> at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
> at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
> at
> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
> at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
> at
> org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)
>


>


Fwd: Connection closed Exception.

2016-04-10 Thread Bijay Pathak
Hi,

I am running Spark 1.6 on EMR. I have a workflow which does the following
things (a rough sketch is included after the list):

   1. Read the 2 flat files, create DataFrames, and join them.
   2. Read the particular partition from the Hive table and join the
   DataFrame from step 1 with it.
   3. Finally, insert overwrite into a Hive table which is partitioned on
   two fields.
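
A rough sketch of that workflow (paths, schemas, table and column names are
placeholders, and the flat files are assumed to be comma-delimited text):

from pyspark import SparkContext
from pyspark.sql import HiveContext, Row

sc = SparkContext(appName="emr-workflow-sketch")
hive_context = HiveContext(sc)

# 1. Read the two flat files, build DataFrames, and join them.
def parse_left(line):
    fields = line.split(",")
    return Row(user_id=fields[0], left_val=fields[1])

def parse_right(line):
    fields = line.split(",")
    return Row(user_id=fields[0], right_val=fields[1])

left_df = hive_context.createDataFrame(
    sc.textFile("s3://bucket/left.csv").map(parse_left))
right_df = hive_context.createDataFrame(
    sc.textFile("s3://bucket/right.csv").map(parse_right))
joined_df = left_df.join(right_df, "user_id")

# 2. Read one partition of the Hive table and join it with the result of 1
#    (source_table is assumed to carry a user_id and both partition columns).
hive_part_df = hive_context.sql(
    "SELECT * FROM my_db.source_table WHERE dt_partition = '2016-04-10'")
enriched_df = joined_df.join(hive_part_df, "user_id")

# 3. Insert overwrite into the table partitioned on two fields; the partition
#    columns must come last in the SELECT.
hive_context.sql("SET hive.exec.dynamic.partition = true")
hive_context.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
enriched_df.registerTempTable("enriched_tmp")
hive_context.sql("""
    INSERT OVERWRITE TABLE my_db.target_table
    PARTITION (id_partition, dt_partition)
    SELECT user_id, left_val, right_val, id_partition, dt_partition
    FROM enriched_tmp
""")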

The stdout log message in the terminal when I submit the job shows the
message below.
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 30149"...
Killed

And when I check the YARN logs they show the error below. The Spark UI
doesn't show any failed stages or tasks, but the job gets stuck in the
middle without completing all the stages. Did anyone come across similar
issues? What could be the reason behind it, and how could I troubleshoot it?


16/04/11 00:19:38 ERROR client.TransportResponseHandler: Still have 1
requests outstanding when connection from ip-10-184-195-29.ec2.internal/
10.184.195.29:43162 is closed
16/04/11 00:19:38 WARN executor.CoarseGrainedExecutorBackend: An unknown
(ip-10-184-195-29.ec2.internal:43162) driver disconnected.
16/04/11 00:19:38 ERROR executor.CoarseGrainedExecutorBackend: Driver
10.184.195.29:43162 disassociated! Shutting down.
16/04/11 00:19:38 WARN netty.NettyRpcEndpointRef: Error sending message
[message = Heartbeat(12,[Lscala.Tuple2;@6545df9a,BlockManagerId(12,
ip-10-184-194-43.ec2.internal, 43867))] in 1 attempts
java.io.IOException: Connection from ip-10-184-195-29.ec2.internal/
10.184.195.29:43162 closed
at
org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
at
org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
at
io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
16/04/11 00:19:38 INFO storage.DiskBlockManager: Shutdown hook called
16/04/11 00:19:38 INFO util.ShutdownHookManager: Shutdown hook called


Re: Converting a string of format of 'dd/MM/yyyy' in Spark sql

2016-03-24 Thread Bijay Pathak
Hi,

I have written a UDF for doing the same in a PySpark DataFrame, since some of
my dates are before the Unix epoch of 1/1/1970. I have more than 250 columns
and I am applying the custom date_format UDF to more than 50 of them. I am
getting OOM errors and poor performance because of the UDF.
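
For comparison, a minimal sketch of doing the conversion with the built-in
functions instead of a Python UDF (the column names are placeholders, and the
behaviour on pre-1970 dates is worth verifying on your data):

from pyspark.sql import functions as F

date_columns = ["payment_date", "order_date"]  # ... the 50+ date columns
converted_df = df
for c in date_columns:
    converted_df = converted_df.withColumn(
        c, F.to_date(F.from_unixtime(F.unix_timestamp(F.col(c), "dd/MM/yyyy"))))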

What's your data size and how is the performance?

Thanks,
Bijay

On Thu, Mar 24, 2016 at 10:19 AM, Mich Talebzadeh  wrote:

> Minor correction: UK date is dd/MM/yyyy
>
> scala> sql("select paymentdate,
> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(paymentdate,'dd/MM/yyyy'),'yyyy-MM-dd'))
> AS newdate from tmp").first
> res47: org.apache.spark.sql.Row = [10/02/2014,2014-02-10]
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 24 March 2016 at 17:09, Mich Talebzadeh 
> wrote:
>
>> Thanks everyone. Appreciated
>>
>> sql("select paymentdate,
>> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(paymentdate,'MM/dd/yyyy'),'yyyy-MM-dd'))
>> from tmp").first
>> res45: org.apache.spark.sql.Row = [10/02/2014,2014-10-02]
>>
>> Breaking a nut with sledgehammer :)
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 24 March 2016 at 17:03, Kasinathan, Prabhu 
>> wrote:
>>
>>> Can you try this one?
>>>
>>> spark-sql> select paymentdate,
>>> TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP(paymentdate,'MM/dd/yyyy'),'yyyy-MM-dd'))
>>> from tmp;
>>> 10/02/2014 2014-10-02
>>> spark-sql>
>>>
>>>
>>> From: Tamas Szuromi 
>>> Date: Thursday, March 24, 2016 at 9:35 AM
>>> To: Mich Talebzadeh 
>>> Cc: Ajay Chander , Tamas Szuromi <
>>> tamas.szur...@odigeo.com.INVALID>, "user @spark" 
>>> Subject: Re: Converting a string of format of 'dd/MM/yyyy' in Spark sql
>>>
>>> Actually, you should run  sql("select paymentdate,
>>> unix_timestamp(paymentdate, "dd/MM/yyyy") from tmp").first
>>>
>>>
>>> But keep in mind you will get a unix timestamp!
>>>
>>>
>>> On 24 March 2016 at 17:29, Mich Talebzadeh 
>>> wrote:
>>>
 Thanks guys.

 Unfortunately neither is working

  sql("select paymentdate, unix_timestamp(paymentdate) from tmp").first
 res28: org.apache.spark.sql.Row = [10/02/2014,null]


 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 24 March 2016 at 14:23, Ajay Chander  wrote:

> Mich,
>
> Can you try the value for paymentdata to this
> format  paymentdata='2015-01-01 23:59:59' , to_date(paymentdate) and
> see if it helps.
>
>
> On Thursday, March 24, 2016, Tamas Szuromi <
> tamas.szur...@odigeo.com.invalid> wrote:
>
>> Hi Mich,
>>
>> Take a look
>> https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/functions.html#unix_timestamp(org.apache.spark.sql.Column,%20java.lang.String)
>>
>> cheers,
>> Tamas
>>
>>
>> On 24 March 2016 at 14:29, Mich Talebzadeh > > wrote:
>>
>>>
>>> Hi,
>>>
>>> I am trying to convert a date in Spark temporary table
>>>
>>> Tried few approaches.
>>>
>>> scala> sql("select paymentdate, to_date(paymentdate) from tmp")
>>> res21: org.apache.spark.sql.DataFrame = [paymentdate: string, _c1:
>>> date]
>>>
>>>
>>> scala> sql("select paymentdate, to_date(paymentdate) from tmp").first
>>> *res22: org.apache.spark.sql.Row = [10/02/2014,null]*
>>>
>>> My date is stored as String dd/MM/yyyy as shown above. However,
>>> to_date() returns null!
>>>
>>>
>>> Thanks
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>
>>

>>>
>>
>


Re: multiple tables for join

2016-03-24 Thread Bijay Pathak
Hi,

Can you elaborate on the issues you are facing? I am doing a similar kind of
join, so I may be able to provide you with some suggestions and pointers.

Thanks,
Bijay

On Thu, Mar 24, 2016 at 5:12 AM, pseudo oduesp 
wrote:

> Hi, I spent two months of my time trying to make 10 joins with the
> following tables:
>
>    1 GB   table 1
>    3 GB   table 2
>    500 MB table 3
>    400 MB table 4
>    20 MB  table 5
>    100 MB table 6
>    30 MB  table 7
>    40 MB  table 8
>    700 MB table 9
>    800 MB table 10
>
> I use hiveContext.sql("select * from table1 left join table2 on
> c1 == c2")...
>
> table 1 with 2000 columns
> table 2 with 18500 columns
> table 3 with 10 columns
> all other tables under 10 columns
>
> Please help me.
>


Re: adding rows to a DataFrame

2016-03-11 Thread Bijay Pathak
Here is another way you can achieve that (in Python):

# To add a new column, use withColumn with a Column expression.
base_df.withColumn("column_name", column_expression_for_new_column)

# To add new rows, create a DataFrame containing the new rows and unionAll() it.
base_df.unionAll(new_df)

# Another approach: convert to an RDD, add the required fields, and convert
# back to a DataFrame.
def update_row(row):
    """Add extra columns according to your logic."""
    # Example: append two extra fields to the Row tuple.
    return row + ("Text", "number",)

updated_row_rdd = base_df.map(lambda row: update_row(row))
# Convert back to a DataFrame, supplying the schema.
updated_df = sql_context.createDataFrame(updated_row_rdd, schema)

# To add extra rows, create a new DataFrame with the new rows and unionAll it.
result_df = updated_df.unionAll(new_row_df)


Thanks,
Bijay

On Fri, Mar 11, 2016 at 11:49 AM, Michael Armbrust 
wrote:

> Or look at explode on DataFrame
>
> On Fri, Mar 11, 2016 at 10:45 AM, Stefan Panayotov 
> wrote:
>
>> Hi,
>>
>> I have a problem that requires me to go through the rows in a DataFrame
>> (or possibly through rows in a JSON file) and conditionally add rows
>> depending on a value in one of the columns in each existing row. So, for
>> example if I have:
>>
>>
>> +---+---+---+
>> | _1| _2| _3|
>> +---+---+---+
>> |ID1|100|1.1|
>> |ID2|200|2.2|
>> |ID3|300|3.3|
>> |ID4|400|4.4|
>> +---+---+---+
>>
>> I need to be able to get:
>>
>>
>> +---+---+---++---+
>> | _1| _2| _3|  _4| _5|
>> +---+---+---++---+
>> |ID1|100|1.1|ID1 add text or d...| 25|
>> |id11 ..|21 |
>> |id12 ..|22 |
>> |ID2|200|2.2|ID2 add text or d...| 50|
>> |id21 ..|33 |
>> |id22 ..|34 |
>> |id23 ..|35 |
>> |ID3|300|3.3|ID3 add text or d...| 75|
>> |id31 ..|11 |
>> |ID4|400|4.4|ID4 add text or d...|100|
>> |id41 ..|51 |
>> |id42 ..|52 |
>> |id43 ..|53 |
>> |id44 ..|54 |
>> +---+---+---++---+
>>
>> How can I achieve this in Spark without doing DF.collect(), which will
>> get everything to the driver and for a big data set I'll get OOM?
>> BTW, I know how to use withColumn() to add new columns to the DataFrame.
>> I need to also add new rows.
>> Any help will be appreciated.
>>
>> Thanks,
>>
>>
>> *Stefan Panayotov, PhD **Home*: 610-355-0919
>> *Cell*: 610-517-5586
>> *email*: spanayo...@msn.com
>> spanayo...@outlook.com
>> spanayo...@comcast.net
>>
>>
>
>


Getting all field value as Null while reading Hive Table with Partition

2016-01-20 Thread Bijay Pathak
Hello,

I am getting all the values of the fields as NULL while reading a Hive table
with partitions in Spark 1.5.0 running on CDH 5.5.1 with YARN (dynamic
allocation).

Below is the command I used in Spark_Shell:

import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)
val hiveTable = hiveContext.table("test_db.partition_table")

Am I missing something? Do I need to provide other info while reading a
partitioned table?

Thanks,
Bijay


Base metrics for Spark Benchmarking.

2015-04-16 Thread Bijay Pathak
Hello,

We want to tune Spark running on a YARN cluster. The Spark History
Server UI shows lots of parameters like:

   - GC time
   - Task Duration
   - Shuffle R/W
   - Shuffle Spill (Memory/Disk)
   - Serialization Time (Task/Result)
   - Scheduler Delay

Among the above metrics, which are the most important that should be taken
as reference for benchmarking the cluster performance?

Thanks,

Bijay


Re: why Shuffle Write is not zero when everything is cached and there is enough memory?

2015-03-31 Thread Bijay Pathak
The Spark sort-based shuffle (the default since 1.1) keeps the data from
each map task in memory until it can no longer fit, after which it is
sorted and spilled to disk. You can reduce the shuffle write to
disk by increasing spark.shuffle.memoryFraction (default 0.2).

By writing the shuffle output to disk, the Spark lineage can be
truncated when the RDDs are already materialized as the side effects
of an earlier shuffle. This is an under-the-hood optimization in Spark
which is only possible because the shuffle output is written
to disk.

You can set spark.shuffle.spill to false if you don't want to spill to
disk, assuming you have enough heap memory.
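
A minimal sketch of setting those properties when building the context (the
values are only illustrative, and these settings apply to the pre-1.6 shuffle
memory manager):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.shuffle.memoryFraction", "0.4")  # default is 0.2
        .set("spark.shuffle.spill", "false"))        # only with enough heap
sc = SparkContext(conf=conf)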

On Tue, Mar 31, 2015 at 12:35 PM, Udit Mehta ume...@groupon.com wrote:
 I have noticed a similar issue when using spark streaming. The spark shuffle
 write size increases to a large size(in GB) and then the app crashes saying:
 java.io.FileNotFoundException:
 /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f
 (No such file or directory)

 I dont understand why the shuffle size increases to such a large value for
 long running jobs.

 Thanks,
 Udiy

 On Mon, Mar 30, 2015 at 4:01 AM, shahab shahab.mok...@gmail.com wrote:

 Thanks Saisai. I will try your solution, but still i don't understand why
 filesystem should be used where there is a plenty of memory available!



 On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Shuffle write will finally spill the data into file system as a bunch of
 files. If you want to avoid disk write, you can mount a ramdisk and
 configure spark.local.dir to this ram disk. So shuffle output will write
 to memory based FS, and will not introduce disk IO.

 Thanks
 Jerry

 2015-03-30 17:15 GMT+08:00 shahab shahab.mok...@gmail.com:

 Hi,

 I was looking at SparkUI, Executors, and I noticed that I have 597 MB
 for  Shuffle while I am using cached temp-table and the Spark had 2 GB 
 free
 memory (the number under Memory Used is 597 MB /2.6 GB) ?!!!

 Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks be
 done in memory?

 best,

 /Shahab





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why Shuffle Write is not zero when everything is cached and there is enough memory?

2015-03-31 Thread Bijay Pathak
Hi Udit,

The persisted RDDs in memory are cleared by Spark using an LRU policy, and
you can also set the time to clear the persisted RDDs and metadata by setting
spark.cleaner.ttl (default: infinite). But I am not aware of any property to
clean the older shuffle writes from disk.
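
A minimal sketch of setting that TTL when building the context (the value is
only illustrative):

from pyspark import SparkConf, SparkContext

# Clean persisted RDDs and metadata older than one hour (value in seconds).
conf = SparkConf().set("spark.cleaner.ttl", "3600")
sc = SparkContext(conf=conf)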

thanks,
bijay

On Tue, Mar 31, 2015 at 1:50 PM, Udit Mehta ume...@groupon.com wrote:

 Thanks for the reply.
 This will reduce the shuffle write to disk to an extent but for a long
 running job(multiple days), the shuffle write would still occupy a lot of
 space on disk. Why do we need to store the data from older map tasks to
 memory?

 On Tue, Mar 31, 2015 at 1:19 PM, Bijay Pathak bijay.pat...@cloudwick.com
 wrote:

 The Spark Sort-Based Shuffle (default from 1.1) keeps the data from
  each Map tasks to memory until they can't fit after which they
 are sorted and spilled to disk. You can reduce the Shuffle write to
 disk by increasing spark.shuffle.memoryFraction(default 0.2).

 By writing the shuffle output to disk the Spark lineage can be
 truncated when the RDDs are already materialized as the side-effects
 of earlier shuffle.This is the under the hood optimization in Spark
  which is only possible because of shuffle output being written
 to disk.

 You can set spark.shuffle.spill to false if you don't want to spill to
 the disk and assuming you have enough heap memory.

 On Tue, Mar 31, 2015 at 12:35 PM, Udit Mehta ume...@groupon.com wrote:
  I have noticed a similar issue when using spark streaming. The spark
 shuffle
  write size increases to a large size(in GB) and then the app crashes
 saying:
  java.io.FileNotFoundException:
 
 /data/vol0/nodemanager/usercache/$user/appcache/application_1427480955913_0339/spark-local-20150330231234-db1a/0b/temp_shuffle_1b23808f-f285-40b2-bec7-1c6790050d7f
  (No such file or directory)
 
  I dont understand why the shuffle size increases to such a large value
 for
  long running jobs.
 
  Thanks,
  Udiy
 
  On Mon, Mar 30, 2015 at 4:01 AM, shahab shahab.mok...@gmail.com
 wrote:
 
  Thanks Saisai. I will try your solution, but still i don't understand
 why
  filesystem should be used where there is a plenty of memory available!
 
 
 
  On Mon, Mar 30, 2015 at 11:22 AM, Saisai Shao sai.sai.s...@gmail.com
  wrote:
 
  Shuffle write will finally spill the data into file system as a bunch
 of
  files. If you want to avoid disk write, you can mount a ramdisk and
  configure spark.local.dir to this ram disk. So shuffle output will
 write
  to memory based FS, and will not introduce disk IO.
 
  Thanks
  Jerry
 
  2015-03-30 17:15 GMT+08:00 shahab shahab.mok...@gmail.com:
 
  Hi,
 
  I was looking at SparkUI, Executors, and I noticed that I have 597 MB
  for  Shuffle while I am using cached temp-table and the Spark had 2
 GB free
  memory (the number under Memory Used is 597 MB /2.6 GB) ?!!!
 
  Shouldn't be Shuffle Write be zero and everything (map/reduce) tasks
 be
  done in memory?
 
  best,
 
  /Shahab
 
 
 
 





Shuffle Spill Memory and Shuffle Spill Disk

2015-03-23 Thread Bijay Pathak
Hello,

I am running TeraSort (https://github.com/ehiggs/spark-terasort) on 100 GB
of data. The final metrics I am getting for Shuffle Spill are:

Shuffle Spill(Memory): 122.5 GB
Shuffle Spill(Disk): 3.4 GB

What's the difference and relation between these two metrics? Does this
mean 122.5 GB was spilled from memory during the shuffle?

thank you,
bijay