Re: Spark 1.6.0 running jobs in yarn shows negative no of tasks in executor

2016-02-25 Thread Umesh Kacha
Hi, I am using Hadoop 2.4.0. It is not frequent; it only happens sometimes. I
don't think my Spark logic has any problem; if the logic were wrong it would
fail every day. I mostly see YARN killing executors, so I see executor-lost
messages in my driver logs.
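
One common mitigation when YARN kills executors for exceeding their container
is to raise the YARN memory overhead so the container is sized above the JVM
heap. A minimal sketch, assuming the Spark 1.x property name
spark.yarn.executor.memoryOverhead and purely illustrative sizes:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    SparkConf conf = new SparkConf()
        .setAppName("MyYarnJob")                            // illustrative app name
        .set("spark.executor.memory", "20g")
        // extra memory YARN reserves per container beyond the heap, in MB
        .set("spark.yarn.executor.memoryOverhead", "3072");
    JavaSparkContext sc = new JavaSparkContext(conf);

The same settings can be passed as --conf options to spark-submit; the
NodeManager logs would confirm whether containers are being killed for
exceeding their memory limits.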

On Thu, Feb 25, 2016 at 10:30 PM, Yin Yang  wrote:

> Which release of hadoop are you using ?
>
> Can you share a bit about the logic of your job ?
>
> Pastebinning portion of relevant logs would give us more clue.
>
> Thanks
>
> On Thu, Feb 25, 2016 at 8:54 AM, unk1102  wrote:
>
>> Hi, I have a Spark job which I run on YARN and sometimes it behaves in a weird
>> manner: it shows a negative number of tasks in a few executors and I keep
>> losing executors. I also see more executors than I requested. My job is highly
>> tuned and is not hitting OOM or any other problem. It is just that YARN
>> sometimes behaves in a way that executors keep getting killed because of a
>> resource crunch. Please guide me on how to keep YARN from behaving badly.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-6-0-running-jobs-in-yarn-shows-negative-no-of-tasks-in-executor-tp26337.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark Streaming with Druid?

2016-02-08 Thread Umesh Kacha
Hi Hemant, thanks much. Can we use SnappyData on YARN? My Spark jobs run in
yarn-client mode. Please guide.

On Mon, Feb 8, 2016 at 9:46 AM, Hemant Bhanawat 
wrote:

> You may want to have a look at spark druid project already in progress:
> https://github.com/SparklineData/spark-druid-olap
>
> You can also have a look at SnappyData
> , which is a low latency
> store tightly integrated with Spark, Spark SQL and Spark Streaming. You can
> find the 0.1 Preview release's documentation here.
> 
>
> Disclaimer: I am a SnappyData engineer.
>
> Hemant
> www.snappydata.io
>
>
> On Sun, Feb 7, 2016 at 12:47 AM, unk1102  wrote:
>
>> Hi did anybody tried Spark Streaming with Druid as low latency store?
>> Combination seems powerful is it worth trying both together? Please guide
>> and share your experience. I am after creating the best low latency
>> streaming analytics.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Druid-tp26164.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to find cause(waiting threads etc) of hanging job for 7 hours?

2016-01-12 Thread Umesh Kacha
Hi Prabhu, thanks for the response. I did the same; the problem is that when I
get the process id using jps or ps -ef I don't get the user name in the very
first column. I see a number in place of the user name, so I can't run jstack
on it because of a permission issue. It gives something like the following:
728852   3553   9833   0   04:30   ?  00:00:00   /bin/bash blabal
On Jan 12, 2016 12:02, "Prabhu Joseph" <prabhujose.ga...@gmail.com> wrote:

> Umesh,
>
>   Running task is a thread within the executor process. We need to take
> stack trace for the executor process. The executor will be running in any
> NodeManager machine as a container.
>
>   YARN RM UI running jobs will have the host details where executor is
> running. Login to that NodeManager machine and jps -l will list all java
> processes, jstack -l  will give the stack trace.
>
>
> Thanks,
> Prabhu Joseph
>
> On Mon, Jan 11, 2016 at 7:56 PM, Umesh Kacha <umesh.ka...@gmail.com>
> wrote:
>
>> Hi Prabhu thanks for the response. How do I find pid of a slow running
>> task. Task is running in yarn cluster node. When I try to see pid of a
>> running task using my user I see some 7-8 digit number instead of user
>> running process any idea why spark creates this number instead of
>> displaying user
>> On Jan 3, 2016 6:06 AM, "Prabhu Joseph" <prabhujose.ga...@gmail.com>
>> wrote:
>>
>>> The attached image just has thread states, and WAITING threads need not
>>> be the issue. We need to take thread stack traces and identify at which
>>> area of code, threads are spending lot of time.
>>>
>>> Use jstack -l  or kill -3 , where pid is the process id of the
>>> executor process. Take jstack stack trace for every 2 seconds and total 1
>>> minute. This will help to identify the code where threads are spending lot
>>> of time and then try to tune.
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>>
>>>
>>> On Sat, Jan 2, 2016 at 1:28 PM, Umesh Kacha <umesh.ka...@gmail.com>
>>> wrote:
>>>
>>>> Hi thanks I did that and I have attached thread dump images. That was
>>>> the intention of my question asking for help to identify which waiting
>>>> thread is culprit.
>>>>
>>>> Regards,
>>>> Umesh
>>>>
>>>> On Sat, Jan 2, 2016 at 8:38 AM, Prabhu Joseph <
>>>> prabhujose.ga...@gmail.com> wrote:
>>>>
>>>>> Take thread dump of Executor process several times in a short time
>>>>> period and check what each threads are doing at different times which will
>>>>> help to identify the expensive sections in user code.
>>>>>
>>>>> Thanks,
>>>>> Prabhu Joseph
>>>>>
>>>>> On Sat, Jan 2, 2016 at 3:28 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>>>>
>>>>>> Sorry please see attached waiting thread log
>>>>>>
>>>>>> <
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
>>>>>> >
>>>>>> <
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
>>>>>> >
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-find-cause-waiting-threads-etc-of-hanging-job-for-7-hours-tp25850p25851.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> -
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>


Re: How to find cause(waiting threads etc) of hanging job for 7 hours?

2016-01-11 Thread Umesh Kacha
Hi Prabhu, thanks for the response. How do I find the pid of a slow-running
task? The task is running on a YARN cluster node. When I try to see the pid of
a running task using my user, I see some 7-8 digit number instead of the user
running the process. Any idea why Spark shows this number instead of
displaying the user?
On Jan 3, 2016 6:06 AM, "Prabhu Joseph" <prabhujose.ga...@gmail.com> wrote:

> The attached image just has thread states, and WAITING threads need not be
> the issue. We need to take thread stack traces and identify at which area
> of code, threads are spending lot of time.
>
> Use jstack -l  or kill -3 , where pid is the process id of the
> executor process. Take jstack stack trace for every 2 seconds and total 1
> minute. This will help to identify the code where threads are spending lot
> of time and then try to tune.
>
> Thanks,
> Prabhu Joseph
>
>
>
> On Sat, Jan 2, 2016 at 1:28 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
>> Hi thanks I did that and I have attached thread dump images. That was the
>> intention of my question asking for help to identify which waiting thread
>> is culprit.
>>
>> Regards,
>> Umesh
>>
>> On Sat, Jan 2, 2016 at 8:38 AM, Prabhu Joseph <prabhujose.ga...@gmail.com
>> > wrote:
>>
>>> Take thread dump of Executor process several times in a short time
>>> period and check what each threads are doing at different times which will
>>> help to identify the expensive sections in user code.
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>> On Sat, Jan 2, 2016 at 3:28 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>>
>>>> Sorry please see attached waiting thread log
>>>>
>>>> <
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
>>>> >
>>>> <
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
>>>> >
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-find-cause-waiting-threads-etc-of-hanging-job-for-7-hours-tp25850p25851.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: What should be the ideal value(unit) for spark.memory.offheap.size

2016-01-08 Thread Umesh Kacha
Hi, for a 30 GB executor how much off-heap memory should I give, along with
the YARN memory overhead? Is that OK?
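
For reference, a minimal sketch of wiring this up in Spark 1.6, assuming the
camel-cased property names from the configuration guide
(spark.memory.offHeap.enabled / spark.memory.offHeap.size) and an illustrative
5 GB allocation:

    SparkConf conf = new SparkConf()
        .set("spark.memory.offHeap.enabled", "true")
        // size of the off-heap region; expressed in bytes here, check the
        // configuration guide for whether your release accepts suffixes like "5g"
        .set("spark.memory.offHeap.size", "5368709120");

Since this memory lives outside the JVM heap, the YARN container has to leave
room for it: roughly, spark.yarn.executor.memoryOverhead should cover the
off-heap size plus the usual overhead, otherwise YARN may kill the executor
for exceeding its container.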

On Thu, Jan 7, 2016 at 4:24 AM, Ted Yu  wrote:

> Turns out that I should have specified -i to my former grep command :-)
>
> Thanks Marcelo
>
> But does this mean that specifying custom value for parameter 
> spark.memory.offheap.size
> would not take effect ?
>
> Cheers
>
> On Wed, Jan 6, 2016 at 2:47 PM, Marcelo Vanzin 
> wrote:
>
>> Try "git grep -i spark.memory.offheap.size"...
>>
>> On Wed, Jan 6, 2016 at 2:45 PM, Ted Yu  wrote:
>> > Maybe I looked in the wrong files - I searched *.scala and *.java files
>> (in
>> > latest Spark 1.6.0 RC) for '.offheap.' but didn't find the config.
>> >
>> > Can someone enlighten me ?
>> >
>> > Thanks
>> >
>> > On Wed, Jan 6, 2016 at 2:35 PM, Jakob Odersky 
>> wrote:
>> >>
>> >> Check the configuration guide for a description on units
>> >> (
>> http://spark.apache.org/docs/latest/configuration.html#spark-properties).
>> >> In your case, 5GB would be specified as 5g.
>> >>
>> >> On 6 January 2016 at 10:29, unk1102  wrote:
>> >>>
>> >>> Hi As part of Spark 1.6 release what should be ideal value or unit for
>> >>> spark.memory.offheap.size I have set as 5000 I assume it will be 5GB
>> is
>> >>> it
>> >>> correct? Please guide.
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> >>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/What-should-be-the-ideal-value-unit-for-spark-memory-offheap-size-tp25898.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> -
>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>> >>>
>> >>
>> >
>>
>>
>>
>> --
>> Marcelo
>>
>
>


Re: Do we need to enabled Tungsten sort in Spark 1.6?

2016-01-08 Thread Umesh Kacha
OK, thanks. So it will always be enabled by default? If so, why does the
documentation mention the default shuffle manager as sort?

On Sat, Jan 9, 2016 at 1:55 AM, Ted Yu  wrote:

> From sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
> :
>
> case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
>   val runFunc = (sqlContext: SQLContext) => {
> logWarning(
>   s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated
> and " +
> s"will be ignored. Tungsten will continue to be used.")
> Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
>   }
>
> FYI
>
> On Fri, Jan 8, 2016 at 12:21 PM, unk1102  wrote:
>
>> Hi I was using Spark 1.5 with Tungsten sort and now I have using Spark
>> 1.6 I
>> dont see any difference I was expecting Spark 1.6 to be faster. Anyways do
>> we need to enable Tunsten and unsafe options or they are enabled by
>> default
>> I see in documentation that default sort manager is sort I though it is
>> Tungsten no? Please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Do-we-need-to-enabled-Tungsten-sort-in-Spark-1-6-tp25923.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Why is this job running since one hour?

2016-01-07 Thread Umesh Kacha
Hi, thanks for the response. Each job processes around 5 GB of skewed data,
does a group by on multiple fields, aggregates, then does coalesce(1) and
saves a CSV file in gzip format. I think coalesce is causing the problem, but
the data is not that huge; I don't understand why it keeps running for an hour
and prevents other jobs from running. Please guide.
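
If part of the problem is that this one long-running child job starves the
other jobs submitted from the same application, the fair scheduler can help
independently of fixing the coalesce. A minimal sketch, with an illustrative
pool name:

    SparkConf conf = new SparkConf()
        .setAppName("ParentJob")                 // illustrative name
        .set("spark.scheduler.mode", "FAIR");    // default is FIFO
    JavaSparkContext sc = new JavaSparkContext(conf);

    // in the thread that launches one child job:
    sc.setLocalProperty("spark.scheduler.pool", "childJobs");
    // ... run that child job's actions here ...
    sc.setLocalProperty("spark.scheduler.pool", null);  // clear the pool afterwards

This only shares cluster resources more evenly between concurrent jobs; the
coalesce(1) itself still serializes the final write onto a single task.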
On Jan 7, 2016 3:58 AM, "Jakob Odersky"  wrote:

> What is the job doing? How much data are you processing?
>
> On 6 January 2016 at 10:33, unk1102  wrote:
>
>> Hi I have one main Spark job which spawns multiple child spark jobs. One
>> of
>> the child spark job is running for an hour and it keeps on hanging there I
>> have taken snap shot please see
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25899/Screen_Shot_2016-01-06_at_11.jpg
>> >
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-this-job-running-since-one-hour-tp25899.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to load specific Hive partition in DataFrame Spark 1.6?

2016-01-07 Thread Umesh Kacha
Hi Yin, thanks much your answer solved my problem. Really appreciate it!

Regards


On Fri, Jan 8, 2016 at 1:26 AM, Yin Huai  wrote:

> Hi, we made the change because the partitioning discovery logic was too
> flexible and it introduced problems that were very confusing to users. To
> make your case work, we have introduced a new data source option called
> basePath. You can use
>
> DataFrame df = hiveContext.read().format("orc")
>     .option("basePath", "path/to/table/")
>     .load("path/to/table/entity=xyz");
>
> So, the partitioning discovery logic will understand that the base path is
> path/to/table/ and your dataframe will have the column "entity".
>
> You can find the doc at the end of partitioning discovery section of the
> sql programming guide (
> http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
> ).
>
> Thanks,
>
> Yin
>
> On Thu, Jan 7, 2016 at 7:34 AM, unk1102  wrote:
>
>> Hi from Spark 1.6 onwards as per this  doc
>> <
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
>> >
>> We cant add specific hive partitions to DataFrame
>>
>> spark 1.5 the following used to work and the following dataframe will have
>> entity column
>>
>> DataFrame df =
>> hiveContext.read().format("orc").load("path/to/table/entity=xyz")
>>
>> But in Spark 1.6 above does not work and I have to give base path like the
>> following but it does not contain entity column which I want in DataFrame
>>
>> DataFrame df = hiveContext.read().format("orc").load("path/to/table/")
>>
>> How do I load specific hive partition in a dataframe? What was the driver
>> behind removing this feature which was efficient I believe now above Spark
>> 1.6 code load all partitions and if I filter for specific partitions it is
>> not efficient it hits memory and throws GC error because of thousands of
>> partitions get loaded into memory and not the specific one please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-load-specific-Hive-partition-in-DataFrame-Spark-1-6-tp25904.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


RE: Spark on Apache Ingnite?

2016-01-05 Thread Umesh Kacha
Hi Nate, thanks much. I have exactly the same use cases you mentioned. My
Spark job does heavy writing involving group by and huge data shuffling. Can
you please provide any pointers on how I can take my existing Spark job, which
runs on YARN, and run it on Ignite? Please guide. Thanks again.
On Jan 6, 2016 02:28,  wrote:

> We started playing with Ignite back Hadoop, hive and spark services, and
> looking to move to it as our default for deployment going forward, still
> early but so far its been pretty nice and excited for the flexibility it
> will provide for our particular use cases.
>
> Would say in general its worth looking into if your data workloads are:
>
> a) mix of read/write, or heavy write at times
> b) want write/read access to data from services/apps outside of your spark
> workloads (old Hadoop jobs, custom apps, etc)
> c) have strings of spark jobs that could benefit from caching your data
> across them (think similar usage to tachyon)
> d) you have sparksql queries that could benefit from indexing and
> mutability
> (see pt (a) about mix read/write)
>
> If your data is read exclusive and very batch oriented, and your workloads
> are strictly spark based, benefits will be less and ignite would probably
> act as more of a tachyon replacement as many of the other features outside
> of RDD caching wont be leveraged.
>
>
> -Original Message-
> From: unk1102 [mailto:umesh.ka...@gmail.com]
> Sent: Tuesday, January 5, 2016 10:15 AM
> To: user@spark.apache.org
> Subject: Spark on Apache Ingnite?
>
> Hi has anybody tried and had success with Spark on Apache Ignite seems
> promising? https://ignite.apache.org/
>
>
>
> --
> View this message in context:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Apache-Ingnite-
> tp25884.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread Umesh Kacha
Hi, DataFrame does not have the boolean (shuffle) option for coalesce; that is
only available on RDDs, I believe.

sourceFrame.coalesce(1,true) //gives compilation error
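
As a sketch of one workaround (not something the thread confirms, just the
usual DataFrame counterpart): repartition(1) forces a shuffle, so the
preceding stages stay parallel and only the final write runs on a single task:

    // shuffle into a single partition instead of collapsing the whole
    // upstream lineage onto one task with coalesce(1)
    sourceFrame.repartition(1)
        .write()
        .format("com.databricks.spark.csv")
        .save("/path/hadoop");

Whether this is actually faster depends on the job: the extra shuffle has a
cost, and writing several part files and merging them afterwards may still be
the cheaper option for 5-6 GB of output.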



On Wed, Jan 6, 2016 at 1:38 AM, Alexander Pivovarov 
wrote:

> try coalesce(1, true).
>
> On Tue, Jan 5, 2016 at 11:58 AM, unk1102  wrote:
>
>> hi I am trying to save many partitions of a Dataframe into one CSV file and
>> it takes forever for large data sets of around 5-6 GB.
>>
>>
>> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>>
>> For small data the above code works well, but for large data it hangs forever
>> and does not move on, because only one partition has to shuffle GBs of data.
>> Please help me.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-takes-forever-tp25886.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to find cause(waiting threads etc) of hanging job for 7 hours?

2016-01-01 Thread Umesh Kacha
Hi, thanks, I did that and I have attached thread dump images. That was the
intention of my question: asking for help to identify which waiting thread is
the culprit.

Regards,
Umesh

On Sat, Jan 2, 2016 at 8:38 AM, Prabhu Joseph 
wrote:

> Take thread dump of Executor process several times in a short time period
> and check what each threads are doing at different times which will help to
> identify the expensive sections in user code.
>
> Thanks,
> Prabhu Joseph
>
> On Sat, Jan 2, 2016 at 3:28 AM, unk1102  wrote:
>
>> Sorry please see attached waiting thread log
>>
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
>> >
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25851/Screen_Shot_2016-01-02_at_2.jpg
>> >
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-find-cause-waiting-threads-etc-of-hanging-job-for-7-hours-tp25850p25851.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark DataFrame callUdf does not compile?

2015-12-28 Thread Umesh Kacha
Hi, thanks, but you understood the question incorrectly. First of all, I am
passing the UDF name as a String, and if you look at the callUDF arguments it
does not take a String as the first argument; and if I use callUDF it throws
an exception saying the percentile_approx function is not found. Another thing
I mentioned is that it works in the Spark Scala console, so it is not a
problem of calling it in an unexpected way. I hope the question is clear now.

On Mon, Dec 28, 2015 at 9:21 PM, Hamel Kothari 
wrote:

> Also, if I'm reading correctly, it looks like you're calling "callUdf"
> when what you probably want is "callUDF" (notice the subtle capitalization
> difference). Docs:
> https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#callUDF(java.lang.String,%20org.apache.spark.sql.Column..
> .)
>
> On Mon, Dec 28, 2015 at 10:48 AM Hamel Kothari 
> wrote:
>
>> Would you mind sharing more of your code? I can't really see the code
>> that well from the attached screenshot but it appears that "Lit" is
>> capitalized. Not sure what this method actually refers to but the
>> definition in functions.scala is lowercased.
>>
>> Even if that's not it, some more code would be helpful to solving this.
>> Also, since it's a compilation error, if you could share the compilation
>> error that would be very useful.
>>
>> -Hamel
>>
>> On Mon, Dec 28, 2015 at 10:26 AM unk1102  wrote:
>>
>>> <
>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25821/Screen_Shot_2015-12-28_at_8.jpg
>>> >
>>>
>>> Hi I am trying to invoke Hive UDF using
>>> dataframe.select(callUdf("percentile_approx",col("C1"),lit(0.25))) but it
>>> does not compile however same call works in Spark scala console I dont
>>> understand why. I am using Spark 1.5.2 maven source in my Java code. I
>>> have
>>> also explicitly added maven dependency hive-exec-1.2.1.spark.jar where
>>> percentile_approx is located but still does not compile code please check
>>> attached code image. Please guide. Thanks in advance.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrame-callUdf-does-not-compile-tp25821.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>


Re: Spark DataFrame callUdf does not compile?

2015-12-28 Thread Umesh Kacha
Thanks, but I tried everything. I want to confirm: I am writing the code
below. If you can compile the following in Java with Spark 1.5.2 then great,
otherwise nothing here is helpful, as I have been stumbling over this for the
last few days.

public class PercentileHiveApproxTestMain {

    public static void main(String[] args) {
        SparkConf sparkconf = new SparkConf()
            .setAppName("PercentileHiveApproxTestMain")
            .setMaster("local[*]");
        SparkContext sc = new SparkContext(sparkconf);
        SQLContext sqlContext = new SQLContext(sc);
        // load two-column data from csv and create a dataframe with columns
        // C1 (int), C0 (string)
        DataFrame df = sqlContext.read()
            .format("com.databricks.spark.csv")
            .load("/tmp/df.csv");
        df.select(callUdf("percentile_approx", col("C1"), lit(0.25))).show(); // does not compile
    }

}
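
For what it's worth, a workaround sketch that sidesteps the callUdf/callUDF
overload question entirely: go through HiveContext SQL, where
percentile_approx resolves as a Hive UDAF. Table and column names below are
illustrative, and the cast assumes spark-csv loaded C1 as a string (it reads
everything as strings unless a schema is supplied):

    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.hive.HiveContext;

    HiveContext hiveContext = new HiveContext(sc);
    DataFrame df = hiveContext.read()
        .format("com.databricks.spark.csv")
        .load("/tmp/df.csv");
    df.registerTempTable("df_table");            // illustrative temp table name
    DataFrame quartile = hiveContext.sql(
        "SELECT percentile_approx(CAST(C1 AS DOUBLE), 0.25) FROM df_table");
    quartile.show();

Note this needs a HiveContext (and the spark-hive dependency), not a plain
SQLContext, because percentile_approx is a Hive function.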

On Mon, Dec 28, 2015 at 9:56 PM, Hamel Kothari <hamelkoth...@gmail.com>
wrote:

> If you scroll further down in the documentation, you will see that callUDF
> does have a version which takes (String, Column...) as arguments: *callUDF
> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#callUDF(java.lang.String,%20org.apache.spark.sql.Column...)>*
> (java.lang.String udfName, Column
> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Column.html>
> ... cols)
>
> Unfortunately the link I posted above doesn't seem to work because of the
> punctuation in the URL but it is there. If you use "callUdf" from Java with
> a string argument, which is what you seem to be doing, it expects a
> Seq because of the way it is defined in scala. That's also a
> deprecated method anyways.
>
> The reason you're getting the exception is not because that's the wrong
> method to call. It's because the percentile_approx UDF is never registered.
> If you're passing in a UDF by name, you must register it with your SQL
> context as follows (example taken from the documentation of the above
> referenced method):
>
>   import org.apache.spark.sql._
>
>   val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
>   val sqlContext = df.sqlContext
>   sqlContext.udf.register("simpleUDF", (v: Int) => v * v)
>   df.select($"id", callUDF("simpleUDF", $"value"))
>
>
>
>
> On Mon, Dec 28, 2015 at 11:08 AM Umesh Kacha <umesh.ka...@gmail.com>
> wrote:
>
>> Hi thanks you understood question incorrectly. First of all I am passing
>> UDF name as String and if you see callUDF arguments then it does not take
>> string as first argument and if I use callUDF it will throw me exception
>> saying percentile_approx function not found. And another thing I mentioned
>> is that it works in Spark scala console so it does not have any problem of
>> calling it in not expected way. Hope now question is clear.
>>
>> On Mon, Dec 28, 2015 at 9:21 PM, Hamel Kothari <hamelkoth...@gmail.com>
>> wrote:
>>
>>> Also, if I'm reading correctly, it looks like you're calling "callUdf"
>>> when what you probably want is "callUDF" (notice the subtle capitalization
>>> difference). Docs:
>>> https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#callUDF(java.lang.String,%20org.apache.spark.sql.Column..
>>> .)
>>>
>>> On Mon, Dec 28, 2015 at 10:48 AM Hamel Kothari <hamelkoth...@gmail.com>
>>> wrote:
>>>
>>>> Would you mind sharing more of your code? I can't really see the code
>>>> that well from the attached screenshot but it appears that "Lit" is
>>>> capitalized. Not sure what this method actually refers to but the
>>>> definition in functions.scala is lowercased.
>>>>
>>>> Even if that's not it, some more code would be helpful to solving this.
>>>> Also, since it's a compilation error, if you could share the compilation
>>>> error that would be very useful.
>>>>
>>>> -Hamel
>>>>
>>>> On Mon, Dec 28, 2015 at 10:26 AM unk1102 <umesh.ka...@gmail.com> wrote:
>>>>
>>>>> <
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n25821/Screen_Shot_2015-12-28_at_8.jpg
>>>>> >
>>>>>
>>>>> Hi I am trying to invoke Hive UDF using
>>>>> dataframe.select(callUdf("percentile_approx",col("C1"),lit(0.25))) but
>>>>> it
>>>>> does not compile however same call works in Spark scala console I dont
>>>>> understand why. I am using Spark 1.5.2 maven source in my Java code. I
>>>>> have
>>>>> also explicitly added maven dependency hive-exec-1.2.1.spark.jar where
>>>>> percentile_approx is located but still does not compile code please
>>>>> check
>>>>> attached code image. Please guide. Thanks in advance.
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrame-callUdf-does-not-compile-tp25821.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>


Re: How to make this Spark 1.5.2 code fast and shuffle less data

2015-12-10 Thread Umesh Kacha
Hi Benyi, thanks for the reply. Yes, I process each Hive partition / HDFS
directory in its own thread so that I can make it faster; if I don't use
threads the job is even slower. As I mentioned, I have to process 2000 Hive
partitions, i.e. 2000 HDFS directories containing ORC files, right? If I don't
use threads these 2000 directories get processed one by one. By using
ExecutorService threads with a pool of 20, at any given time 20 jobs are
running inside the one main job.
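
A minimal sketch of that bounded-pool pattern; processOnePartition is a
hypothetical helper standing in for the real per-partition work, and it
assumes the HiveContext is shared (submitting jobs from multiple threads is
supported):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    final ExecutorService pool = Executors.newFixedThreadPool(20); // at most 20 concurrent Spark jobs
    for (final String partitionPath : partitionPaths) {            // the ~2000 hive partition directories
        pool.submit(new Runnable() {
            @Override
            public void run() {
                processOnePartition(hiveContext, partitionPath);   // hypothetical per-partition processing
            }
        });
    }
    pool.shutdown();                            // no new tasks accepted
    pool.awaitTermination(24, TimeUnit.HOURS);  // wait for all jobs (may throw InterruptedException)

Even so, 20 concurrent jobs only help if each job leaves cluster capacity
unused; with 40 executors shared across them, per-job parallelism may be the
real limit.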

On Fri, Dec 11, 2015 at 12:49 AM, Benyi Wang  wrote:

> I don't understand this: "I have the following method code which I call it
> from a thread spawn from spark driver. So in this case 2000 threads ..."
>
> Why do you call it from a thread?
> Are you process one partition in one thread?
>
> On Thu, Dec 10, 2015 at 11:13 AM, Benyi Wang 
> wrote:
>
>> DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
>> DataFrame frameToProcess = sourceFrame.except(filterFrame1);
>>
>> except is really expensive. Do you actually want this:
>>
>> sourceFrame.filter(! col("col1").contains("xyz"))
>>
>>
>> On Thu, Dec 10, 2015 at 9:57 AM, unk1102  wrote:
>>
>>> Hi I have spark job which reads Hive-ORC data and processes and
>>> generates csv
>>> file in the end. Now this ORC files are hive partitions and I have around
>>> 2000 partitions to process every day. These hive partitions size is
>>> around
>>> 800 GB in HDFS. I have the following method code which I call it from a
>>> thread spawn from spark driver. So in this case 2000 threads gets
>>> processed
>>> and those runs painfully slow around 12 hours making huge data shuffling
>>> each executor shuffles around 50 GB of data. I am using 40 executors of 4
>>> core and 30 GB memory each. I am using Hadoop 2.6 and Spark 1.5.2
>>> release.
>>>
>>> public void callThisFromThread() {
>>>   DataFrame sourceFrame = hiveContext.read().format("orc").load("/path/in/hdfs");
>>>   DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz"));
>>>   DataFrame frameToProcess = sourceFrame.except(filterFrame1);
>>>   JavaRDD updatedRDD = frameToProcess.toJavaRDD().mapPartitions() {
>>>     .
>>>   }
>>>   DataFrame updatedFrame = hiveContext.createDataFrame(updatedRdd, sourceFrame.schema());
>>>   DataFrame selectFrame = updatedFrame.select("col1", "col2...", "col8");
>>>   DataFrame groupFrame = selectFrame.groupBy("col1", "col2", "col8").agg(".."); // 8-column group by
>>>   groupFrame.coalesce(1).save(); // save as csv, only one file, so coalesce(1)
>>> }
>>>
>>> Please guide me how can I optimize above code I cant avoid group by
>>> which is
>>> evil I know I have to do group on 8 fields mentioned above.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-this-Spark-1-5-2-code-fast-and-shuffle-less-data-tp25671.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: callUdf("percentile_approx",col("mycol"),lit(0.25)) does not compile spark 1.5.1 source but it does work in spark 1.5.1 bin

2015-11-02 Thread Umesh Kacha
Hi Ted, I checked that hive-exec-1.2.1.spark.jar contains the required classes
below, but it still doesn't compile. I don't understand why this jar is
getting overwritten in scope:

org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFMultiplePercentileApproxEvaluator.class

Please guide.

On Mon, Oct 19, 2015 at 4:30 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:

> Hi Ted thanks much for your help really appreciate it. I tried to use
> maven dependencies you mentioned but still callUdf is not compiling please
> find snap shot of my intellij editor. I am sorry you may have to zoom
> pictures as I can't share code. Thanks again.
> On Oct 19, 2015 8:32 AM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>
>> Umesh:
>>
>> $ jar tvf
>> /home/hbase/.m2/repository/org/spark-project/hive/hive-exec/1.2.1.spark/hive-exec-1.2.1.spark.jar
>> | grep GenericUDAFPercentile
>>   2143 Fri Jul 31 23:51:48 PDT 2015
>> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$1.class
>>   4602 Fri Jul 31 23:51:48 PDT 2015
>> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFMultiplePercentileApproxEvaluator.class
>>
>> As long as the following dependency is in your pom.xml:
>> [INFO] +- org.spark-project.hive:hive-exec:jar:1.2.1.spark:compile
>>
>> You should be able to invoke percentile_approx
>>
>> Cheers
>>
>> On Sun, Oct 18, 2015 at 8:58 AM, Umesh Kacha <umesh.ka...@gmail.com>
>> wrote:
>>
>>> Thanks much Ted so when do we get to use this sparkUdf in Java code
>>> using maven code dependencies?? You said JIRA 10671 is not pushed as
>>> part of 1.5.1 so it should be released in 1.6.0 as mentioned in the JIRA
>>> right?
>>>
>>> On Sun, Oct 18, 2015 at 9:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> The udf is defined in GenericUDAFPercentileApprox of hive.
>>>>
>>>> When spark-shell runs, it has access to the above class which is
>>>> packaged
>>>> in assembly/target/scala-2.10/spark-assembly-1.6.0-SNAPSHOT-hadoop2.7.0.jar
>>>> :
>>>>
>>>>   2143 Fri Oct 16 15:02:26 PDT 2015
>>>> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$1.class
>>>>   4602 Fri Oct 16 15:02:26 PDT 2015
>>>> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFMultiplePercentileApproxEvaluator.class
>>>>   1697 Fri Oct 16 15:02:26 PDT 2015
>>>> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFPercentileApproxEvaluator$PercentileAggBuf.class
>>>>   6570 Fri Oct 16 15:02:26 PDT 2015
>>>> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFPercentileApproxEvaluator.class
>>>>   4334 Fri Oct 16 15:02:26 PDT 2015
>>>> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFSinglePercentileApproxEvaluator.class
>>>>   6293 Fri Oct 16 15:02:26 PDT 2015
>>>> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox.class
>>>>
>>>> That was the cause for different behavior.
>>>>
>>>> FYI
>>>>
>>>> On Sun, Oct 18, 2015 at 12:10 AM, unk1102 <umesh.ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi starting new thread following old thread looks like code for
>>>>> compiling
>>>>> callUdf("percentile_approx",col("mycol"),lit(0.25)) is not merged in
>>>>> spark
>>>>> 1.5.1 source but I dont understand why this function call works in
>>>>> Spark
>>>>> 1.5.1 spark-shell/bin. Please guide.
>>>>>
>>>>> -- Forwarded message --
>>>>> From: "Ted Yu" <yuzhih...@gmail.com>
>>>>> Date: Oct 14, 2015 3:26 AM
>>>>> Subject: Re: How to calculate percentile of a column of DataFrame?
>>>>> To: "Umesh Kacha" <umesh.ka...@gmail.com>
>>>>> Cc: "Michael Armbrust" <mich...@databricks.com>,
>>>>> "saif.a.ell...@wellsfargo.com" <saif.a.ell...@wellsfargo.com>,
>>>>> "user" <user@spark.apache.org>
>>>>>
>>>>> I modified DataFrameSuite, in master branch, to call percentile_approx
>>>>> instead of simpleUDF :
>>>>>
>>>>> - deprecated callUdf in SQLContext
>>>>> - callUDF in SQLContext *** FAI

Re: callUdf("percentile_approx",col("mycol"),lit(0.25)) does not compile spark 1.5.1 source but it does work in spark 1.5.1 bin

2015-10-18 Thread Umesh Kacha
Thanks much, Ted. So when do we get to use this callUdf from Java code via the
Maven dependencies? You said SPARK-10671 was not pushed as part of 1.5.1, so
it should be released in 1.6.0 as mentioned in the JIRA, right?

On Sun, Oct 18, 2015 at 9:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> The udf is defined in GenericUDAFPercentileApprox of hive.
>
> When spark-shell runs, it has access to the above class which is packaged
> in assembly/target/scala-2.10/spark-assembly-1.6.0-SNAPSHOT-hadoop2.7.0.jar
> :
>
>   2143 Fri Oct 16 15:02:26 PDT 2015
> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$1.class
>   4602 Fri Oct 16 15:02:26 PDT 2015
> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFMultiplePercentileApproxEvaluator.class
>   1697 Fri Oct 16 15:02:26 PDT 2015
> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFPercentileApproxEvaluator$PercentileAggBuf.class
>   6570 Fri Oct 16 15:02:26 PDT 2015
> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFPercentileApproxEvaluator.class
>   4334 Fri Oct 16 15:02:26 PDT 2015
> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox$GenericUDAFSinglePercentileApproxEvaluator.class
>   6293 Fri Oct 16 15:02:26 PDT 2015
> org/apache/hadoop/hive/ql/udf/generic/GenericUDAFPercentileApprox.class
>
> That was the cause for different behavior.
>
> FYI
>
> On Sun, Oct 18, 2015 at 12:10 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>
>> Hi starting new thread following old thread looks like code for compiling
>> callUdf("percentile_approx",col("mycol"),lit(0.25)) is not merged in spark
>> 1.5.1 source but I dont understand why this function call works in Spark
>> 1.5.1 spark-shell/bin. Please guide.
>>
>> -- Forwarded message --
>> From: "Ted Yu" <yuzhih...@gmail.com>
>> Date: Oct 14, 2015 3:26 AM
>> Subject: Re: How to calculate percentile of a column of DataFrame?
>> To: "Umesh Kacha" <umesh.ka...@gmail.com>
>> Cc: "Michael Armbrust" <mich...@databricks.com>,
>> "saif.a.ell...@wellsfargo.com" <saif.a.ell...@wellsfargo.com>,
>> "user" <user@spark.apache.org>
>>
>> I modified DataFrameSuite, in master branch, to call percentile_approx
>> instead of simpleUDF :
>>
>> - deprecated callUdf in SQLContext
>> - callUDF in SQLContext *** FAILED ***
>>   org.apache.spark.sql.AnalysisException: undefined function
>> percentile_approx;
>>   at
>>
>> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:64)
>>   at
>>
>> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:64)
>>   at scala.Option.getOrElse(Option.scala:120)
>>   at
>>
>> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:63)
>>   at
>>
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:506)
>>   at
>>
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:506)
>>   at
>>
>> org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
>>   at
>>
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:505)
>>   at
>>
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:502)
>>   at
>>
>> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
>>
>> SPARK-10671 is included.
>> For 1.5.1, I guess the absence of SPARK-10671 means that SparkSQL treats
>> percentile_approx as normal UDF.
>>
>> Experts can correct me, if there is any misunderstanding.
>>
>> Cheers
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/callUdf-percentile-approx-col-mycol-lit-0-25-does-not-compile-spark-1-5-1-source-but-it-does-work-inn-tp25111.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to calculate percentile of a column of DataFrame?

2015-10-14 Thread Umesh Kacha
Hi Ted, thanks much for your help. So the fix is in SPARK-10671 and it is
supposed to be released in Spark 1.6.0, right? Until 1.6.0 is released I won't
be able to invoke callUdf with a string and percentile_approx with lit as an
argument, right?
On Oct 14, 2015 03:26, "Ted Yu" <yuzhih...@gmail.com> wrote:

> I modified DataFrameSuite, in master branch, to call percentile_approx
> instead of simpleUDF :
>
> - deprecated callUdf in SQLContext
> - callUDF in SQLContext *** FAILED ***
>   org.apache.spark.sql.AnalysisException: undefined function
> percentile_approx;
>   at
> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:64)
>   at
> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:64)
>   at scala.Option.getOrElse(Option.scala:120)
>   at
> org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:63)
>   at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:506)
>   at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5$$anonfun$applyOrElse$24.apply(Analyzer.scala:506)
>   at
> org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
>   at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:505)
>   at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:502)
>   at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227)
>
> SPARK-10671 is included.
> For 1.5.1, I guess the absence of SPARK-10671 means that SparkSQL
> treats percentile_approx as normal UDF.
>
> Experts can correct me, if there is any misunderstanding.
>
> Cheers
>
> On Tue, Oct 13, 2015 at 6:09 AM, Umesh Kacha <umesh.ka...@gmail.com>
> wrote:
>
>> Hi Ted I am using the following line of code I can't paste entire code
>> sorry but the following only line doesn't compile in my spark job
>>
>>  sourceframe.select(callUDF("percentile_approx",col("mycol"), lit(0.25)))
>>
>> I am using Intellij editor java and maven dependencies of spark core
>> spark sql spark hive version 1.5.1
>> On Oct 13, 2015 18:21, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>
>>> Can you pastebin your Java code and the command you used to compile ?
>>>
>>> Thanks
>>>
>>> On Oct 13, 2015, at 1:42 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>>>
>>> Hi Ted if fix went after 1.5.1 release then how come it's working with
>>> 1.5.1 binary in spark-shell.
>>> On Oct 13, 2015 1:32 PM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>
>>>> Looks like the fix went in after 1.5.1 was released.
>>>>
>>>> You may verify using master branch build.
>>>>
>>>> Cheers
>>>>
>>>> On Oct 13, 2015, at 12:21 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Ted, thanks much I tried using percentile_approx in Spark-shell like
>>>> you mentioned it works using 1.5.1 but it doesn't compile in Java using
>>>> 1.5.1 maven libraries it still complains same that callUdf can have string
>>>> and column types only. Please guide.
>>>> On Oct 13, 2015 12:34 AM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>
>>>>> SQL context available as sqlContext.
>>>>>
>>>>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>>>>> "value")
>>>>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>>>>
>>>>> scala> df.select(callUDF("percentile_approx",col("value"),
>>>>> lit(0.25))).show()
>>>>> +--+
>>>>> |'percentile_approx(value,0.25)|
>>>>> +--+
>>>>> |   1.0|
>>>>> +--+
>>>>>
>>>>> Can you upgrade to 1.5.1 ?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Mon, Oct 12, 2015 at 11:55 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sorry fo

Re: How to calculate percentile of a column of DataFrame?

2015-10-13 Thread Umesh Kacha
Hi Ted, if the fix went in after the 1.5.1 release, then how come it's working
with the 1.5.1 binary in spark-shell?
On Oct 13, 2015 1:32 PM, "Ted Yu" <yuzhih...@gmail.com> wrote:

> Looks like the fix went in after 1.5.1 was released.
>
> You may verify using master branch build.
>
> Cheers
>
> On Oct 13, 2015, at 12:21 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
> Hi Ted, thanks much I tried using percentile_approx in Spark-shell like
> you mentioned it works using 1.5.1 but it doesn't compile in Java using
> 1.5.1 maven libraries it still complains same that callUdf can have string
> and column types only. Please guide.
> On Oct 13, 2015 12:34 AM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>
>> SQL context available as sqlContext.
>>
>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>> "value")
>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>
>> scala> df.select(callUDF("percentile_approx",col("value"),
>> lit(0.25))).show()
>> +--+
>> |'percentile_approx(value,0.25)|
>> +--+
>> |   1.0|
>> +--+
>>
>> Can you upgrade to 1.5.1 ?
>>
>> Cheers
>>
>> On Mon, Oct 12, 2015 at 11:55 AM, Umesh Kacha <umesh.ka...@gmail.com>
>> wrote:
>>
>>> Sorry forgot to tell that I am using Spark 1.4.1 as callUdf is available
>>> in Spark 1.4.0 as per JAvadocx
>>>
>>> On Tue, Oct 13, 2015 at 12:22 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>> wrote:
>>>
>>>> Hi Ted thanks much for the detailed answer and appreciate your efforts.
>>>> Do we need to register Hive UDFs?
>>>>
>>>> sqlContext.udf.register("percentile_approx");???//is it valid?
>>>>
>>>> I am calling Hive UDF percentile_approx in the following manner which
>>>> gives compilation error
>>>>
>>>> df.select("col1").groupby("col1").agg(callUdf("percentile_approx",col("col1"),lit(0.25)));//compile
>>>> error
>>>>
>>>> //compile error because callUdf() takes String and Column* as arguments.
>>>>
>>>> Please guide. Thanks much.
>>>>
>>>> On Mon, Oct 12, 2015 at 11:44 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Using spark-shell, I did the following exercise (master branch) :
>>>>>
>>>>>
>>>>> SQL context available as sqlContext.
>>>>>
>>>>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>>>>> "value")
>>>>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>>>>
>>>>> scala> sqlContext.udf.register("simpleUDF", (v: Int, cnst: Int) => v *
>>>>> v + cnst)
>>>>> res0: org.apache.spark.sql.UserDefinedFunction =
>>>>> UserDefinedFunction(,IntegerType,List())
>>>>>
>>>>> scala> df.select($"id", callUDF("simpleUDF", $"value", lit(25))).show()
>>>>> +---++
>>>>> | id|'simpleUDF(value,25)|
>>>>> +---++
>>>>> |id1|  26|
>>>>> |id2|  41|
>>>>> |id3|  50|
>>>>> +---++
>>>>>
>>>>> Which Spark release are you using ?
>>>>>
>>>>> Can you pastebin the full stack trace where you got the error ?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Fri, Oct 9, 2015 at 1:09 PM, Umesh Kacha <umesh.ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I have a doubt Michael I tried to use callUDF in  the following code
>>>>>> it does not work.
>>>>>>
>>>>>> sourceFrame.agg(callUdf("percentile_approx",col("myCol"),lit(0.25)))
>>>>>>
>>>>>> Above code does not compile because callUdf() takes only two
>>>>>> arguments function name in String and Column class type. Please guide.
>>>>>>
>>>>>> On Sat, Oct 10, 2015 at 1:29 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>>>>> wrote:
>>

Re: How to calculate percentile of a column of DataFrame?

2015-10-13 Thread Umesh Kacha
Hi Ted, thanks much. I tried using percentile_approx in spark-shell like you
mentioned and it works using 1.5.1, but it doesn't compile in Java using the
1.5.1 Maven libraries; it still complains that callUdf can take only string
and column types. Please guide.
On Oct 13, 2015 12:34 AM, "Ted Yu" <yuzhih...@gmail.com> wrote:

> SQL context available as sqlContext.
>
> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>
> scala> df.select(callUDF("percentile_approx",col("value"),
> lit(0.25))).show()
> +--+
> |'percentile_approx(value,0.25)|
> +--+
> |   1.0|
> +--+
>
> Can you upgrade to 1.5.1 ?
>
> Cheers
>
> On Mon, Oct 12, 2015 at 11:55 AM, Umesh Kacha <umesh.ka...@gmail.com>
> wrote:
>
>> Sorry forgot to tell that I am using Spark 1.4.1 as callUdf is available
>> in Spark 1.4.0 as per JAvadocx
>>
>> On Tue, Oct 13, 2015 at 12:22 AM, Umesh Kacha <umesh.ka...@gmail.com>
>> wrote:
>>
>>> Hi Ted thanks much for the detailed answer and appreciate your efforts.
>>> Do we need to register Hive UDFs?
>>>
>>> sqlContext.udf.register("percentile_approx");???//is it valid?
>>>
>>> I am calling Hive UDF percentile_approx in the following manner which
>>> gives compilation error
>>>
>>> df.select("col1").groupby("col1").agg(callUdf("percentile_approx",col("col1"),lit(0.25)));//compile
>>> error
>>>
>>> //compile error because callUdf() takes String and Column* as arguments.
>>>
>>> Please guide. Thanks much.
>>>
>>> On Mon, Oct 12, 2015 at 11:44 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Using spark-shell, I did the following exercise (master branch) :
>>>>
>>>>
>>>> SQL context available as sqlContext.
>>>>
>>>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>>>> "value")
>>>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>>>
>>>> scala> sqlContext.udf.register("simpleUDF", (v: Int, cnst: Int) => v *
>>>> v + cnst)
>>>> res0: org.apache.spark.sql.UserDefinedFunction =
>>>> UserDefinedFunction(,IntegerType,List())
>>>>
>>>> scala> df.select($"id", callUDF("simpleUDF", $"value", lit(25))).show()
>>>> +---++
>>>> | id|'simpleUDF(value,25)|
>>>> +---++
>>>> |id1|  26|
>>>> |id2|  41|
>>>> |id3|  50|
>>>> +---++
>>>>
>>>> Which Spark release are you using ?
>>>>
>>>> Can you pastebin the full stack trace where you got the error ?
>>>>
>>>> Cheers
>>>>
>>>> On Fri, Oct 9, 2015 at 1:09 PM, Umesh Kacha <umesh.ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have a doubt Michael I tried to use callUDF in  the following code
>>>>> it does not work.
>>>>>
>>>>> sourceFrame.agg(callUdf("percentile_approx",col("myCol"),lit(0.25)))
>>>>>
>>>>> Above code does not compile because callUdf() takes only two arguments
>>>>> function name in String and Column class type. Please guide.
>>>>>
>>>>> On Sat, Oct 10, 2015 at 1:29 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> thanks much Michael let me try.
>>>>>>
>>>>>> On Sat, Oct 10, 2015 at 1:20 AM, Michael Armbrust <
>>>>>> mich...@databricks.com> wrote:
>>>>>>
>>>>>>> This is confusing because I made a typo...
>>>>>>>
>>>>>>> callUDF("percentile_approx", col("mycol"), lit(0.25))
>>>>>>>
>>>>>>> The first argument is the name of the UDF, all other arguments need
>>>>>>> to be columns that are passed in as arguments.  lit is just saying to 
>>>>>>> make
>>>>>>> a literal column that alw

Re: How to calculate percentile of a column of DataFrame?

2015-10-13 Thread Umesh Kacha
OK, thanks much Ted. It looks like there is some issue with using the Maven
dependencies in Java code for 1.5.1. I still don't understand: if the Spark
1.5.1 binary in spark-shell can recognize callUdf, then why does callUdf not
compile when using the Maven build?
On Oct 13, 2015 2:20 PM, "Ted Yu" <yuzhih...@gmail.com> wrote:

> Pardon me.
> I didn't read your previous response clearly.
>
> I will try to reproduce the compilation error on master branch.
> Right now, I have some other high priority task on hand.
>
> BTW I was looking at SPARK-10671
>
> FYI
>
> On Tue, Oct 13, 2015 at 1:42 AM, Umesh Kacha <umesh.ka...@gmail.com>
> wrote:
>
>> Hi Ted if fix went after 1.5.1 release then how come it's working with
>> 1.5.1 binary in spark-shell.
>> On Oct 13, 2015 1:32 PM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>
>>> Looks like the fix went in after 1.5.1 was released.
>>>
>>> You may verify using master branch build.
>>>
>>> Cheers
>>>
>>> On Oct 13, 2015, at 12:21 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>>>
>>> Hi Ted, thanks much I tried using percentile_approx in Spark-shell like
>>> you mentioned it works using 1.5.1 but it doesn't compile in Java using
>>> 1.5.1 maven libraries it still complains same that callUdf can have string
>>> and column types only. Please guide.
>>> On Oct 13, 2015 12:34 AM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>
>>>> SQL context available as sqlContext.
>>>>
>>>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>>>> "value")
>>>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>>>
>>>> scala> df.select(callUDF("percentile_approx",col("value"),
>>>> lit(0.25))).show()
>>>> +--+
>>>> |'percentile_approx(value,0.25)|
>>>> +--+
>>>> |   1.0|
>>>> +--+
>>>>
>>>> Can you upgrade to 1.5.1 ?
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Oct 12, 2015 at 11:55 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Sorry forgot to tell that I am using Spark 1.4.1 as callUdf is
>>>>> available in Spark 1.4.0 as per JAvadocx
>>>>>
>>>>> On Tue, Oct 13, 2015 at 12:22 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ted thanks much for the detailed answer and appreciate your
>>>>>> efforts. Do we need to register Hive UDFs?
>>>>>>
>>>>>> sqlContext.udf.register("percentile_approx");???//is it valid?
>>>>>>
>>>>>> I am calling Hive UDF percentile_approx in the following manner which
>>>>>> gives compilation error
>>>>>>
>>>>>> df.select("col1").groupby("col1").agg(callUdf("percentile_approx",col("col1"),lit(0.25)));//compile
>>>>>> error
>>>>>>
>>>>>> //compile error because callUdf() takes String and Column* as
>>>>>> arguments.
>>>>>>
>>>>>> Please guide. Thanks much.
>>>>>>
>>>>>> On Mon, Oct 12, 2015 at 11:44 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Using spark-shell, I did the following exercise (master branch) :
>>>>>>>
>>>>>>>
>>>>>>> SQL context available as sqlContext.
>>>>>>>
>>>>>>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>>>>>>> "value")
>>>>>>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>>>>>>
>>>>>>> scala> sqlContext.udf.register("simpleUDF", (v: Int, cnst: Int) => v
>>>>>>> * v + cnst)
>>>>>>> res0: org.apache.spark.sql.UserDefinedFunction =
>>>>>>> UserDefinedFunction(,IntegerType,List())
>>>>>>>
>>>>>>> scala> df.select($"id", callUDF("simpleUDF", $"value",
>>>>>>> lit(25))).show()
>>&g

Re: How to calculate percentile of a column of DataFrame?

2015-10-13 Thread Umesh Kacha
Hi Ted, I am using the following line of code. I can't paste the entire code,
sorry, but this single line doesn't compile in my Spark job:

 sourceframe.select(callUDF("percentile_approx",col("mycol"), lit(0.25)))

I am using the IntelliJ editor, Java, and Maven dependencies for spark-core,
spark-sql and spark-hive, version 1.5.1.
On Oct 13, 2015 18:21, "Ted Yu" <yuzhih...@gmail.com> wrote:

> Can you pastebin your Java code and the command you used to compile ?
>
> Thanks
>
> On Oct 13, 2015, at 1:42 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
> Hi Ted if fix went after 1.5.1 release then how come it's working with
> 1.5.1 binary in spark-shell.
> On Oct 13, 2015 1:32 PM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>
>> Looks like the fix went in after 1.5.1 was released.
>>
>> You may verify using master branch build.
>>
>> Cheers
>>
>> On Oct 13, 2015, at 12:21 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>>
>> Hi Ted, thanks much I tried using percentile_approx in Spark-shell like
>> you mentioned it works using 1.5.1 but it doesn't compile in Java using
>> 1.5.1 maven libraries it still complains same that callUdf can have string
>> and column types only. Please guide.
>> On Oct 13, 2015 12:34 AM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>
>>> SQL context available as sqlContext.
>>>
>>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>>> "value")
>>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>>
>>> scala> df.select(callUDF("percentile_approx",col("value"),
>>> lit(0.25))).show()
>>> +--+
>>> |'percentile_approx(value,0.25)|
>>> +--+
>>> |   1.0|
>>> +--+
>>>
>>> Can you upgrade to 1.5.1 ?
>>>
>>> Cheers
>>>
>>> On Mon, Oct 12, 2015 at 11:55 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>> wrote:
>>>
>>>> Sorry forgot to tell that I am using Spark 1.4.1 as callUdf is
>>>> available in Spark 1.4.0 as per JAvadocx
>>>>
>>>> On Tue, Oct 13, 2015 at 12:22 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ted thanks much for the detailed answer and appreciate your
>>>>> efforts. Do we need to register Hive UDFs?
>>>>>
>>>>> sqlContext.udf.register("percentile_approx");???//is it valid?
>>>>>
>>>>> I am calling Hive UDF percentile_approx in the following manner which
>>>>> gives compilation error
>>>>>
>>>>> df.select("col1").groupby("col1").agg(callUdf("percentile_approx",col("col1"),lit(0.25)));//compile
>>>>> error
>>>>>
>>>>> //compile error because callUdf() takes String and Column* as
>>>>> arguments.
>>>>>
>>>>> Please guide. Thanks much.
>>>>>
>>>>> On Mon, Oct 12, 2015 at 11:44 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Using spark-shell, I did the following exercise (master branch) :
>>>>>>
>>>>>>
>>>>>> SQL context available as sqlContext.
>>>>>>
>>>>>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>>>>>> "value")
>>>>>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>>>>>
>>>>>> scala> sqlContext.udf.register("simpleUDF", (v: Int, cnst: Int) => v
>>>>>> * v + cnst)
>>>>>> res0: org.apache.spark.sql.UserDefinedFunction =
>>>>>> UserDefinedFunction(,IntegerType,List())
>>>>>>
>>>>>> scala> df.select($"id", callUDF("simpleUDF", $"value",
>>>>>> lit(25))).show()
>>>>>> +---+--------------------+
>>>>>> | id|'simpleUDF(value,25)|
>>>>>> +---+--------------------+
>>>>>> |id1|                  26|
>>>>>> |id2|                  41|
>>>>>> |id3|                  50|
>>>>>> +---+--------------------+
>>>>>>
>>>>>>

Re: How to calculate percentile of a column of DataFrame?

2015-10-13 Thread Umesh Kacha
Hi Ted, sorry for asking again. Did you get a chance to look at the
compilation issue? Thanks much.

Regards.
On Oct 13, 2015 18:39, "Umesh Kacha" <umesh.ka...@gmail.com> wrote:

> Hi Ted I am using the following line of code I can't paste entire code
> sorry but the following only line doesn't compile in my spark job
>
>  sourceframe.select(callUDF("percentile_approx",col("mycol"), lit(0.25)))
>
> I am using Intellij editor java and maven dependencies of spark core spark
> sql spark hive version 1.5.1
> On Oct 13, 2015 18:21, "Ted Yu" <yuzhih...@gmail.com> wrote:
>
>> Can you pastebin your Java code and the command you used to compile ?
>>
>> Thanks
>>
>> On Oct 13, 2015, at 1:42 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>>
>> Hi Ted if fix went after 1.5.1 release then how come it's working with
>> 1.5.1 binary in spark-shell.
>> On Oct 13, 2015 1:32 PM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>
>>> Looks like the fix went in after 1.5.1 was released.
>>>
>>> You may verify using master branch build.
>>>
>>> Cheers
>>>
>>> On Oct 13, 2015, at 12:21 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>>>
>>> Hi Ted, thanks much I tried using percentile_approx in Spark-shell like
>>> you mentioned it works using 1.5.1 but it doesn't compile in Java using
>>> 1.5.1 maven libraries it still complains same that callUdf can have string
>>> and column types only. Please guide.
>>> On Oct 13, 2015 12:34 AM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>
>>>> SQL context available as sqlContext.
>>>>
>>>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>>>> "value")
>>>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>>>
>>>> scala> df.select(callUDF("percentile_approx",col("value"),
>>>> lit(0.25))).show()
>>>> +------------------------------+
>>>> |'percentile_approx(value,0.25)|
>>>> +------------------------------+
>>>> |                           1.0|
>>>> +------------------------------+
>>>>
>>>> Can you upgrade to 1.5.1 ?
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Oct 12, 2015 at 11:55 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Sorry forgot to tell that I am using Spark 1.4.1 as callUdf is
>>>>> available in Spark 1.4.0 as per JAvadocx
>>>>>
>>>>> On Tue, Oct 13, 2015 at 12:22 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ted thanks much for the detailed answer and appreciate your
>>>>>> efforts. Do we need to register Hive UDFs?
>>>>>>
>>>>>> sqlContext.udf.register("percentile_approx");???//is it valid?
>>>>>>
>>>>>> I am calling Hive UDF percentile_approx in the following manner which
>>>>>> gives compilation error
>>>>>>
>>>>>> df.select("col1").groupby("col1").agg(callUdf("percentile_approx",col("col1"),lit(0.25)));//compile
>>>>>> error
>>>>>>
>>>>>> //compile error because callUdf() takes String and Column* as
>>>>>> arguments.
>>>>>>
>>>>>> Please guide. Thanks much.
>>>>>>
>>>>>> On Mon, Oct 12, 2015 at 11:44 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Using spark-shell, I did the following exercise (master branch) :
>>>>>>>
>>>>>>>
>>>>>>> SQL context available as sqlContext.
>>>>>>>
>>>>>>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>>>>>>> "value")
>>>>>>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>>>>>>
>>>>>>> scala> sqlContext.udf.register("simpleUDF", (v: Int, cnst: Int) => v
>>>>>>> * v + cnst)
>>>>>>> res0: org.apache.spark.sql.UserDefinedFunction =
>>>>>>> UserDefinedFunction(,IntegerType,List())
>>>>>>>

Re: How to calculate percentile of a column of DataFrame?

2015-10-12 Thread Umesh Kacha
Hi, if you can help it would be great, as I am stuck: I don't know how to
remove the compilation error in callUdf when we pass three parameters, the
function name as a String, the column name as col(), and the lit() function.
Please guide.
On Oct 11, 2015 1:05 AM, "Umesh Kacha" <umesh.ka...@gmail.com> wrote:

> Hi any idea? how do I call percentlie_approx using callUdf() please guide.
>
> On Sat, Oct 10, 2015 at 1:39 AM, Umesh Kacha <umesh.ka...@gmail.com>
> wrote:
>
>> I have a doubt Michael I tried to use callUDF in  the following code it
>> does not work.
>>
>> sourceFrame.agg(callUdf("percentile_approx",col("myCol"),lit(0.25)))
>>
>> Above code does not compile because callUdf() takes only two arguments
>> function name in String and Column class type. Please guide.
>>
>> On Sat, Oct 10, 2015 at 1:29 AM, Umesh Kacha <umesh.ka...@gmail.com>
>> wrote:
>>
>>> thanks much Michael let me try.
>>>
>>> On Sat, Oct 10, 2015 at 1:20 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> This is confusing because I made a typo...
>>>>
>>>> callUDF("percentile_approx", col("mycol"), lit(0.25))
>>>>
>>>> The first argument is the name of the UDF, all other arguments need to
>>>> be columns that are passed in as arguments.  lit is just saying to make a
>>>> literal column that always has the value 0.25.
>>>>
>>>> On Fri, Oct 9, 2015 at 12:16 PM, <saif.a.ell...@wellsfargo.com> wrote:
>>>>
>>>>> Yes but I mean, this is rather curious. How is def lit(literal:Any)
>>>>> --> becomes a percentile function lit(25)
>>>>>
>>>>>
>>>>>
>>>>> Thanks for clarification
>>>>>
>>>>> Saif
>>>>>
>>>>>
>>>>>
>>>>> *From:* Umesh Kacha [mailto:umesh.ka...@gmail.com]
>>>>> *Sent:* Friday, October 09, 2015 4:10 PM
>>>>> *To:* Ellafi, Saif A.
>>>>> *Cc:* Michael Armbrust; user
>>>>>
>>>>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>>>>
>>>>>
>>>>>
>>>>> I found it in 1.3 documentation lit says something else not percent
>>>>>
>>>>>
>>>>>
>>>>> public static Column 
>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>  lit(Object literal)
>>>>>
>>>>> Creates a Column
>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>  of
>>>>> literal value.
>>>>>
>>>>> The passed in object is returned directly if it is already a Column
>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>.
>>>>> If the object is a Scala Symbol, it is converted into a Column
>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>  also.
>>>>> Otherwise, a new Column
>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>  is
>>>>> created to represent the literal value.
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Oct 10, 2015 at 12:39 AM, <saif.a.ell...@wellsfargo.com>
>>>>> wrote:
>>>>>
>>>>> Where can we find other available functions such as lit() ? I can’t
>>>>> find lit in the api.
>>>>>
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>>
>>>>> *From:* Michael Armbrust [mailto:mich...@databricks.com]
>>>>> *Sent:* Friday, October 09, 2015 4:04 PM
>>>>> *To:* unk1102
>>>>> *Cc:* user
>>>>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>>>>
>>>>>
>>>>>
>>>>> You can use callUDF(col("mycol"), lit(0.25)) to call hive UDFs from
>>>>> dataframes.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Oct 9, 2015 at 12:01 PM, unk1102 <umesh.ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi how to calculate percentile of a column in a DataFrame? I cant find
>>>>> any
>>>>> percentile_approx function in Spark aggregation functions. For e.g. in
>>>>> Hive
>>>>> we have percentile_approx and we can use it in the following way
>>>>>
>>>>> hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);
>>>>>
>>>>> I can see ntile function but not sure how it is gonna give results
>>>>> same as
>>>>> above query please guide.
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>


Re: How to calculate percentile of a column of DataFrame?

2015-10-12 Thread Umesh Kacha
Hi Ted, thanks. If I don't pass the lit() function, then how can I tell the
percentile_approx function to give me the 25th or 50th percentile, like we do
in Hive with percentile_approx(mycol, 0.25)?

Regards

On Mon, Oct 12, 2015 at 7:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Umesh:
> Have you tried calling callUdf without the lit() parameter ?
>
> Cheers
>
> On Mon, Oct 12, 2015 at 6:27 AM, Umesh Kacha <umesh.ka...@gmail.com>
> wrote:
>
>> Hi if you can help it would be great as I am stuck don't know how to
>> remove compilation error in callUdf when we pass three parameters function
>> name string column name as col and lit function please guide
>> On Oct 11, 2015 1:05 AM, "Umesh Kacha" <umesh.ka...@gmail.com> wrote:
>>
>>> Hi any idea? how do I call percentlie_approx using callUdf() please
>>> guide.
>>>
>>> On Sat, Oct 10, 2015 at 1:39 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>> wrote:
>>>
>>>> I have a doubt Michael I tried to use callUDF in  the following code it
>>>> does not work.
>>>>
>>>> sourceFrame.agg(callUdf("percentile_approx",col("myCol"),lit(0.25)))
>>>>
>>>> Above code does not compile because callUdf() takes only two arguments
>>>> function name in String and Column class type. Please guide.
>>>>
>>>> On Sat, Oct 10, 2015 at 1:29 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> thanks much Michael let me try.
>>>>>
>>>>> On Sat, Oct 10, 2015 at 1:20 AM, Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>> This is confusing because I made a typo...
>>>>>>
>>>>>> callUDF("percentile_approx", col("mycol"), lit(0.25))
>>>>>>
>>>>>> The first argument is the name of the UDF, all other arguments need
>>>>>> to be columns that are passed in as arguments.  lit is just saying to 
>>>>>> make
>>>>>> a literal column that always has the value 0.25.
>>>>>>
>>>>>> On Fri, Oct 9, 2015 at 12:16 PM, <saif.a.ell...@wellsfargo.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes but I mean, this is rather curious. How is def lit(literal:Any)
>>>>>>> --> becomes a percentile function lit(25)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks for clarification
>>>>>>>
>>>>>>> Saif
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From:* Umesh Kacha [mailto:umesh.ka...@gmail.com]
>>>>>>> *Sent:* Friday, October 09, 2015 4:10 PM
>>>>>>> *To:* Ellafi, Saif A.
>>>>>>> *Cc:* Michael Armbrust; user
>>>>>>>
>>>>>>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I found it in 1.3 documentation lit says something else not percent
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> public static Column 
>>>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>>>  lit(Object literal)
>>>>>>>
>>>>>>> Creates a Column
>>>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>>>  of
>>>>>>> literal value.
>>>>>>>
>>>>>>> The passed in object is returned directly if it is already a Column
>>>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>.
>>>>>>> If the object is a Scala Symbol, it is converted into a Column
>>>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>>>  also.
>>>>>>> Otherwise, a new Column
>>>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>>>  is
>>>>>>> created to represent the literal value.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, O

Re: How to calculate percentile of a column of DataFrame?

2015-10-12 Thread Umesh Kacha
Sorry, I forgot to tell you that I am using Spark 1.4.1, as callUdf is
available from Spark 1.4.0 as per the Javadocs.

On Tue, Oct 13, 2015 at 12:22 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:

> Hi Ted thanks much for the detailed answer and appreciate your efforts. Do
> we need to register Hive UDFs?
>
> sqlContext.udf.register("percentile_approx");???//is it valid?
>
> I am calling Hive UDF percentile_approx in the following manner which
> gives compilation error
>
> df.select("col1").groupby("col1").agg(callUdf("percentile_approx",col("col1"),lit(0.25)));//compile
> error
>
> //compile error because callUdf() takes String and Column* as arguments.
>
> Please guide. Thanks much.
>
> On Mon, Oct 12, 2015 at 11:44 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Using spark-shell, I did the following exercise (master branch) :
>>
>>
>> SQL context available as sqlContext.
>>
>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>> "value")
>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>
>> scala> sqlContext.udf.register("simpleUDF", (v: Int, cnst: Int) => v * v
>> + cnst)
>> res0: org.apache.spark.sql.UserDefinedFunction =
>> UserDefinedFunction(,IntegerType,List())
>>
>> scala> df.select($"id", callUDF("simpleUDF", $"value", lit(25))).show()
>> +---+--------------------+
>> | id|'simpleUDF(value,25)|
>> +---+--------------------+
>> |id1|                  26|
>> |id2|                  41|
>> |id3|                  50|
>> +---+--------------------+
>>
>> Which Spark release are you using ?
>>
>> Can you pastebin the full stack trace where you got the error ?
>>
>> Cheers
>>
>> On Fri, Oct 9, 2015 at 1:09 PM, Umesh Kacha <umesh.ka...@gmail.com>
>> wrote:
>>
>>> I have a doubt Michael I tried to use callUDF in  the following code it
>>> does not work.
>>>
>>> sourceFrame.agg(callUdf("percentile_approx",col("myCol"),lit(0.25)))
>>>
>>> Above code does not compile because callUdf() takes only two arguments
>>> function name in String and Column class type. Please guide.
>>>
>>> On Sat, Oct 10, 2015 at 1:29 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>> wrote:
>>>
>>>> thanks much Michael let me try.
>>>>
>>>> On Sat, Oct 10, 2015 at 1:20 AM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> This is confusing because I made a typo...
>>>>>
>>>>> callUDF("percentile_approx", col("mycol"), lit(0.25))
>>>>>
>>>>> The first argument is the name of the UDF, all other arguments need to
>>>>> be columns that are passed in as arguments.  lit is just saying to make a
>>>>> literal column that always has the value 0.25.
>>>>>
>>>>> On Fri, Oct 9, 2015 at 12:16 PM, <saif.a.ell...@wellsfargo.com> wrote:
>>>>>
>>>>>> Yes but I mean, this is rather curious. How is def lit(literal:Any)
>>>>>> --> becomes a percentile function lit(25)
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks for clarification
>>>>>>
>>>>>> Saif
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From:* Umesh Kacha [mailto:umesh.ka...@gmail.com]
>>>>>> *Sent:* Friday, October 09, 2015 4:10 PM
>>>>>> *To:* Ellafi, Saif A.
>>>>>> *Cc:* Michael Armbrust; user
>>>>>>
>>>>>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I found it in 1.3 documentation lit says something else not percent
>>>>>>
>>>>>>
>>>>>>
>>>>>> public static Column 
>>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>>  lit(Object literal)
>>>>>>
>>>>>> Creates a Column
>>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>>  of
>>>>>> literal value.
>>>>>>
>>>>>> The passed in obje

Re: How to calculate percentile of a column of DataFrame?

2015-10-12 Thread Umesh Kacha
Hi Ted, thanks much for the detailed answer; I appreciate your efforts. Do we
need to register Hive UDFs?

sqlContext.udf.register("percentile_approx");???//is it valid?

I am calling the Hive UDF percentile_approx in the following manner, which
gives a compilation error:

df.select("col1").groupby("col1").agg(callUdf("percentile_approx",col("col1"),lit(0.25)));//compile
error

//compile error because callUdf() takes String and Column* as arguments.

Please guide. Thanks much.

On Mon, Oct 12, 2015 at 11:44 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Using spark-shell, I did the following exercise (master branch) :
>
>
> SQL context available as sqlContext.
>
> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>
> scala> sqlContext.udf.register("simpleUDF", (v: Int, cnst: Int) => v * v +
> cnst)
> res0: org.apache.spark.sql.UserDefinedFunction =
> UserDefinedFunction(,IntegerType,List())
>
> scala> df.select($"id", callUDF("simpleUDF", $"value", lit(25))).show()
> +---+--------------------+
> | id|'simpleUDF(value,25)|
> +---+--------------------+
> |id1|                  26|
> |id2|                  41|
> |id3|                  50|
> +---+--------------------+
>
> Which Spark release are you using ?
>
> Can you pastebin the full stack trace where you got the error ?
>
> Cheers
>
> On Fri, Oct 9, 2015 at 1:09 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
>> I have a doubt Michael I tried to use callUDF in  the following code it
>> does not work.
>>
>> sourceFrame.agg(callUdf("percentile_approx",col("myCol"),lit(0.25)))
>>
>> Above code does not compile because callUdf() takes only two arguments
>> function name in String and Column class type. Please guide.
>>
>> On Sat, Oct 10, 2015 at 1:29 AM, Umesh Kacha <umesh.ka...@gmail.com>
>> wrote:
>>
>>> thanks much Michael let me try.
>>>
>>> On Sat, Oct 10, 2015 at 1:20 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> This is confusing because I made a typo...
>>>>
>>>> callUDF("percentile_approx", col("mycol"), lit(0.25))
>>>>
>>>> The first argument is the name of the UDF, all other arguments need to
>>>> be columns that are passed in as arguments.  lit is just saying to make a
>>>> literal column that always has the value 0.25.
>>>>
>>>> On Fri, Oct 9, 2015 at 12:16 PM, <saif.a.ell...@wellsfargo.com> wrote:
>>>>
>>>>> Yes but I mean, this is rather curious. How is def lit(literal:Any)
>>>>> --> becomes a percentile function lit(25)
>>>>>
>>>>>
>>>>>
>>>>> Thanks for clarification
>>>>>
>>>>> Saif
>>>>>
>>>>>
>>>>>
>>>>> *From:* Umesh Kacha [mailto:umesh.ka...@gmail.com]
>>>>> *Sent:* Friday, October 09, 2015 4:10 PM
>>>>> *To:* Ellafi, Saif A.
>>>>> *Cc:* Michael Armbrust; user
>>>>>
>>>>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>>>>
>>>>>
>>>>>
>>>>> I found it in 1.3 documentation lit says something else not percent
>>>>>
>>>>>
>>>>>
>>>>> public static Column 
>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>  lit(Object literal)
>>>>>
>>>>> Creates a Column
>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>  of
>>>>> literal value.
>>>>>
>>>>> The passed in object is returned directly if it is already a Column
>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>.
>>>>> If the object is a Scala Symbol, it is converted into a Column
>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>  also.
>>>>> Otherwise, a new Column
>>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>>  is
>>>>> created to represent the literal value.
>>>>>
>>

Re: How to calculate percentile of a column of DataFrame?

2015-10-12 Thread Umesh Kacha
Hi Ted, thanks much. Are you saying the above code will work only in 1.5.1? I
tried upgrading to 1.5.1, but I have found a potential bug: my Spark job
creates Hive partitions using hiveContext.sql("insert into partitions"), and
when I use Spark 1.5.1 I can't see any partition ORC files getting created in
HDFS. I can see empty partition directories under the Hive table, along with
many staging files created by Spark.

On Tue, Oct 13, 2015 at 12:34 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> SQL context available as sqlContext.
>
> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value")
> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>
> scala> df.select(callUDF("percentile_approx",col("value"),
> lit(0.25))).show()
> +------------------------------+
> |'percentile_approx(value,0.25)|
> +------------------------------+
> |                           1.0|
> +------------------------------+
>
> Can you upgrade to 1.5.1 ?
>
> Cheers
>
> On Mon, Oct 12, 2015 at 11:55 AM, Umesh Kacha <umesh.ka...@gmail.com>
> wrote:
>
>> Sorry forgot to tell that I am using Spark 1.4.1 as callUdf is available
>> in Spark 1.4.0 as per JAvadocx
>>
>> On Tue, Oct 13, 2015 at 12:22 AM, Umesh Kacha <umesh.ka...@gmail.com>
>> wrote:
>>
>>> Hi Ted thanks much for the detailed answer and appreciate your efforts.
>>> Do we need to register Hive UDFs?
>>>
>>> sqlContext.udf.register("percentile_approx");???//is it valid?
>>>
>>> I am calling Hive UDF percentile_approx in the following manner which
>>> gives compilation error
>>>
>>> df.select("col1").groupby("col1").agg(callUdf("percentile_approx",col("col1"),lit(0.25)));//compile
>>> error
>>>
>>> //compile error because callUdf() takes String and Column* as arguments.
>>>
>>> Please guide. Thanks much.
>>>
>>> On Mon, Oct 12, 2015 at 11:44 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Using spark-shell, I did the following exercise (master branch) :
>>>>
>>>>
>>>> SQL context available as sqlContext.
>>>>
>>>> scala> val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id",
>>>> "value")
>>>> df: org.apache.spark.sql.DataFrame = [id: string, value: int]
>>>>
>>>> scala> sqlContext.udf.register("simpleUDF", (v: Int, cnst: Int) => v *
>>>> v + cnst)
>>>> res0: org.apache.spark.sql.UserDefinedFunction =
>>>> UserDefinedFunction(,IntegerType,List())
>>>>
>>>> scala> df.select($"id", callUDF("simpleUDF", $"value", lit(25))).show()
>>>> +---+--------------------+
>>>> | id|'simpleUDF(value,25)|
>>>> +---+--------------------+
>>>> |id1|                  26|
>>>> |id2|                  41|
>>>> |id3|                  50|
>>>> +---+--------------------+
>>>>
>>>> Which Spark release are you using ?
>>>>
>>>> Can you pastebin the full stack trace where you got the error ?
>>>>
>>>> Cheers
>>>>
>>>> On Fri, Oct 9, 2015 at 1:09 PM, Umesh Kacha <umesh.ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have a doubt Michael I tried to use callUDF in  the following code
>>>>> it does not work.
>>>>>
>>>>> sourceFrame.agg(callUdf("percentile_approx",col("myCol"),lit(0.25)))
>>>>>
>>>>> Above code does not compile because callUdf() takes only two arguments
>>>>> function name in String and Column class type. Please guide.
>>>>>
>>>>> On Sat, Oct 10, 2015 at 1:29 AM, Umesh Kacha <umesh.ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> thanks much Michael let me try.
>>>>>>
>>>>>> On Sat, Oct 10, 2015 at 1:20 AM, Michael Armbrust <
>>>>>> mich...@databricks.com> wrote:
>>>>>>
>>>>>>> This is confusing because I made a typo...
>>>>>>>
>>>>>>> callUDF("percentile_approx", col("mycol"), lit(0.25))
>>>>>>>
>>>>>>> The first argument is the name of the UDF, all other arguments need
>>>>>>> t

Re: How to calculate percentile of a column of DataFrame?

2015-10-10 Thread Umesh Kacha
Hi, any idea how I call percentile_approx using callUdf()? Please guide.

On Sat, Oct 10, 2015 at 1:39 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:

> I have a doubt Michael I tried to use callUDF in  the following code it
> does not work.
>
> sourceFrame.agg(callUdf("percentile_approx",col("myCol"),lit(0.25)))
>
> Above code does not compile because callUdf() takes only two arguments
> function name in String and Column class type. Please guide.
>
> On Sat, Oct 10, 2015 at 1:29 AM, Umesh Kacha <umesh.ka...@gmail.com>
> wrote:
>
>> thanks much Michael let me try.
>>
>> On Sat, Oct 10, 2015 at 1:20 AM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> This is confusing because I made a typo...
>>>
>>> callUDF("percentile_approx", col("mycol"), lit(0.25))
>>>
>>> The first argument is the name of the UDF, all other arguments need to
>>> be columns that are passed in as arguments.  lit is just saying to make a
>>> literal column that always has the value 0.25.
>>>
>>> On Fri, Oct 9, 2015 at 12:16 PM, <saif.a.ell...@wellsfargo.com> wrote:
>>>
>>>> Yes but I mean, this is rather curious. How is def lit(literal:Any) -->
>>>> becomes a percentile function lit(25)
>>>>
>>>>
>>>>
>>>> Thanks for clarification
>>>>
>>>> Saif
>>>>
>>>>
>>>>
>>>> *From:* Umesh Kacha [mailto:umesh.ka...@gmail.com]
>>>> *Sent:* Friday, October 09, 2015 4:10 PM
>>>> *To:* Ellafi, Saif A.
>>>> *Cc:* Michael Armbrust; user
>>>>
>>>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>>>
>>>>
>>>>
>>>> I found it in 1.3 documentation lit says something else not percent
>>>>
>>>>
>>>>
>>>> public static Column 
>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>  lit(Object literal)
>>>>
>>>> Creates a Column
>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>  of
>>>> literal value.
>>>>
>>>> The passed in object is returned directly if it is already a Column
>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>.
>>>> If the object is a Scala Symbol, it is converted into a Column
>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>  also.
>>>> Otherwise, a new Column
>>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>>  is
>>>> created to represent the literal value.
>>>>
>>>>
>>>>
>>>> On Sat, Oct 10, 2015 at 12:39 AM, <saif.a.ell...@wellsfargo.com> wrote:
>>>>
>>>> Where can we find other available functions such as lit() ? I can’t
>>>> find lit in the api.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>> *From:* Michael Armbrust [mailto:mich...@databricks.com]
>>>> *Sent:* Friday, October 09, 2015 4:04 PM
>>>> *To:* unk1102
>>>> *Cc:* user
>>>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>>>
>>>>
>>>>
>>>> You can use callUDF(col("mycol"), lit(0.25)) to call hive UDFs from
>>>> dataframes.
>>>>
>>>>
>>>>
>>>> On Fri, Oct 9, 2015 at 12:01 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>>>>
>>>> Hi how to calculate percentile of a column in a DataFrame? I cant find
>>>> any
>>>> percentile_approx function in Spark aggregation functions. For e.g. in
>>>> Hive
>>>> we have percentile_approx and we can use it in the following way
>>>>
>>>> hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);
>>>>
>>>> I can see ntile function but not sure how it is gonna give results same
>>>> as
>>>> above query please guide.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>


Re: Why dataframe.persist(StorageLevels.MEMORY_AND_DISK_SER) hangs for long time?

2015-10-10 Thread Umesh Kacha
Hi Alex, thanks for the response. I am using 40 executors with 30 GB each
(including 5 GB memoryOverhead) and 4 cores. My cluster has around 100 nodes
with 30 GB and 8 cores each.
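
In case it helps, here is a minimal Java sketch of the kind of setup I am
describing, assuming Spark 1.4/1.5 on YARN; the input path and the
shuffle-partition count are placeholders, not my real values:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

public class PersistTuningSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("persist-tuning-sketch"));
    HiveContext hiveContext = new HiveContext(sc.sc());

    // More shuffle partitions means smaller tasks and less memory pressure
    // per task; the default of 200 is often low for ~1 TB of input.
    hiveContext.setConf("spark.sql.shuffle.partitions", "2000");

    DataFrame df = hiveContext.read().format("orc")
        .load("/hdfs/path/to/orc/files/");   // placeholder input path

    // Serialized blocks that spill to disk instead of failing with OOM.
    df.persist(StorageLevels.MEMORY_AND_DISK_SER);
    df.count();   // materialize the cache once so later stages reuse it

    // ... group-by / aggregation work that reuses the cached data ...

    df.unpersist();
    sc.stop();
  }
}
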
On Oct 11, 2015 06:54, "Alex Rovner"  wrote:

> How many executors are you running with? How many nodes in your cluster?
>
> On Thursday, October 8, 2015, unk1102  wrote:
>
>> Hi as recommended I am caching my Spark job dataframe as
>> dataframe.persist(StorageLevels.MEMORY_AND_DISK_SER) but what I see in
>> Spark
>> job UI is this persist stage runs for so long showing 10 GB of shuffle
>> read
>> and 5 GB of shuffle write it takes to long to finish and because of that
>> sometimes my Spark job throws timeout or throws OOM and hence executors
>> gets
>> killed by YARN. I am using Spark 1.4.1. I am using all sort of
>> optimizations
>> like Tungsten, Kryo I have given storage.memoryFraction as 0.2 and
>> storage.shuffle as 0.2 also. My data is huge around 1 TB I am using
>> default
>> 200 partitions for spark.sql.shuffle.partitions. Please help me I am
>> clueless please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-dataframe-persist-StorageLevels-MEMORY-AND-DISK-SER-hangs-for-long-time-tp24981.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
> --
> *Alex Rovner*
> *Director, Data Engineering *
> *o:* 646.759.0052
>
> * *
>
>


Re: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Umesh Kacha
I found it in the 1.3 documentation; lit's description says something else,
nothing about percentiles:

public static Column lit(Object literal)

Creates a Column of literal value.

The passed in object is returned directly if it is already a Column. If the
object is a Scala Symbol, it is converted into a Column also. Otherwise, a new
Column is created to represent the literal value.
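
So lit(0.25) is not a percentile at all; it just wraps the constant 0.25 in a
Column so it can be handed to percentile_approx as an ordinary argument. A
tiny Java sketch of that idea, assuming Spark 1.5.x with a Hive-enabled
context and a hypothetical DataFrame df that has a numeric column "value":

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

// lit(0.25) is just a constant column; selected on its own it shows 0.25
// on every row of df.
df.select(col("value"), lit(0.25).as("quantile")).show();

// The same constant column is what tells percentile_approx which quantile
// to compute when passed as the second argument of the UDF call.
df.select(callUDF("percentile_approx", col("value"), lit(0.25))).show();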

On Sat, Oct 10, 2015 at 12:39 AM,  wrote:

> Where can we find other available functions such as lit() ? I can’t find
> lit in the api.
>
>
>
> Thanks
>
>
>
> *From:* Michael Armbrust [mailto:mich...@databricks.com]
> *Sent:* Friday, October 09, 2015 4:04 PM
> *To:* unk1102
> *Cc:* user
> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>
>
>
> You can use callUDF(col("mycol"), lit(0.25)) to call hive UDFs from
> dataframes.
>
>
>
> On Fri, Oct 9, 2015 at 12:01 PM, unk1102  wrote:
>
> Hi how to calculate percentile of a column in a DataFrame? I cant find any
> percentile_approx function in Spark aggregation functions. For e.g. in Hive
> we have percentile_approx and we can use it in the following way
>
> hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);
>
> I can see ntile function but not sure how it is gonna give results same as
> above query please guide.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Umesh Kacha
thanks much Michael let me try.

On Sat, Oct 10, 2015 at 1:20 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> This is confusing because I made a typo...
>
> callUDF("percentile_approx", col("mycol"), lit(0.25))
>
> The first argument is the name of the UDF, all other arguments need to be
> columns that are passed in as arguments.  lit is just saying to make a
> literal column that always has the value 0.25.
>
> On Fri, Oct 9, 2015 at 12:16 PM, <saif.a.ell...@wellsfargo.com> wrote:
>
>> Yes but I mean, this is rather curious. How is def lit(literal:Any) -->
>> becomes a percentile function lit(25)
>>
>>
>>
>> Thanks for clarification
>>
>> Saif
>>
>>
>>
>> *From:* Umesh Kacha [mailto:umesh.ka...@gmail.com]
>> *Sent:* Friday, October 09, 2015 4:10 PM
>> *To:* Ellafi, Saif A.
>> *Cc:* Michael Armbrust; user
>>
>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>
>>
>>
>> I found it in 1.3 documentation lit says something else not percent
>>
>>
>>
>> public static Column 
>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>  lit(Object literal)
>>
>> Creates a Column
>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>  of
>> literal value.
>>
>> The passed in object is returned directly if it is already a Column
>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>.
>> If the object is a Scala Symbol, it is converted into a Column
>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>  also.
>> Otherwise, a new Column
>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>  is
>> created to represent the literal value.
>>
>>
>>
>> On Sat, Oct 10, 2015 at 12:39 AM, <saif.a.ell...@wellsfargo.com> wrote:
>>
>> Where can we find other available functions such as lit() ? I can’t find
>> lit in the api.
>>
>>
>>
>> Thanks
>>
>>
>>
>> *From:* Michael Armbrust [mailto:mich...@databricks.com]
>> *Sent:* Friday, October 09, 2015 4:04 PM
>> *To:* unk1102
>> *Cc:* user
>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>
>>
>>
>> You can use callUDF(col("mycol"), lit(0.25)) to call hive UDFs from
>> dataframes.
>>
>>
>>
>> On Fri, Oct 9, 2015 at 12:01 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>>
>> Hi how to calculate percentile of a column in a DataFrame? I cant find any
>> percentile_approx function in Spark aggregation functions. For e.g. in
>> Hive
>> we have percentile_approx and we can use it in the following way
>>
>> hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);
>>
>> I can see ntile function but not sure how it is gonna give results same as
>> above query please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>>
>>
>>
>
>


Re: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Umesh Kacha
I have a doubt, Michael: I tried to use callUDF in the following code and it
does not work.

sourceFrame.agg(callUdf("percentile_approx",col("myCol"),lit(0.25)))

The above code does not compile because callUdf() takes only two arguments, a
function name as a String and a Column-class type. Please guide.

On Sat, Oct 10, 2015 at 1:29 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:

> thanks much Michael let me try.
>
> On Sat, Oct 10, 2015 at 1:20 AM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> This is confusing because I made a typo...
>>
>> callUDF("percentile_approx", col("mycol"), lit(0.25))
>>
>> The first argument is the name of the UDF, all other arguments need to be
>> columns that are passed in as arguments.  lit is just saying to make a
>> literal column that always has the value 0.25.
>>
>> On Fri, Oct 9, 2015 at 12:16 PM, <saif.a.ell...@wellsfargo.com> wrote:
>>
>>> Yes but I mean, this is rather curious. How is def lit(literal:Any) -->
>>> becomes a percentile function lit(25)
>>>
>>>
>>>
>>> Thanks for clarification
>>>
>>> Saif
>>>
>>>
>>>
>>> *From:* Umesh Kacha [mailto:umesh.ka...@gmail.com]
>>> *Sent:* Friday, October 09, 2015 4:10 PM
>>> *To:* Ellafi, Saif A.
>>> *Cc:* Michael Armbrust; user
>>>
>>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>>
>>>
>>>
>>> I found it in 1.3 documentation lit says something else not percent
>>>
>>>
>>>
>>> public static Column 
>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>  lit(Object literal)
>>>
>>> Creates a Column
>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>  of
>>> literal value.
>>>
>>> The passed in object is returned directly if it is already a Column
>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>.
>>> If the object is a Scala Symbol, it is converted into a Column
>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>  also.
>>> Otherwise, a new Column
>>> <https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/sql/Column.html>
>>>  is
>>> created to represent the literal value.
>>>
>>>
>>>
>>> On Sat, Oct 10, 2015 at 12:39 AM, <saif.a.ell...@wellsfargo.com> wrote:
>>>
>>> Where can we find other available functions such as lit() ? I can’t find
>>> lit in the api.
>>>
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>> *From:* Michael Armbrust [mailto:mich...@databricks.com]
>>> *Sent:* Friday, October 09, 2015 4:04 PM
>>> *To:* unk1102
>>> *Cc:* user
>>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>>
>>>
>>>
>>> You can use callUDF(col("mycol"), lit(0.25)) to call hive UDFs from
>>> dataframes.
>>>
>>>
>>>
>>> On Fri, Oct 9, 2015 at 12:01 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>>>
>>> Hi how to calculate percentile of a column in a DataFrame? I cant find
>>> any
>>> percentile_approx function in Spark aggregation functions. For e.g. in
>>> Hive
>>> we have percentile_approx and we can use it in the following way
>>>
>>> hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);
>>>
>>> I can see ntile function but not sure how it is gonna give results same
>>> as
>>> above query please guide.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>>
>>>
>>>
>>
>>
>


Re: How to increase Spark partitions for the DataFrame?

2015-10-08 Thread Umesh Kacha
Hi Lan, thanks for the response. Yes, I know, and I have confirmed in the UI
that it has only 12 partitions because of the 12 HDFS blocks; the Hive ORC
file stripe size is 33554432.

On Thu, Oct 8, 2015 at 11:55 PM, Lan Jiang  wrote:

> The partition number should be the same as the HDFS block number instead
> of file number. Did you confirmed from the spark UI that only 12 partitions
> were created? What is your ORC orc.stripe.size?
>
> Lan
>
>
> > On Oct 8, 2015, at 1:13 PM, unk1102  wrote:
> >
> > Hi I have the following code where I read ORC files from HDFS and it
> loads
> > directory which contains 12 ORC files. Now since HDFS directory contains
> 12
> > files it will create 12 partitions by default. These directory is huge
> and
> > when ORC files gets decompressed it becomes around 10 GB how do I
> increase
> > partitions for the below code so that my Spark job runs faster and does
> not
> > hang for long time because of reading 10 GB files through shuffle in 12
> > partitions. Please guide.
> >
> > DataFrame df =
> > hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/");
> > df.select().groupby(..)
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-increase-Spark-partitions-for-the-DataFrame-tp24980.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
>


Re: How to increase Spark partitions for the DataFrame?

2015-10-08 Thread Umesh Kacha
Hi Lan, thanks for the reply. I have tried the following but it did not
increase the partition count:

DataFrame df = hiveContext.read().format("orc").load("/hdfs/path/to/orc/
files/").repartition(100);

Yes, I have checked in the NameNode UI: the ORC data consists of 12
files/blocks of 128 MB each; when decompressed it is around 10 GB, and the
file size on disk is around 1 GB.
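
For reference, a minimal Java sketch, assuming an existing HiveContext named
hiveContext as above, of how one might print the partition count before and
after the repartition call to cross-check what the UI shows:

DataFrame df = hiveContext.read().format("orc")
    .load("/hdfs/path/to/orc/files/");
System.out.println("partitions before: " + df.javaRDD().partitions().size());

// repartition(100) returns a new DataFrame and introduces a full shuffle;
// the original df keeps its 12 input partitions.
DataFrame repartitioned = df.repartition(100);
System.out.println("partitions after: "
    + repartitioned.javaRDD().partitions().size());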

On Fri, Oct 9, 2015 at 12:43 AM, Lan Jiang <ljia...@gmail.com> wrote:

> Hmm, that’s odd.
>
> You can always use repartition(n) to increase the partition number, but
> then there will be shuffle. How large is your ORC file? Have you used
> NameNode UI to check how many HDFS blocks each ORC file has?
>
> Lan
>
>
> On Oct 8, 2015, at 2:08 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
> Hi Lan, thanks for the response yes I know and I have confirmed in UI that
> it has only 12 partitions because of 12 HDFS blocks and hive orc file strip
> size is 33554432.
>
> On Thu, Oct 8, 2015 at 11:55 PM, Lan Jiang <ljia...@gmail.com> wrote:
>
>> The partition number should be the same as the HDFS block number instead
>> of file number. Did you confirmed from the spark UI that only 12 partitions
>> were created? What is your ORC orc.stripe.size?
>>
>> Lan
>>
>>
>> > On Oct 8, 2015, at 1:13 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>> >
>> > Hi I have the following code where I read ORC files from HDFS and it
>> loads
>> > directory which contains 12 ORC files. Now since HDFS directory
>> contains 12
>> > files it will create 12 partitions by default. These directory is huge
>> and
>> > when ORC files gets decompressed it becomes around 10 GB how do I
>> increase
>> > partitions for the below code so that my Spark job runs faster and does
>> not
>> > hang for long time because of reading 10 GB files through shuffle in 12
>> > partitions. Please guide.
>> >
>> > DataFrame df =
>> > hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/");
>> > df.select().groupby(..)
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-increase-Spark-partitions-for-the-DataFrame-tp24980.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com
>> .
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>>
>
>


Re: ORC files created by Spark job can't be accessed using hive table

2015-10-06 Thread Umesh Kacha
Thanks Michael, so the following code, written using Spark 1.5.1, should
produce files that a Hive table can read, right?

dataFrame.write().mode(SaveMode.Append).partitionBy("
entity","date").format("orc").save("baseTable");

Hive console:
Create external table bla bla
stored as ORC
Location '/user/xyz/baseTable'
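
For a partitioned layout like this, here is a rough Java sketch of the full
round trip, assuming Spark 1.5.1, an existing HiveContext named hiveContext,
and hypothetical non-partition columns (id, amount). As Michael notes below,
Hive does not discover new partition directories on its own, so the metastore
still has to be told about them, e.g. with MSCK REPAIR TABLE, before the table
returns any rows:

import org.apache.spark.sql.SaveMode;

// Write partitioned ORC exactly as above.
dataFrame.write().mode(SaveMode.Append)
    .partitionBy("entity", "date")
    .format("orc")
    .save("/user/xyz/baseTable");

// Hypothetical schema: only the non-partition columns go in the column list.
hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS base_table "
    + "(id STRING, amount DOUBLE) "
    + "PARTITIONED BY (entity STRING, `date` STRING) "
    + "STORED AS ORC LOCATION '/user/xyz/baseTable'");

// Without this (or ALTER TABLE ... ADD PARTITION), Hive's metastore does not
// know the partition directories exist and the table looks empty.
hiveContext.sql("MSCK REPAIR TABLE base_table");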

On Tue, Oct 6, 2015 at 10:54 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> I believe this is fixed in Spark 1.5.1 as long as the table is only using
> types that hive understands and is not partitioned.  The problem with
> partitioned tables it that hive does not support dynamic discovery unless
> you manually run the repair command.
>
> On Tue, Oct 6, 2015 at 9:33 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
>> Hi Ted thanks I know I solved that by using dataframe for both reading
>> and writing. I am running into different problem now if spark can read hive
>> orc files why can't hive read orc files created by Spark?
>> On Oct 6, 2015 9:28 PM, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>
>>> See this thread:
>>> http://search-hadoop.com/m/q3RTtwwjNxXvPEe1
>>>
>>> A brief search in Spark JIRAs didn't find anything opened on this
>>> subject.
>>>
>>> On Tue, Oct 6, 2015 at 8:51 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>>
>>>> Hi I have a spark job which creates ORC files in partitions using the
>>>> following code
>>>>
>>>>
>>>> dataFrame.write().mode(SaveMode.Append).partitionBy("entity","date").format("orc").save("baseTable");
>>>>
>>>> Above code creates successfully orc files which is readable in Spark
>>>> dataframe
>>>>
>>>> But when I try to load orc files generated using above code into hive
>>>> orc
>>>> table or hive external table nothing gets printed looks like table is
>>>> empty
>>>> what's wrong here I can see orc files in hdfs but hive table does not
>>>> read
>>>> it please guide
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ORC-files-created-by-Spark-job-can-t-be-accessed-using-hive-table-tp24954.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>


Re: ORC files created by Spark job can't be accessed using hive table

2015-10-06 Thread Umesh Kacha
Hi Ted, thanks, I know; I solved that by using the DataFrame API for both
reading and writing. I am running into a different problem now: if Spark can
read Hive ORC files, why can't Hive read ORC files created by Spark?
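
For reference, a minimal Java sketch of what I mean by using the DataFrame API
on both sides (Spark 1.4/1.5; the output path and the existing hiveContext are
placeholders):

// Write with the DataFrame writer...
dataFrame.write().format("orc").save("/hdfs/path/to/orc/out");

// ...and read the same path back with the DataFrame reader, so the write
// side and the read side go through the same ORC code path inside Spark.
DataFrame readBack = hiveContext.read().format("orc")
    .load("/hdfs/path/to/orc/out");
readBack.show();
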
On Oct 6, 2015 9:28 PM, "Ted Yu"  wrote:

> See this thread:
> http://search-hadoop.com/m/q3RTtwwjNxXvPEe1
>
> A brief search in Spark JIRAs didn't find anything opened on this subject.
>
> On Tue, Oct 6, 2015 at 8:51 AM, unk1102  wrote:
>
>> Hi I have a spark job which creates ORC files in partitions using the
>> following code
>>
>>
>> dataFrame.write().mode(SaveMode.Append).partitionBy("entity","date").format("orc").save("baseTable");
>>
>> Above code creates successfully orc files which is readable in Spark
>> dataframe
>>
>> But when I try to load orc files generated using above code into hive orc
>> table or hive external table nothing gets printed looks like table is
>> empty
>> what's wrong here I can see orc files in hdfs but hive table does not read
>> it please guide
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/ORC-files-created-by-Spark-job-can-t-be-accessed-using-hive-table-tp24954.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to optimize group by query fired using hiveContext.sql?

2015-10-05 Thread Umesh Kacha
Hi, thanks. I usually see the following errors in the Spark logs, and because
of them I think the executors get lost. All of the following happens because
of a huge data shuffle, which I can't avoid; I don't know what to do, please
guide.

15/08/16 12:26:46 WARN spark.HeartbeatReceiver: Removing executor 10
with no recent heartbeats:

1051638 ms exceeds timeout 100 ms

Or

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 0
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
at 
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at 
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)



OR YARN kills container because of

Container [pid=26783,containerID=container_1389136889967_0009_01_02]
is running beyond physical memory limits. Current usage: 30.2 GB of 30
GB physical memory used; Killing container.
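
For reference, a small sketch of the two settings that usually map to these
two symptoms, assuming Spark 1.4+ on YARN (values are illustrative): the
heartbeat timeout behind the first warning is governed by
spark.network.timeout, and the container kill at the end is the case
spark.yarn.executor.memoryOverhead exists for.

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    // Raises the executor heartbeat timeout behind
    // "Removing executor ... with no recent heartbeats".
    .set("spark.network.timeout", "600s")
    // Extra off-heap headroom so YARN's physical-memory check does not
    // kill the container ("running beyond physical memory limits").
    .set("spark.yarn.executor.memoryOverhead", "3072");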


On Mon, Oct 5, 2015 at 8:00 AM, Alex Rovner <alex.rov...@magnetic.com>
wrote:

> Can you at least copy paste the error(s) you are seeing when the job
> fails? Without the error message(s), it's hard to even suggest anything.
>
> *Alex Rovner*
> *Director, Data Engineering *
> *o:* 646.759.0052
>
> * <http://www.magnetic.com/>*
>
> On Sat, Oct 3, 2015 at 9:50 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
>> Hi thanks I cant share yarn logs because of privacy in my company but I
>> can tell you I have seen yarn logs there I have not found anything except
>> YARN killing container because it is exceeds physical memory capacity.
>>
>> I am using the following command line script Above job launches around
>> 1500 ExecutorService threads from a driver with a thread pool of 15 so at a
>> time 15 jobs will be running as showing in UI.
>>
>> ./spark-submit --class com.xyz.abc.MySparkJob
>>
>> --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M" -
>>
>> -driver-java-options -XX:MaxPermSize=512m -
>>
>> -driver-memory 4g --master yarn-client
>>
>> --executor-memory 27G --executor-cores 2
>>
>> --num-executors 40
>>
>> --jars /path/to/others-jars
>>
>> /path/to/spark-job.jar
>>
>>
>> On Sat, Oct 3, 2015 at 7:11 PM, Alex Rovner <alex.rov...@magnetic.com>
>> wrote:
>>
>>> Can you send over your yarn logs along with the command you are using to
>>> submit your job?
>>>
>>> *Alex Rovner*
>>> *Director, Data Engineering *
>>> *o:* 646.759.0052
>>>
>>> * <http://www.magnetic.com/>*
>>>
>>> On Sat, Oct 3, 

Re: Store DStreams into Hive using Hive Streaming

2015-10-05 Thread Umesh Kacha
Hi, no, I didn't find any solution, and I still need that Hive Streaming
feature from Spark, so please let me know if you find something. An
alternative is to use Storm for the Hive processing, but I would like to stick
to Spark, so I am still searching.
On Oct 5, 2015 2:51 PM, "Krzysztof Zarzycki"  wrote:

> I'm also interested in this feature. Did you guys found some information
> about how to use Hive Streaming with Spark Streaming?
>
> Thanks,
> Krzysiek
>
> 2015-07-17 20:16 GMT+02:00 unk1102 :
>
>> Hi I have similar use case did you found solution for this problem of
>> loading
>> DStreams in Hive using Spark Streaming. Please guide. Thanks.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Store-DStreams-into-Hive-using-Hive-Streaming-tp18307p23885.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Hive ORC Malformed while loading into spark data frame

2015-10-04 Thread Umesh Kacha
Thanks much, Zhan Zhang. I will open a JIRA saying that ORC files created
using hiveContext.sql can't be read by the DataFrame reader.

Regards,
Umesh
On Oct 4, 2015 10:14, "Zhan Zhang" <zzh...@hortonworks.com> wrote:

> HI Umesh,
>
> It depends on how you create and read the orc file, although everything
> happens in side of spark. Because there are two paths in spark to create
> table, one is through hive, and the other one is through data frame. Due to
> version compatibility issue,
> there may be conflicts between these two paths. You have to use
> dataframe.write and dataframe.read to avoid such issue. The ORC path has to
> be upgraded to the same version as hive to solve this issue.
>
> Because ORC becomes a independent project now, and we are waiting for it
> to be totally isolated from hive. Then we can upgrade ORC to latest
> version, and put it to SqlContext. I think you can open a JIRA to tracking
> this upgrade.
>
> BTW, my name is Zhan Zhang instead of Zang.
>
> Thanks.
>
> Zhan Zhang
>
> On Oct 3, 2015, at 2:18 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
> Hi Zang any idea why is this happening? I can load ORC files created by
> Hive table but I cant load ORC files created by Spark itself. It looks like
> bug.
>
> On Wed, Sep 30, 2015 at 12:03 PM, Umesh Kacha <umesh.ka...@gmail.com>
> wrote:
>
>> Hi Zang thanks much please find the code below
>>
>> Working code loading data from a path created by Hive table using hive
>> console outside of spark :
>>
>> DataFrame df =
>> hiveContext.read().format("orc").load("/hdfs/path/to/hive/table/partition")
>>
>> Not working code inside spark hive tables created using hiveContext.sql
>> insert into partition queries
>>
>> DataFrame df =
>> hiveContext.read().format("orc").load("/hdfs/path/to/hive/table/partition/created/by/spark")
>>
>> You see above is same in both cases just second code is trying to load
>> orc data created by Spark.
>> On Sep 30, 2015 11:22 AM, "Zhan Zhang" <zzh...@hortonworks.com> wrote:
>>
>>> Hi Umesh,
>>>
>>> The potential reason is that Hive and Spark does not use same
>>> OrcInputFormat. In new hive version, there are NewOrcInputFormat, but it is
>>> not in spark because of backward compatibility (which is not available in
>>> hive-0.12).
>>> Do you mind post the code that works and not works for you?
>>>
>>> Thanks.
>>>
>>> Zhan Zhang
>>>
>>> On Sep 29, 2015, at 10:05 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>>>
>>> Hi I can read/load orc data created by hive table in a dataframe why is
>>> it throwing Malformed ORC exception when I try to load data created by
>>> hiveContext.sql into dataframe?
>>> On Sep 30, 2015 2:37 AM, "Hortonworks" <zzh...@hortonworks.com> wrote:
>>>
>>>> You can try to use data frame for both read and write
>>>>
>>>> Thanks
>>>>
>>>> Zhan Zhang
>>>>
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Sep 29, 2015, at 1:56 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>>>>
>>>> Hi Zang, thanks for the response. Table is created using Spark
>>>> hiveContext.sql and data inserted into table also using hiveContext.sql.
>>>> Insert into partition table. When I try to load orc data into dataframe I
>>>> am loading particular partition data stored in path say
>>>> /user/xyz/Hive/xyz.db/sparktable/partition1=abc
>>>>
>>>> Regards,
>>>> Umesh
>>>> On Sep 30, 2015 02:21, "Hortonworks" <zzh...@hortonworks.com> wrote:
>>>>
>>>>> How was the table is generated, by hive or by spark?
>>>>>
>>>>> If you generate table using have but read it by data frame, it may
>>>>> have some comparability issue.
>>>>>
>>>>> Thanks
>>>>>
>>>>> Zhan Zhang
>>>>>
>>>>>
>>>>> Sent from my iPhone
>>>>>
>>>>> > On Sep 29, 2015, at 1:47 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>>>>> >
>>>>> > Hi I have a spark job which creates hive tables in orc format with
>>>>> > partitions. It works well I can read data back into hive table using
>>>>> hive
>>>>> > console. But if I try further process orc files generated by Spark

Re: How to use registered Hive UDF in Spark DataFrame?

2015-10-04 Thread Umesh Kacha
Hi, I tried to use callUDF in the following way and it throws an exception
saying it can't recognise MyUDF even though I registered it.

List colList = new ArrayList();
colList.add(col("myColumn").as("modifiedColumn"));
// I need to do this because the following call won't accept just one col();
// it needs a Seq
Seq colSeq = JavaConversions.asScalaBuffer(colList);
DataFrame resultFrame =
    sourceFrame.select(callUDF("MyUDF").toString(), colSeq);

The above call fails, saying it can't recognise 'MyUDF myColumn as
modifiedColumn' in the given columns, bla bla...
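
For reference, the variant I am going to try next, as a minimal sketch
assuming Spark 1.5.x and the hiveContext.udf().register("MyUDF", ...)
registration from my original question below: pass the input column to
callUDF directly and put the alias on the result Column, which avoids both
the toString() call and the explicit Seq conversion.

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

// callUDF returns a Column, so the alias goes on the result, and
// DataFrame.select(Column...) accepts it directly without a Seq.
DataFrame resultFrame = sourceFrame.select(
    callUDF("MyUDF", col("myColumn")).as("modifiedColumn"));
resultFrame.show();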

On Sat, Oct 3, 2015 at 2:36 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> callUDF("MyUDF", col("col1").as("name")
>
> or
>
> callUDF("MyUDF", col("col1").alias("name")
>
> On Fri, Oct 2, 2015 at 3:29 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> Thanks much. How do we give alias name for resultant columns? For e.g.
>> when using
>>
>> hiveContext.sql("select MyUDF("test") as mytest from myTable");
>>
>> how do we do that in DataFrame callUDF
>>
>> callUDF("MyUDF", col("col1"))???
>>
>> On Fri, Oct 2, 2015 at 8:23 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> import org.apache.spark.sql.functions.*
>>>
>>> callUDF("MyUDF", col("col1"), col("col2"))
>>>
>>> On Fri, Oct 2, 2015 at 6:25 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>>
>>>> Hi I have registed my hive UDF using the following code:
>>>>
>>>> hiveContext.udf().register("MyUDF", new UDF1<String, String>() {
>>>> public String call(String o) throws Exception {
>>>> //bla bla
>>>> }
>>>> }, DataTypes.StringType);
>>>>
>>>> Now I want to use above MyUDF in DataFrame. How do we use it? I know
>>>> how to
>>>> use it in a sql and it works fine
>>>>
>>>> hiveContext.sql(select MyUDF("test") from myTable);
>>>>
>>>> My hiveContext.sql() query involves group by on multiple columns so for
>>>> scaling purpose I am trying to convert this query into DataFrame APIs
>>>>
>>>>
>>>> dataframe.select("col1","col2","coln").groupby(""col1","col2","coln").count();
>>>>
>>>> Can we do the follwing dataframe.select(MyUDF("col1"))??? Please guide.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-registered-Hive-UDF-in-Spark-DataFrame-tp24907.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: How to optimize group by query fired using hiveContext.sql?

2015-10-03 Thread Umesh Kacha
Hi Alex, thanks much for the reply. Please read the following for more
details about my problem.

http://stackoverflow.com/questions/32317285/spark-executor-oom-issue-on-yarn

Each of my containers has 8 cores and 30 GB max memory, so I am running in
yarn-client mode with 40 executors of 27 GB / 2 cores each. If I use more
cores, my job starts losing more executors. I tried setting
spark.yarn.executor.memoryOverhead to around 2 GB and even 8 GB, but it does
not help; I lose executors no matter what. The reason is that my jobs shuffle
a lot of data, even 20 GB per job as I have seen in the UI. The shuffle
happens because of the group by, and I can't avoid it in my case.
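
For reference, a hedged sketch of how the overhead setting can be passed on
the command line (the 2048 value is illustrative of the ~2 GB I tried; the
resource numbers match the ones above):

./spark-submit --class com.xyz.abc.MySparkJob \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --master yarn-client --executor-memory 27G --executor-cores 2 \
  --num-executors 40 \
  /path/to/spark-job.jar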



On Sat, Oct 3, 2015 at 6:27 PM, Alex Rovner 
wrote:

> This sounds like you need to increase YARN overhead settings with the 
> "spark.yarn.executor.memoryOverhead"
> parameter. See http://spark.apache.org/docs/latest/running-on-yarn.html
> for more information on the setting.
>
> If that does not work for you, please provide the error messages and the
> command line you are using to submit your jobs for further troubleshooting.
>
>
> *Alex Rovner*
> *Director, Data Engineering *
> *o:* 646.759.0052
>
> * *
>
> On Sat, Oct 3, 2015 at 6:19 AM, unk1102  wrote:
>
>> Hi I have couple of Spark jobs which uses group by query which is getting
>> fired from hiveContext.sql() Now I know group by is evil but my use case I
>> cant avoid group by I have around 7-8 fields on which I need to do group
>> by.
>> Also I am using df1.except(df2) which also seems heavy operation and does
>> lots of shuffling please see my UI snap
>> <
>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24914/IMG_20151003_151830218.jpg
>> >
>>
>> I have tried almost all optimisation including Spark 1.5 but nothing seems
>> to be working and my job fails hangs because of executor will reach
>> physical
>> memory limit and YARN will kill it. I have around 1TB of data to process
>> and
>> it is skewed. Please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-optimize-group-by-query-fired-using-hiveContext-sql-tp24914.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Hive ORC Malformed while loading into spark data frame

2015-10-03 Thread Umesh Kacha
Hi Zhan, any idea why this is happening? I can load ORC files created by a
Hive table, but I can't load ORC files created by Spark itself. It looks like
a bug.

On Wed, Sep 30, 2015 at 12:03 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:

> Hi Zang thanks much please find the code below
>
> Working code loading data from a path created by Hive table using hive
> console outside of spark :
>
> DataFrame df =
> hiveContext.read().format("orc").load("/hdfs/path/to/hive/table/partition")
>
> Not working code inside spark hive tables created using hiveContext.sql
> insert into partition queries
>
> DataFrame df =
> hiveContext.read().format("orc").load("/hdfs/path/to/hive/table/partition/created/by/spark")
>
> You see above is same in both cases just second code is trying to load orc
> data created by Spark.
> On Sep 30, 2015 11:22 AM, "Zhan Zhang" <zzh...@hortonworks.com> wrote:
>
>> Hi Umesh,
>>
>> The potential reason is that Hive and Spark does not use same
>> OrcInputFormat. In new hive version, there are NewOrcInputFormat, but it is
>> not in spark because of backward compatibility (which is not available in
>> hive-0.12).
>> Do you mind post the code that works and not works for you?
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> On Sep 29, 2015, at 10:05 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>>
>> Hi I can read/load orc data created by hive table in a dataframe why is
>> it throwing Malformed ORC exception when I try to load data created by
>> hiveContext.sql into dataframe?
>> On Sep 30, 2015 2:37 AM, "Hortonworks" <zzh...@hortonworks.com> wrote:
>>
>>> You can try to use data frame for both read and write
>>>
>>> Thanks
>>>
>>> Zhan Zhang
>>>
>>>
>>> Sent from my iPhone
>>>
>>> On Sep 29, 2015, at 1:56 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>>>
>>> Hi Zang, thanks for the response. Table is created using Spark
>>> hiveContext.sql and data inserted into table also using hiveContext.sql.
>>> Insert into partition table. When I try to load orc data into dataframe I
>>> am loading particular partition data stored in path say
>>> /user/xyz/Hive/xyz.db/sparktable/partition1=abc
>>>
>>> Regards,
>>> Umesh
>>> On Sep 30, 2015 02:21, "Hortonworks" <zzh...@hortonworks.com> wrote:
>>>
>>>> How was the table is generated, by hive or by spark?
>>>>
>>>> If you generate table using have but read it by data frame, it may have
>>>> some comparability issue.
>>>>
>>>> Thanks
>>>>
>>>> Zhan Zhang
>>>>
>>>>
>>>> Sent from my iPhone
>>>>
>>>> > On Sep 29, 2015, at 1:47 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>>>> >
>>>> > Hi I have a spark job which creates hive tables in orc format with
>>>> > partitions. It works well I can read data back into hive table using
>>>> hive
>>>> > console. But if I try further process orc files generated by Spark
>>>> job by
>>>> > loading into dataframe  then I get the following exception
>>>> > Caused by: java.io.IOException: Malformed ORC file
>>>> > hdfs://localhost:9000/user/hive/warehouse/partorc/part_tiny.txt.
>>>> Invalid
>>>> > postscript.
>>>> >
>>>> > Dataframe df = hiveContext.read().format("orc").load(to/path);
>>>> >
>>>> > Please guide.
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Hive-ORC-Malformed-while-loading-into-spark-data-frame-tp24876.html
>>>> > Sent from the Apache Spark User List mailing list archive at
>>>> Nabble.com <http://nabble.com/>.
>>>> >
>>>> > -
>>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>>> >
>>>> >
>>>>
>>
>>
>>


Re: How to optimize group by query fired using hiveContext.sql?

2015-10-03 Thread Umesh Kacha
Hi, thanks. I can't share the YARN logs because of privacy in my company, but
I can tell you that in the YARN logs I have not found anything except YARN
killing the container because it exceeds physical memory capacity.

I am using the command line below. The job submits around 1500 tasks to an
ExecutorService in the driver with a thread pool of 15, so at any time 15
jobs will be running, as shown in the UI.

./spark-submit --class com.xyz.abc.MySparkJob \
  --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M" \
  --driver-java-options -XX:MaxPermSize=512m \
  --driver-memory 4g --master yarn-client \
  --executor-memory 27G --executor-cores 2 \
  --num-executors 40 \
  --jars /path/to/others-jars \
  /path/to/spark-job.jar


On Sat, Oct 3, 2015 at 7:11 PM, Alex Rovner <alex.rov...@magnetic.com>
wrote:

> Can you send over your yarn logs along with the command you are using to
> submit your job?
>
> *Alex Rovner*
> *Director, Data Engineering *
> *o:* 646.759.0052
>
> * <http://www.magnetic.com/>*
>
> On Sat, Oct 3, 2015 at 9:07 AM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
>> Hi Alex thanks much for the reply. Please read the following for more
>> details about my problem.
>>
>>
>> http://stackoverflow.com/questions/32317285/spark-executor-oom-issue-on-yarn
>>
>> My each container has 8 core and 30 GB max memory. So I am using
>> yarn-client mode using 40 executors with 27GB/2 cores. If I use more cores
>> then my job start loosing more executors. I tried to set
>> spark.yarn.executor.memoryOverhead around 2 GB even 8 GB but it does not
>> help I loose executors no matter what. The reason is my jobs shuffles lots
>> of data even 20 GB of data in every job in UI I have seen it. Shuffle
>> happens because of group by and I cant avoid it in my case.
>>
>>
>>
>> On Sat, Oct 3, 2015 at 6:27 PM, Alex Rovner <alex.rov...@magnetic.com>
>> wrote:
>>
>>> This sounds like you need to increase YARN overhead settings with the 
>>> "spark.yarn.executor.memoryOverhead"
>>> parameter. See http://spark.apache.org/docs/latest/running-on-yarn.html
>>> for more information on the setting.
>>>
>>> If that does not work for you, please provide the error messages and the
>>> command line you are using to submit your jobs for further troubleshooting.
>>>
>>>
>>> *Alex Rovner*
>>> *Director, Data Engineering *
>>> *o:* 646.759.0052
>>>
>>> * <http://www.magnetic.com/>*
>>>
>>> On Sat, Oct 3, 2015 at 6:19 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>>
>>>> Hi I have couple of Spark jobs which uses group by query which is
>>>> getting
>>>> fired from hiveContext.sql() Now I know group by is evil but my use
>>>> case I
>>>> cant avoid group by I have around 7-8 fields on which I need to do
>>>> group by.
>>>> Also I am using df1.except(df2) which also seems heavy operation and
>>>> does
>>>> lots of shuffling please see my UI snap
>>>> <
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24914/IMG_20151003_151830218.jpg
>>>> >
>>>>
>>>> I have tried almost all optimisation including Spark 1.5 but nothing
>>>> seems
>>>> to be working and my job fails hangs because of executor will reach
>>>> physical
>>>> memory limit and YARN will kill it. I have around 1TB of data to
>>>> process and
>>>> it is skewed. Please guide.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-optimize-group-by-query-fired-using-hiveContext-sql-tp24914.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: How to use registered Hive UDF in Spark DataFrame?

2015-10-02 Thread Umesh Kacha
Hi Michael,

Thanks much. How do we give an alias name to the resultant column? For
example, when using

hiveContext.sql("select MyUDF("test") as mytest from myTable");

how do we do the same with the DataFrame callUDF API?

callUDF("MyUDF", col("col1"))???

On Fri, Oct 2, 2015 at 8:23 PM, Michael Armbrust 
wrote:

> import org.apache.spark.sql.functions.*
>
> callUDF("MyUDF", col("col1"), col("col2"))
>
> On Fri, Oct 2, 2015 at 6:25 AM, unk1102  wrote:
>
>> Hi I have registed my hive UDF using the following code:
>>
>> hiveContext.udf().register("MyUDF",new UDF1(String,String)) {
>> public String call(String o) throws Execption {
>> //bla bla
>> }
>> },DataTypes.String);
>>
>> Now I want to use above MyUDF in DataFrame. How do we use it? I know how
>> to
>> use it in a sql and it works fine
>>
>> hiveContext.sql(select MyUDF("test") from myTable);
>>
>> My hiveContext.sql() query involves group by on multiple columns so for
>> scaling purpose I am trying to convert this query into DataFrame APIs
>>
>>
>> dataframe.select("col1","col2","coln").groupby(""col1","col2","coln").count();
>>
>> Can we do the follwing dataframe.select(MyUDF("col1"))??? Please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-registered-Hive-UDF-in-Spark-DataFrame-tp24907.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Hive ORC Malformed while loading into spark data frame

2015-09-30 Thread Umesh Kacha
Hi Zhan, thanks much. Please find the code below.

Working code, loading data from a path created by a Hive table using the hive
console outside of Spark:

DataFrame df =
hiveContext.read().format("orc").load("/hdfs/path/to/hive/table/partition")

Not working code, where the Hive tables were created inside Spark using
hiveContext.sql insert-into-partition queries:

DataFrame df =
hiveContext.read().format("orc").load("/hdfs/path/to/hive/table/partition/created/by/spark")

As you can see, the code is the same in both cases; the second one is just
trying to load ORC data created by Spark.
On Sep 30, 2015 11:22 AM, "Zhan Zhang" <zzh...@hortonworks.com> wrote:

> Hi Umesh,
>
> The potential reason is that Hive and Spark does not use same
> OrcInputFormat. In new hive version, there are NewOrcInputFormat, but it is
> not in spark because of backward compatibility (which is not available in
> hive-0.12).
> Do you mind post the code that works and not works for you?
>
> Thanks.
>
> Zhan Zhang
>
> On Sep 29, 2015, at 10:05 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
> Hi I can read/load orc data created by hive table in a dataframe why is it
> throwing Malformed ORC exception when I try to load data created by
> hiveContext.sql into dataframe?
> On Sep 30, 2015 2:37 AM, "Hortonworks" <zzh...@hortonworks.com> wrote:
>
>> You can try to use data frame for both read and write
>>
>> Thanks
>>
>> Zhan Zhang
>>
>>
>> Sent from my iPhone
>>
>> On Sep 29, 2015, at 1:56 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>>
>> Hi Zang, thanks for the response. Table is created using Spark
>> hiveContext.sql and data inserted into table also using hiveContext.sql.
>> Insert into partition table. When I try to load orc data into dataframe I
>> am loading particular partition data stored in path say
>> /user/xyz/Hive/xyz.db/sparktable/partition1=abc
>>
>> Regards,
>> Umesh
>> On Sep 30, 2015 02:21, "Hortonworks" <zzh...@hortonworks.com> wrote:
>>
>>> How was the table is generated, by hive or by spark?
>>>
>>> If you generate table using have but read it by data frame, it may have
>>> some comparability issue.
>>>
>>> Thanks
>>>
>>> Zhan Zhang
>>>
>>>
>>> Sent from my iPhone
>>>
>>> > On Sep 29, 2015, at 1:47 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>>> >
>>> > Hi I have a spark job which creates hive tables in orc format with
>>> > partitions. It works well I can read data back into hive table using
>>> hive
>>> > console. But if I try further process orc files generated by Spark job
>>> by
>>> > loading into dataframe  then I get the following exception
>>> > Caused by: java.io.IOException: Malformed ORC file
>>> > hdfs://localhost:9000/user/hive/warehouse/partorc/part_tiny.txt.
>>> Invalid
>>> > postscript.
>>> >
>>> > Dataframe df = hiveContext.read().format("orc").load(to/path);
>>> >
>>> > Please guide.
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Hive-ORC-Malformed-while-loading-into-spark-data-frame-tp24876.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com <http://nabble.com/>.
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>> >
>>>
>
>
>


Re: Hive ORC Malformed while loading into spark data frame

2015-09-29 Thread Umesh Kacha
Hi Zhan, thanks for the response. The table is created using Spark
hiveContext.sql, and data is inserted into the table (a partitioned table)
also using hiveContext.sql. When I try to load the ORC data into a DataFrame,
I am loading the data of a particular partition stored in a path such as
/user/xyz/Hive/xyz.db/sparktable/partition1=abc

Regards,
Umesh
On Sep 30, 2015 02:21, "Hortonworks"  wrote:

> How was the table is generated, by hive or by spark?
>
> If you generate table using have but read it by data frame, it may have
> some comparability issue.
>
> Thanks
>
> Zhan Zhang
>
>
> Sent from my iPhone
>
> > On Sep 29, 2015, at 1:47 PM, unk1102  wrote:
> >
> > Hi I have a spark job which creates hive tables in orc format with
> > partitions. It works well I can read data back into hive table using hive
> > console. But if I try further process orc files generated by Spark job by
> > loading into dataframe  then I get the following exception
> > Caused by: java.io.IOException: Malformed ORC file
> > hdfs://localhost:9000/user/hive/warehouse/partorc/part_tiny.txt. Invalid
> > postscript.
> >
> > Dataframe df = hiveContext.read().format("orc").load(to/path);
> >
> > Please guide.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Hive-ORC-Malformed-while-loading-into-spark-data-frame-tp24876.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
>
>


Re: Hive ORC Malformed while loading into spark data frame

2015-09-29 Thread Umesh Kacha
Hi, I can read/load ORC data created by a Hive table into a DataFrame, so why
is it throwing a Malformed ORC exception when I try to load data created by
hiveContext.sql into a DataFrame?
On Sep 30, 2015 2:37 AM, "Hortonworks" <zzh...@hortonworks.com> wrote:

> You can try to use data frame for both read and write
>
> Thanks
>
> Zhan Zhang
>
>
> Sent from my iPhone
>
> On Sep 29, 2015, at 1:56 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
>
> Hi Zang, thanks for the response. Table is created using Spark
> hiveContext.sql and data inserted into table also using hiveContext.sql.
> Insert into partition table. When I try to load orc data into dataframe I
> am loading particular partition data stored in path say
> /user/xyz/Hive/xyz.db/sparktable/partition1=abc
>
> Regards,
> Umesh
> On Sep 30, 2015 02:21, "Hortonworks" <zzh...@hortonworks.com> wrote:
>
>> How was the table is generated, by hive or by spark?
>>
>> If you generate table using have but read it by data frame, it may have
>> some comparability issue.
>>
>> Thanks
>>
>> Zhan Zhang
>>
>>
>> Sent from my iPhone
>>
>> > On Sep 29, 2015, at 1:47 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>> >
>> > Hi I have a spark job which creates hive tables in orc format with
>> > partitions. It works well I can read data back into hive table using
>> hive
>> > console. But if I try further process orc files generated by Spark job
>> by
>> > loading into dataframe  then I get the following exception
>> > Caused by: java.io.IOException: Malformed ORC file
>> > hdfs://localhost:9000/user/hive/warehouse/partorc/part_tiny.txt. Invalid
>> > postscript.
>> >
>> > Dataframe df = hiveContext.read().format("orc").load(to/path);
>> >
>> > Please guide.
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Hive-ORC-Malformed-while-loading-into-spark-data-frame-tp24876.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com
>> <http://nabble.com>.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>> >
>>


Re: Why my Spark job is slow and it throws OOM which leads YARN killing executors?

2015-09-12 Thread Umesh Kacha
Hi Richard, thanks much for the reply. If I don't create threads, the job runs
too slowly, since I have a thousand jobs, or a thousand Hive partition
directories, to process. hiveContext.sql(...) runs fine and creates the output
I expected; do I really need to call any action method? The job works fine as
expected, I am just hitting the physical memory limit and YARN kills the
executor; I saw it in the YARN logs. I believe that because of the group by
queries a lot of shuffle data moves around and creates a mess. Please guide.

On Sun, Sep 13, 2015 at 2:09 AM, Richard W. Eggert II <
richard.egg...@gmail.com> wrote:

> Without a stack trace, I can't say for certain what is causing your
> OutOfMemoryError, but I do see a number of problems with your code.
>
> First of all, given that Spark is a parallel processing framework, it is
> almost never necessary to manually create a thread pool within the driver.
> You should instead chain together some RDDs and let Spark parallelize the
> work on the cluster for you.
>
> Secondly, unless I'm mistaken, SQLContext.sql (which HiveContext inherits)
> does not actually execute your SQL query. It just creates a DataFrame that
> represents the query. You have to invoke one of DataFrame's "action"
> methods (such as count, collect, foreach, or saveAsTextFile) to cause Spark
> to create a job to actually execute the query. The documentation is
> admittedly a bit vague and misleading about this, however.
>
> Rich
>
> On September 12, 2015, at 3:52 PM, unk1102  wrote:
>
> Hi I have the following Spark driver program/job which reads ORC files
> (i.e.
> hive partitions as HDFS directories) process them in DataFrame and use them
> as table in hiveContext.sql(). Job runs fine it gives correct results but
> it
> hits physical memory limit after one hour or so and YARN kills executor and
> things gets slower and slower. Please see the following code and help me
> identify problem. I created 20 Threads from driver program and spawn them.
> Thread logic contains lambda function which gets executed on executors.
> Please guide I am new to Spark. Thanks much.
>
>   public class DataSpark {
>
> public static final Map dMap = new
> LinkedHashMap<>();
>
> public static final String[] colNameArr = new String[]
> {"_col0","col2","bla bla 45 columns"};
>
> public static void main(String[] args) throws Exception {
>
>
> Set workers = new HashSet<>();
>
> SparkConf sparkConf = new SparkConf().setAppName("SparkTest");
> setSparkConfProperties(sparkConf);
> SparkContext sc = new SparkContext(sparkConf);
> final FileSystem fs = FileSystem.get(sc.hadoopConfiguration());
> HiveContext hiveContext = createHiveContext(sc);
>
> declareHiveUDFs(hiveContext);
>
> DateTimeFormatter df = DateTimeFormat.forPattern("MMdd");
> String yestday = "20150912";
> hiveContext.sql(" use xyz ");
> createTables(hiveContext);
> DataFrame partitionFrame = hiveContext.sql(" show partitions
> data partition(date=\""+ yestday + "\")");
>
> //add csv files to distributed cache
> Row[] rowArr = partitionFrame.collect();
> for(Row row : rowArr) {
> String[] splitArr = row.getString(0).split("/");
> String entity = splitArr[0].split("=")[1];
> int date =  Integer.parseInt(splitArr[1].split("=")[1]);
>
> String sourcePath =
> "/user/xyz/Hive/xyz.db/data/entity="+entity+"/date="+date;
> Path spath = new Path(sourcePath);
> if(fs.getContentSummary(spath).getFileCount() > 0) {
> DataWorker worker = new DataWorker(hiveContext,entity,
> date);
> workers.add(worker);
> }
> }
>
> ExecutorService executorService =
> Executors.newFixedThreadPool(20);
> executorService.invokeAll(workers);
> executorService.shutdown();
>
>
> sc.stop();
> }
>
> private static void setSparkConfProperties(SparkConf sparkConf) {
> sparkConf.set("spark.rdd.compress","true");
>
> sparkConf.set("spark.shuffle.consolidateFiles","true");
>
> sparkConf.set("spark.executor.logs.rolling.maxRetainedFiles","90");
> sparkConf.set("spark.executor.logs.rolling.strategy","time");
> sparkConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer");
> sparkConf.set("spark.shuffle.manager","tungsten-sort");
>
>sparkConf.set("spark.shuffle.memoryFraction","0.5");
>sparkConf.set("spark.storage.memoryFraction","0.2");
>
> }
>
> private static HiveContext createHiveContext(SparkContext sc) {
> HiveContext hiveContext = new HiveContext(sc);
> hiveContext.setConf("spark.sql.codgen","true");
> 

Re: How to enable Tungsten in Spark 1.5 for Spark SQL?

2015-09-10 Thread Umesh Kacha
Nice, Ted, thanks much. The highest performance without any configuration
changes, amazing! I am looking forward to running Spark 1.5 on my 2 TB of
skewed data, which involves group by, union, etc. Any other tips you know of
for Spark 1.5?
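
For reference, a minimal sketch of making the flags Ted quotes below explicit
on my HiveContext (they already default to true in Spark 1.5, so this is only
needed to be explicit or to switch them off):

hiveContext.setConf("spark.sql.tungsten.enabled", "true");
hiveContext.setConf("spark.sql.unsafe.enabled", "true");
hiveContext.setConf("spark.sql.codegen", "true");
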
On Sep 10, 2015 8:12 PM, "Ted Yu"  wrote:

> Please see the following
> in sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala :
>
>   val TUNGSTEN_ENABLED = booleanConf("spark.sql.tungsten.enabled",
> defaultValue = Some(true),
> doc = "When true, use the optimized Tungsten physical execution
> backend which explicitly " +
>   "manages memory and dynamically generates bytecode for
> expression evaluation.")
>
>   val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
> defaultValue = Some(true),  // use TUNGSTEN_ENABLED as default
> doc = "When true, code will be dynamically generated at runtime for
> expression evaluation in" +
>   " a specific query.",
> isPublic = false)
>
>   val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
> defaultValue = Some(true),  // use TUNGSTEN_ENABLED as default
> doc = "When true, use the new optimized Tungsten physical execution
> backend.",
> isPublic = false)
>
> Cheers
>
> On Thu, Sep 10, 2015 at 7:39 AM, unk1102  wrote:
>
>> Hi Spark 1.5 looks promising how do we enable project tungsten for spark
>> sql
>> or is it enabled by default please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-enable-Tungsten-in-Spark-1-5-for-Spark-SQL-tp24642.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?

2015-09-09 Thread Umesh Kacha
Hi Richard, thanks for the response. My use case is a bit unusual: I need to
process data row by row for one partition and update the required rows. The
percentage of updated rows would be about 30%. As per the suggestions in the
stackoverflow.com answer above, I refactored the code to use
mapPartitionsWithIndex:

JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(
    new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
        @Override
        public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
            List<Row> rowList = new ArrayList<>();
            while (rowIterator.hasNext()) {
                Row row = rowIterator.next();
                List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
                Row updatedRow = RowFactory.create(rowAsList.toArray());
                rowList.add(updatedRow);
            }
            return rowList.iterator();
        }
    }, true)
    .union(remainingrdd).coalesce(200, true);

The above code still hits memory limits, as I have 2 TB of data to process.
I use the resulting RDD to create a DataFrame, which I then register as a
temp table using hiveContext, and I execute a few insert-into-partition
queries on it using hiveContext.sql.

Please help me optimize the above code.
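
One tweak I am considering, sketched under the assumption that iterate(...)
stays the same helper as above: return a lazy iterator instead of
materializing every updated Row in a List, so the updated rows of a partition
are never all buffered in memory at once.

JavaRDD<Row> indexedRdd = sourceRdd.mapPartitionsWithIndex(
    new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
        @Override
        public Iterator<Row> call(Integer ind, final Iterator<Row> rowIterator) {
            return new Iterator<Row>() {
                @Override
                public boolean hasNext() { return rowIterator.hasNext(); }

                @Override
                public Row next() {
                    // transform rows one at a time as the consumer pulls them
                    Row row = rowIterator.next();
                    List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
                    return RowFactory.create(rowAsList.toArray());
                }

                @Override
                public void remove() { throw new UnsupportedOperationException(); }
            };
        }
    }, true);
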
On Sep 9, 2015 2:55 AM, "Richard Marscher"  wrote:

> Hi,
>
> what is the reasoning behind the use of `coalesce(1,false)`? This is
> saying to aggregate all data into a single partition, which must fit in
> memory on one node in the Spark cluster. If the cluster has more than one
> node it must shuffle to move the data. It doesn't seem like the following
> map or union necessitate coalesce, but the use case is not clear to me.
>
> On Fri, Sep 4, 2015 at 12:29 PM, unk1102  wrote:
>
>> Hi I have Spark job which does some processing on ORC data and stores back
>> ORC data using DataFrameWriter save() API introduced in Spark 1.4.0. I
>> have
>> the following piece of code which is using heavy shuffle memory. How do I
>> optimize below code? Is there anything wrong with it? It is working fine
>> as
>> expected only causing slowness because of GC pause and shuffles lots of
>> data
>> so hitting memory issues. Please guide I am new to Spark. Thanks in
>> advance.
>>
>> JavaRDD updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1,
>> false).map(new Function() {
>>@Override
>>public Row call(Row row) throws Exception {
>> List rowAsList;
>> Row row1 = null;
>> if (row != null) {
>>   rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>>   row1 = RowFactory.create(rowAsList.toArray());
>> }
>> return row1;
>>}
>> }).union(modifiedRDD);
>> DataFrame updatedDataFrame =
>> hiveContext.createDataFrame(updatedDsqlRDD,renamedSourceFrame.schema());
>>
>> updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity",
>> "date").save("baseTable");
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-huge-data-shuffling-in-Spark-when-using-union-coalesce-1-false-on-DataFrame-tp24581.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com  | Our Blog
>  | Twitter  |
> Facebook  | LinkedIn
> 
>


Re: NPE while reading ORC file using Spark 1.4 API

2015-09-08 Thread Umesh Kacha
Hi Zhan, thanks for the reply. Yes, the schema should be the same; I am
actually reading Hive table partitions in ORC format into Spark, so I believe
it should be the same. I am new to Hive, so I don't know whether the schema
can differ between partitions of a Hive partitioned table.
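
A small sketch of how I plan to check this (the paths are placeholders): load
two partition directories separately and compare their printed schemas field
by field, since a mismatch is what the SPARK-10304 pointer below is about.

DataFrame p1 = hiveContext.read().format("orc")
    .load("/user/xyz/Hive/xyz.db/sparktable/partition1=abc");
DataFrame p2 = hiveContext.read().format("orc")
    .load("/user/xyz/Hive/xyz.db/sparktable/partition1=def");
p1.printSchema();  // compare the two schemas field by field
p2.printSchema();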

On Wed, Sep 9, 2015 at 12:16 AM, Zhan Zhang  wrote:

> Does your directory includes some orc files that is not having same schema
> of the table? Please refer to
> https://issues.apache.org/jira/browse/SPARK-10304
>
> Thanks.
>
> Zhan Zhang
>
> On Sep 8, 2015, at 11:39 AM, unk1102  wrote:
>
> Hi I read many ORC files in Spark and process it those files are basically
> Hive partitions. Most of the times processing goes well but for few files I
> get the following exception dont know why? These files are working fine in
> Hive using Hive queries. Please guide. Thanks in advance.
>
> DataFrame df = hiveContext.read().format("orc").load("/path/in/hdfs");
>
> java.lang.NullPointerException
>at
>
> org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:402)
>at
>
> org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:206)
>at
>
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$8.apply(OrcRelation.scala:238)
>at
>
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$8.apply(OrcRelation.scala:238)
>at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>at scala.collection.immutable.List.foreach(List.scala:318)
>at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>at
> org.apache.spark.sql.hive.orc.OrcTableScan.org
> $apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:238)
>at
>
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:290)
>at
>
> org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:288)
>at
>
> org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>at org.apache.spark.scheduler.Task.run(Task.scala:70)
>at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>at java.lang.Thread.run(Thread.java:744)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/NPE-while-reading-ORC-file-using-Spark-1-4-API-tp24609.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>


Re: Spark executor OOM issue on YARN

2015-08-31 Thread Umesh Kacha
Hi Ted, thanks. I know spark.sql.shuffle.partitions defaults to 200. It would
be great if you could help me solve the OOM issue.
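
For what it's worth, here is how I now understand the two settings from my
original post (a small sketch, values illustrative): spark.default.parallelism
drives RDD-level shuffles, while spark.sql.shuffle.partitions drives
DataFrame/SQL shuffles such as the group by queries here.

SparkConf sparkConf = new SparkConf().setAppName("SparkTest");
sparkConf.set("spark.default.parallelism", "500");           // RDD shuffles
SparkContext sc = new SparkContext(sparkConf);
HiveContext hiveContext = new HiveContext(sc);
hiveContext.setConf("spark.sql.shuffle.partitions", "500");  // SQL/DataFrame shuffles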

On Mon, Aug 31, 2015 at 11:43 PM, Ted Yu  wrote:

> Please see this thread w.r.t. spark.sql.shuffle.partitions :
> http://search-hadoop.com/m/q3RTtE7JOv1bDJtY
>
> FYI
>
> On Mon, Aug 31, 2015 at 11:03 AM, unk1102  wrote:
>
>> Hi I have Spark job and its executors hits OOM issue after some time and
>> my
>> job hangs because of it followed by couple of IOException, Rpc client
>> disassociated, shuffle not found etc
>>
>> I have tried almost everything dont know how do I solve this OOM issue
>> please guide I am fed up now. Here what I tried but nothing worked
>>
>> -I tried 60 executors with each executor having 12 Gig/2 core
>> -I tried 30 executors with each executor having 20 Gig/2 core
>> -I tried 40 executors with each executor having 30 Gig/6 core (I also
>> tried
>> 7 and 8 core)
>> -I tried to set spark.storage.memoryFraction to 0.2 in order to solve OOM
>> issue I also tried to set it 0.0
>> -I tried to set spark.shuffle.memoryFraction to 0.4 since I need more
>> shuffling memory
>> -I tried to set spark.default.parallelism to 500,1000,1500 but it did not
>> help avoid OOM what is the ideal value for it?
>> -I also tried to set spark.sql.shuffle.partitions to 500 but it did not
>> help
>> it just creates 500 output part files. Please make me understand
>> difference
>> between spark.default.parallelism and spark.sql.shuffle.partitions.
>>
>> My data is skewed but not that much large I dont understand why it is
>> hitting OOM I dont cache anything I jsut have four group by queries I am
>> calling using hivecontext.sql(). I have around 1000 threads which I spawn
>> from driver and each thread will execute these four queries.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-executor-OOM-issue-on-YARN-tp24522.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: spark.sql.shuffle.partitions=1 seems to be working fine but creates timeout for large skewed data

2015-08-20 Thread Umesh Kacha
Hi Hemant, sorry for the confusion. I meant the final output part files in the
final HDFS directory; I never meant intermediate files. Thanks. My goal is to
reduce that large number of files, because of my use case explained in the
first email with the calculations.
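
As a hedged alternative sketch (the query, format and the number 10 are
illustrative): keep spark.sql.shuffle.partitions at its normal value and
coalesce the result just before the write, so the number of final part files
is controlled independently of the shuffle parallelism.

DataFrame result = hiveContext.sql("select ... group by ...");
result.coalesce(10)  // at most ~10 part files per partition directory
      .write().mode(SaveMode.Append).format("orc")
      .partitionBy("entity", "date")
      .save("baseTable");
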
On Aug 20, 2015 5:59 PM, Hemant Bhanawat hemant9...@gmail.com wrote:

 Sorry, I misread your mail. Thanks for pointing that out.

 BTW, are the 8 files shuffle intermediate output and not the final
 output? I assume yes. I didn't know that you can keep intermediate output
 on HDFS and I don't think that is recommended.




 On Thu, Aug 20, 2015 at 2:43 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Looks like you are using hash based shuffling and not sort based
 shuffling which creates a single file per maptask.

 On Thu, Aug 20, 2015 at 12:43 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I have a Spark job which deals with large skewed dataset. I have
 around
 1000 Hive partitions to process in four different tables every day. So
 if I
 go with 200 spark.sql.shuffle.partitions default partitions created by
 Spark
 I end up with 4 * 1000 * 200 = 8 small small files in HDFS which
 wont be
 good for HDFS name node I have been told if you keep on creating such
 large
 no of small small files namenode will crash is it true? please help me
 understand. Anyways so to avoid creating small files I did set
 spark.sql.shuffle.partitions=1 it seems to be creating 1 output file and
 as
 per my understanding because of only one output there is so much
 shuffling
 to do to bring all data to once reducer please correct me if I am wrong.
 This is causing memory/timeout issues how do I deal with it

 I tried to give spark.shuffle.storage=0.7 also still this memory seems
 not
 enough for it. I have 25 gb executor with 4 cores and 20 such executors
 still Spark job fails please guide.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-shuffle-partitions-1-seems-to-be-working-fine-but-creates-timeout-for-large-skewed-data-tp24346.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: How to avoid executor time out on yarn spark while dealing with large shuffle skewed data?

2015-08-20 Thread Umesh Kacha
Hi, where do I see GC time in the UI? I have set
spark.yarn.executor.memoryOverhead to 3500, which I believe should be good
enough. So you mean only GC could be the reason behind the timeout? I checked
the YARN logs and did not see any GC error there. Please guide. Thanks much.

On Thu, Aug 20, 2015 at 8:14 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Moving this back onto user@

 Regarding GC, can you look in the web UI and see whether the GC time
 metric dominates the amount of time spent on each task (or at least the
 tasks that aren't completing)?

 Also, have you tried bumping your spark.yarn.executor.memoryOverhead?
 YARN may be killing your executors for using too much off-heap space.  You
 can see whether this is happening by looking in the Spark AM or YARN
 NodeManager logs.

 -Sandy

 On Thu, Aug 20, 2015 at 7:39 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi thanks much for the response. Yes I tried default settings too 0.2 it
 was also going into timeout if it is spending time in GC then why it is not
 throwing GC error I don't see any such error. Yarn logs are not helpful at
 all. What is tungsten how do I use it? Spark is doing great I believe my
 job runs successfully and 60% tasks completes only after first executor
 gets lost things are messing.
 On Aug 20, 2015 7:59 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What sounds most likely is that you're hitting heavy garbage
 collection.  Did you hit issues when the shuffle memory fraction was at its
 default of 0.2?  A potential danger with setting the shuffle storage to 0.7
 is that it allows shuffle objects to get into the GC old generation, which
 triggers more stop-the-world garbage collections.

 Have you tried enabling Tungsten / unsafe?

 Unfortunately, Spark is still not that great at dealing with
 heavily-skewed shuffle data, because its reduce-side aggregation still
 operates on Java objects instead of binary data.

 -Sandy

 On Thu, Aug 20, 2015 at 7:21 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi Sandy thanks much for the response. I am using Spark 1.4.1 and I
 have set spark.shuffle.storage as 0.7 as my spark job involves 4 groupby
 queries executed using hiveContext.sql my data set is skewed so will be
 more shuffling I believe I don't know what's wrong spark job runs fine for
 almost an hour and when shuffle read shuffle write column in UI starts to
 show more than 10 gb executor starts to getting lost because of timeout and
 slowly other executor starts getting lost. Please guide.
 On Aug 20, 2015 7:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 What version of Spark are you using?  Have you set any shuffle configs?

 On Wed, Aug 19, 2015 at 11:46 AM, unk1102 umesh.ka...@gmail.com
 wrote:

 I have one Spark job which seems to run fine but after one hour or so
 executor start getting lost because of time out something like the
 following
 error

 cluster.yarnScheduler : Removing an executor 14 65 timeout exceeds
 60 seconds

 and because of above error couple of chained errors starts to come
 like
 FetchFailedException, Rpc client disassociated, Connection reset by
 peer,
 IOException etc

 Please see the following UI page I have noticed when shuffle
 read/write
 starts to increase more than 10 GB executors starts getting lost
 because of
 timeout. How do I clear this stacked memory of 10 GB in shuffle
 read/write
 section I dont cache anything why Spark is not clearing those memory.
 Please
 guide.

 IMG_20150819_231418358.jpg
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24345/IMG_20150819_231418358.jpg
 




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-avoid-executor-time-out-on-yarn-spark-while-dealing-with-large-shuffle-skewed-data-tp24345.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: How to use custom Hadoop InputFormat in DataFrame?

2015-08-10 Thread Umesh Kacha
Hi Michael, thanks for the reply. I know that I can create a DataFrame using a
JavaBean or a StructType; what I want to know is how to create a DataFrame
from the above code, which uses a custom Hadoop format.
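
Here is a minimal, hedged sketch of the row-plus-explicit-schema route Michael
points to below. MyInputFormat and MyRecordWritable are the classes from my
original post; getFieldA()/getFieldB() are placeholder getters for whatever
fields the writable actually exposes.

// imports: org.apache.spark.api.java.*, org.apache.spark.api.java.function.Function,
// org.apache.spark.sql.*, org.apache.spark.sql.types.*

JavaPairRDD<Void, MyRecordWritable> pairRdd =
    jsc.hadoopFile("hdfs://tmp/data/myformat.xyz",
        MyInputFormat.class, Void.class, MyRecordWritable.class);

JavaRDD<Row> rowRdd = pairRdd.values().map(
    new Function<MyRecordWritable, Row>() {
        @Override
        public Row call(MyRecordWritable record) throws Exception {
            // one Row per record, fields in the same order as the schema below
            return RowFactory.create(record.getFieldA(), record.getFieldB());
        }
    });

StructType schema = DataTypes.createStructType(new StructField[]{
    DataTypes.createStructField("fieldA", DataTypes.StringType, true),
    DataTypes.createStructField("fieldB", DataTypes.StringType, true)
});

DataFrame myFormatAsDataframe = sqlContext.createDataFrame(rowRdd, schema);
myFormatAsDataframe.show();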

On Tue, Aug 11, 2015 at 12:04 AM, Michael Armbrust mich...@databricks.com
wrote:

 You can't create a DataFrame from an arbitrary object since we don't know
 how to figure out the schema.  You can either create a JavaBean
 https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
 or manually create a row + specify the schema
 https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
 .



 On Mon, Aug 10, 2015 at 11:22 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I have my own Hadoop custom InputFormat which I want to use in
 DataFrame.
 How do we do that? I know I can use sc.hadoopFile(..) but then how do I
 convert it into DataFrame

 JavaPairRDDVoid,MyRecordWritable myFormatAsPairRdd =

 jsc.hadoopFile(hdfs://tmp/data/myformat.xyz,MyInputFormat.class,Void.class,MyRecordWritable.class);
 JavaRDDMyRecordWritable myformatRdd =  myFormatAsPairRdd.values();
 DataFrame myFormatAsDataframe =
 sqlContext.createDataFrame(myformatRdd,??);

 In above code what should I put in place of ?? I tried to put
 MyRecordWritable.class but it does not work as it is not schema it is
 Record
 Writable. Please guide.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-custom-Hadoop-InputFormat-in-DataFrame-tp24198.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: How to create DataFrame from a binary file?

2015-08-10 Thread Umesh Kacha
Hi Bo, thanks much. Let me explain; please see the following code:

JavaPairRDD<String, PortableDataStream> pairRdd =
    javaSparkContext.binaryFiles("/hdfs/path/to/binfile");
JavaRDD<PortableDataStream> javardd = pairRdd.values();

DataFrame binDataFrame = sqlContext.createDataFrame(javardd,
    PortableDataStream.class);
binDataFrame.show(); // shows just one row containing the file path
                     // /hdfs/path/to/binfile

I want the binary data from the above file in a DataFrame so that I can do
analytics on it directly. My data is binary, so I can't use a StructType with
primitive data types, right, since everything is binary/bytes. My custom
binary data format is similar to Parquet, and I did not find any good example
of where/how Parquet is read into a DataFrame. Please guide.
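
A hedged sketch of one way to get the raw bytes themselves into a DataFrame
via the StructType route Bo suggests below (the path is the one above; parsing
the bytes into typed columns would still need custom decoding of my format):
materialize each PortableDataStream as a byte[] and expose it as a BinaryType
column.

// imports: scala.Tuple2, org.apache.spark.api.java.function.Function,
// org.apache.spark.input.PortableDataStream, org.apache.spark.sql.types.*

JavaRDD<Row> rowRdd = javaSparkContext.binaryFiles("/hdfs/path/to/binfile").map(
    new Function<Tuple2<String, PortableDataStream>, Row>() {
        @Override
        public Row call(Tuple2<String, PortableDataStream> file) throws Exception {
            // file._1() is the path; file._2().toArray() reads the whole file as bytes
            return RowFactory.create(file._1(), file._2().toArray());
        }
    });

StructType schema = DataTypes.createStructType(new StructField[]{
    DataTypes.createStructField("path", DataTypes.StringType, false),
    DataTypes.createStructField("content", DataTypes.BinaryType, false)
});

DataFrame binDataFrame = sqlContext.createDataFrame(rowRdd, schema);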





On Sun, Aug 9, 2015 at 11:52 PM, bo yang bobyan...@gmail.com wrote:

 Well, my post uses raw text json file to show how to create data frame
 with a custom data schema. The key idea is to show the flexibility to deal
 with any format of data by using your own schema. Sorry if I did not make
 you fully understand.

 Anyway, let us know once you figure out your problem.




 On Sun, Aug 9, 2015 at 11:10 AM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi Bo I know how to create a DataFrame my question is how to create a
 DataFrame for binary files and in your blog it is raw text json files
 please read my question properly thanks.

 On Sun, Aug 9, 2015 at 11:21 PM, bo yang bobyan...@gmail.com wrote:

 You can create your own data schema (StructType in spark), and use
 following method to create data frame with your own data schema:

 sqlContext.createDataFrame(yourRDD, structType);

 I wrote a post on how to do it. You can also get the sample code there:

 Light-Weight Self-Service Data Query through Spark SQL:

 https://www.linkedin.com/pulse/light-weight-self-service-data-query-through-spark-sql-bo-yang

 Take a look and feel free to  let me know for any question.

 Best,
 Bo



 On Sat, Aug 8, 2015 at 1:42 PM, unk1102 umesh.ka...@gmail.com wrote:

 Hi how do we create DataFrame from a binary file stored in HDFS? I was
 thinking to use

 JavaPairRDDString,PortableDataStream pairRdd =
 javaSparkContext.binaryFiles(/hdfs/path/to/binfile);
 JavaRDDPortableDataStream javardd = pairRdd.values();

 I can see that PortableDataStream has method called toArray which can
 convert into byte array I was thinking if I have JavaRDDbyte[] can I
 call
 the following and get DataFrame

 DataFrame binDataFrame =
 sqlContext.createDataFrame(javaBinRdd,Byte.class);

 Please guide I am new to Spark. I have my own custom format which is
 binary
 format and I was thinking if I can convert my custom format into
 DataFrame
 using binary operations then I dont need to create my own custom Hadoop
 format am I on right track? Will reading binary data into DataFrame
 scale?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-DataFrame-from-a-binary-file-tp24179.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: How to create DataFrame from a binary file?

2015-08-09 Thread Umesh Kacha
Hi Bo, I know how to create a DataFrame; my question is how to create a
DataFrame from binary files, and in your blog it is raw text JSON files.
Please read my question properly, thanks.

On Sun, Aug 9, 2015 at 11:21 PM, bo yang bobyan...@gmail.com wrote:

 You can create your own data schema (StructType in spark), and use
 following method to create data frame with your own data schema:

 sqlContext.createDataFrame(yourRDD, structType);

 I wrote a post on how to do it. You can also get the sample code there:

 Light-Weight Self-Service Data Query through Spark SQL:

 https://www.linkedin.com/pulse/light-weight-self-service-data-query-through-spark-sql-bo-yang

 Take a look and feel free to  let me know for any question.

 Best,
 Bo



 On Sat, Aug 8, 2015 at 1:42 PM, unk1102 umesh.ka...@gmail.com wrote:

 Hi how do we create DataFrame from a binary file stored in HDFS? I was
 thinking to use

 JavaPairRDDString,PortableDataStream pairRdd =
 javaSparkContext.binaryFiles(/hdfs/path/to/binfile);
 JavaRDDPortableDataStream javardd = pairRdd.values();

 I can see that PortableDataStream has method called toArray which can
 convert into byte array I was thinking if I have JavaRDDbyte[] can I
 call
 the following and get DataFrame

 DataFrame binDataFrame =
 sqlContext.createDataFrame(javaBinRdd,Byte.class);

 Please guide I am new to Spark. I have my own custom format which is
 binary
 format and I was thinking if I can convert my custom format into DataFrame
 using binary operations then I dont need to create my own custom Hadoop
 format am I on right track? Will reading binary data into DataFrame scale?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-DataFrame-from-a-binary-file-tp24179.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: How to control Spark Executors from getting Lost when using YARN client mode?

2015-08-03 Thread Umesh Kacha
Hi all, any help will be much appreciated. My Spark job runs fine, but in the
middle it starts losing executors because of a MetadataFetchFailedException
saying the shuffle was not found at the location, since the executor is lost.
On Jul 31, 2015 11:41 PM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi thanks for the response. It looks like YARN container is getting killed
 but dont know why I see shuffle metafetchexception as mentioned in the
 following SO link. I have enough memory 8 nodes 8 cores 30 gig memory each.
 And because of this metafetchexpcetion YARN killing container running
 executor how can it over run memory I tried to give each executor 25 gig
 still it is not sufficient and it fails. Please guide I dont understand
 what is going on I am using Spark 1.4.0 I am using spark.shuffle.memory as
 0.0 and spark.storage.memory as 0.5. I have almost all optimal properties
 like Kyro serializer I have kept 500 akka frame size 20 akka threads dont
 know I am trapped its been two days I am trying to recover from this issue.


 http://stackoverflow.com/questions/29850784/what-are-the-likely-causes-of-org-apache-spark-shuffle-metadatafetchfailedexcept



 On Thu, Jul 30, 2015 at 9:56 PM, Ashwin Giridharan ashwin.fo...@gmail.com
  wrote:

 What is your cluster configuration ( size and resources) ?

 If you do not have enough resources, then your executor will not run.
 Moreover allocating 8 cores to an executor is too much.

 If you have a cluster with four nodes running NodeManagers, each equipped
 with 4 cores and 8GB of memory,
 then an optimal configuration would be,

 --num-executors 8 --executor-cores 2 --executor-memory 2G

 Thanks,
 Ashwin

 On Thu, Jul 30, 2015 at 12:08 PM, unk1102 umesh.ka...@gmail.com wrote:

 Hi I have one Spark job which runs fine locally with less data but when I
 schedule it on YARN to execute I keep on getting the following ERROR and
 slowly all executors gets removed from UI and my job fails

 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on
 myhost1.com: remote Rpc client disassociated
 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on
 myhost2.com: remote Rpc client disassociated
 I use the following command to schedule spark job in yarn-client mode

  ./spark-submit --class com.xyz.MySpark --conf
 spark.executor.extraJavaOptions=-XX:MaxPermSize=512M
 --driver-java-options
 -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client
 --executor-memory 2G --executor-cores 8 --num-executors 12
 /home/myuser/myspark-1.0.jar

 I dont know what is the problem please guide. I am new to Spark. Thanks
 in
 advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-Spark-Executors-from-getting-Lost-when-using-YARN-client-mode-tp24084.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 Thanks  Regards,
 Ashwin Giridharan





Re: How to create Spark DataFrame using custom Hadoop InputFormat?

2015-07-31 Thread Umesh Kacha
Hi, thanks. Void works; I use the same custom format in Hive and it works with
Void as the key. Please share an example, if you have one, of creating a
DataFrame using a custom Hadoop format.
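
Not something confirmed in this thread, but one pattern that generally works
is to keep the hadoopFile call and map each Writable value into a plain
serializable bean before calling createDataFrame, since the bean-based
createDataFrame reflects over getters and Hadoop Writables usually don't fit
that contract (which is one way to end up with the "object is not an instance
of declaring class" error). MyRecordBean and its single field below are
hypothetical stand-ins for whatever MyRecordWritable really contains:

import java.io.Serializable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;

// Hypothetical plain bean mirroring the fields inside MyRecordWritable.
public class MyRecordBean implements Serializable {
    private String field;
    public String getField() { return field; }
    public void setField(String field) { this.field = field; }
}

// In the job (assumes jsc and sqlContext exist, as in the snippets below):
JavaPairRDD<Void, MyRecordWritable> pairs =
    jsc.hadoopFile("hdfs://tmp/data/myformat.xyz",
        MyInputFormat.class, Void.class, MyRecordWritable.class);

// Copy each Writable into a bean while the record is still current, because
// Hadoop record readers may reuse the same Writable instance.
JavaRDD<MyRecordBean> beans = pairs.values().map(w -> {
    MyRecordBean b = new MyRecordBean();
    b.setField(w.toString());   // replace with the real field accessors
    return b;
});

DataFrame df = sqlContext.createDataFrame(beans, MyRecordBean.class);
df.show();
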
On Aug 1, 2015 2:07 AM, Ted Yu yuzhih...@gmail.com wrote:

 I don't think using Void class is the right choice - it is not even a
 Writable.

 BTW in the future, capture text output instead of image.

 Thanks

 On Fri, Jul 31, 2015 at 12:35 PM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi Ted, thanks. My key is always Void because my custom format file is
 non-splittable, so the key is Void and the value is MyRecordWritable, which
 extends Hadoop Writable. I am sharing my log as a snapshot; please don't mind,
 as I can't paste code outside.

 Regards,
 Umesh

 On Sat, Aug 1, 2015 at 12:59 AM, Ted Yu yuzhih...@gmail.com wrote:

 Looking closer at the code you posted, the error likely was caused by
 the 3rd parameter: Void.class

 It is supposed to be the class of key.

 FYI

 On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi, I have my own custom Hadoop InputFormat which I need to use when creating
 a DataFrame. I tried the following:

 JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
 jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
 JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
 DataFrame myFormatAsDataframe =
 sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
 myFormatAsDataframe.show();

 The above code does not work and throws an exception saying
 java.lang.IllegalArgumentException: object is not an instance of declaring
 class.

 My custom Hadoop InputFormat works very well with Hive, MapReduce etc. How do
 I make it work with Spark? Please guide, I am new to Spark. Thanks in
 advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-Spark-DataFrame-using-custom-Hadoop-InputFormat-tp24101.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: How to control Spark Executors from getting Lost when using YARN client mode?

2015-07-31 Thread Umesh Kacha
Hi, thanks for the response. It looks like the YARN container is getting
killed but I don't know why; I see a shuffle MetadataFetchFailedException as
mentioned in the following SO link. I have enough memory: 8 nodes, 8 cores,
30 GB of memory each. Because of this MetadataFetchFailedException YARN kills
the container running the executor. How can it overrun memory? I tried to
give each executor 25 GB and still it is not sufficient and it fails. Please
guide, I don't understand what is going on. I am using Spark 1.4.0 with
spark.shuffle.memoryFraction as 0.0 and spark.storage.memoryFraction as 0.5.
I have almost all optimal properties like the Kryo serializer, a 500 akka
frame size and 20 akka threads. I am trapped; it has been two days trying to
recover from this issue.

http://stackoverflow.com/questions/29850784/what-are-the-likely-causes-of-org-apache-spark-shuffle-metadatafetchfailedexcept
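
For reference, a minimal sketch of how these Spark 1.4-era settings could be
expressed in code. The values are only placeholders, and
spark.yarn.executor.memoryOverhead is not something mentioned in this thread;
it is the usual knob to look at when YARN kills containers for exceeding
executor memory plus off-heap overhead, which later surfaces as shuffle fetch
failures:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
    .setAppName("my-yarn-job")   // placeholder app name
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Keeping a non-zero shuffle fraction leaves room for shuffle buffers;
    // 0.4/0.4 are example values, not recommendations from this thread.
    .set("spark.shuffle.memoryFraction", "0.4")
    .set("spark.storage.memoryFraction", "0.4")
    .set("spark.akka.frameSize", "500")
    .set("spark.akka.threads", "20")
    // Extra off-heap headroom per executor, in MB.
    .set("spark.yarn.executor.memoryOverhead", "2048");

JavaSparkContext jsc = new JavaSparkContext(conf);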



On Thu, Jul 30, 2015 at 9:56 PM, Ashwin Giridharan ashwin.fo...@gmail.com
wrote:

 What is your cluster configuration ( size and resources) ?

 If you do not have enough resources, then your executor will not run.
 Moreover allocating 8 cores to an executor is too much.

 If you have a cluster with four nodes running NodeManagers, each equipped
 with 4 cores and 8GB of memory,
 then an optimal configuration would be,

 --num-executors 8 --executor-cores 2 --executor-memory 2G

 Thanks,
 Ashwin

 On Thu, Jul 30, 2015 at 12:08 PM, unk1102 umesh.ka...@gmail.com wrote:

 Hi, I have one Spark job which runs fine locally with less data, but when I
 schedule it on YARN I keep getting the following ERROR and slowly all
 executors get removed from the UI and my job fails:

 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on
 myhost1.com: remote Rpc client disassociated
 15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on
 myhost2.com: remote Rpc client disassociated
 I use the following command to schedule spark job in yarn-client mode

  ./spark-submit --class com.xyz.MySpark --conf
 spark.executor.extraJavaOptions=-XX:MaxPermSize=512M
 --driver-java-options
 -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client
 --executor-memory 2G --executor-cores 8 --num-executors 12
 /home/myuser/myspark-1.0.jar

 I don't know what the problem is, please guide. I am new to Spark. Thanks in
 advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-Spark-Executors-from-getting-Lost-when-using-YARN-client-mode-tp24084.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 Thanks  Regards,
 Ashwin Giridharan



Re: How to create Spark DataFrame using custom Hadoop InputFormat?

2015-07-31 Thread Umesh Kacha
Hi Ted, thanks much for the reply. I can't share code on a public forum. I have
created a custom format by extending the Hadoop mapred InputFormat class and,
in the same way, the RecordReader class. If you can help me with how to use it
in a DataFrame it would be very helpful.

On Sat, Aug 1, 2015 at 12:12 AM, Ted Yu yuzhih...@gmail.com wrote:

 Can you pastebin the complete stack trace ?

 If you can show skeleton of MyInputFormat and MyRecordWritable, that
 would provide additional information as well.

 Cheers

 On Fri, Jul 31, 2015 at 11:24 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi, I have my own custom Hadoop InputFormat which I need to use when creating
 a DataFrame. I tried the following:

 JavaPairRDD<Void, MyRecordWritable> myFormatAsPairRdd =
 jsc.hadoopFile("hdfs://tmp/data/myformat.xyz", MyInputFormat.class, Void.class, MyRecordWritable.class);
 JavaRDD<MyRecordWritable> myformatRdd = myFormatAsPairRdd.values();
 DataFrame myFormatAsDataframe =
 sqlContext.createDataFrame(myformatRdd, MyFormatSchema.class);
 myFormatAsDataframe.show();

 The above code does not work and throws an exception saying
 java.lang.IllegalArgumentException: object is not an instance of declaring
 class.

 My custom Hadoop InputFormat works very well with Hive, MapReduce etc. How do
 I make it work with Spark? Please guide, I am new to Spark. Thanks in
 advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-Spark-DataFrame-using-custom-Hadoop-InputFormat-tp24101.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark Streaming Kafka could not find leader offset for Set()

2015-07-30 Thread Umesh Kacha
Hi Cody, sorry, my bad, you were right: there was a typo in topicSet. When I
corrected the typo in topicSet it started working. Thanks a lot.

Regards
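
For anyone hitting the same "Couldn't find leader offsets for Set()" error: the
empty Set() usually means the requested topic names did not match anything in
Kafka (for example a typo in topicSet, as here). A minimal sketch of how the
inputs to createDirectStream are typically built; the broker list and topic
name are placeholders, not the actual values from this job:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Topic names must match kafka-topics.sh --list exactly.
Set<String> topicSet = new HashSet<>();
topicSet.add("my-topic");

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");
kafkaParams.put("auto.offset.reset", "largest");   // or "smallest"

// Assumes jssc (JavaStreamingContext) already exists, as in the code below.
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicSet);
messages.print();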

On Thu, Jul 30, 2015 at 7:43 PM, Cody Koeninger c...@koeninger.org wrote:

 Can you post the code including the values of kafkaParams and topicSet,
 ideally the relevant output of kafka-topics.sh --describe as well

 On Wed, Jul 29, 2015 at 11:39 PM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi, thanks for the response. Like I already mentioned in the question, the
 Kafka topic is valid and it has data; I can see data in it using another Kafka
 consumer.
 On Jul 30, 2015 7:31 AM, Cody Koeninger c...@koeninger.org wrote:

 The last time someone brought this up on the mailing list, the issue
 actually was that the topic(s) didn't exist in Kafka at the time the spark
 job was running.





 On Wed, Jul 29, 2015 at 6:17 PM, Tathagata Das t...@databricks.com
 wrote:

 There is a known issue that Kafka cannot return a leader if there is no data
 in the topic. I think it was raised in another thread in this forum. Is that
 the issue?

 On Wed, Jul 29, 2015 at 10:38 AM, unk1102 umesh.ka...@gmail.com
 wrote:

 Hi, I have Spark Streaming code which streams from a Kafka topic. It used to
 work fine, but suddenly it started throwing the following exception:

 Exception in thread main org.apache.spark.SparkException:
 org.apache.spark.SparkException: Couldn't find leader offsets for Set()
 at

 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
 at scala.util.Either.fold(Either.scala:97)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:412)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:528)
 at

 org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
 My Spark Streaming client code is very simple: I just create one receiver
 using the following code and try to print the messages it consumes.

 JavaPairInputDStream<String, String> messages =
 KafkaUtils.createDirectStream(jssc,
 String.class,
 String.class,
 StringDecoder.class,
 StringDecoder.class,
 kafkaParams,
 topicSet);

 The only Kafka param I specify is kafka.ofset.reset=largest. The Kafka topic
 has data and I can see it using other Kafka consumers, but the above Spark
 Streaming code throws an exception saying the leader offsets were not found.
 I tried both smallest and largest offsets. I wonder what happened; this code
 used to work earlier. I am using Spark Streaming 1.3.1 as it was working in
 this version; I tried 1.4.1 and got the same exception. Please guide. I am
 new to Spark, thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-could-not-find-leader-offset-for-Set-tp24066.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Spark Streaming Kafka could not find leader offset for Set()

2015-07-29 Thread Umesh Kacha
Hi, thanks for the response. Like I already mentioned in the question, the
Kafka topic is valid and it has data; I can see data in it using another Kafka
consumer.
On Jul 30, 2015 7:31 AM, Cody Koeninger c...@koeninger.org wrote:

 The last time someone brought this up on the mailing list, the issue
 actually was that the topic(s) didn't exist in Kafka at the time the spark
 job was running.





 On Wed, Jul 29, 2015 at 6:17 PM, Tathagata Das t...@databricks.com
 wrote:

 There is a known issue that Kafka cannot return a leader if there is no data
 in the topic. I think it was raised in another thread in this forum. Is that
 the issue?

 On Wed, Jul 29, 2015 at 10:38 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi, I have Spark Streaming code which streams from a Kafka topic. It used to
 work fine, but suddenly it started throwing the following exception:

 Exception in thread main org.apache.spark.SparkException:
 org.apache.spark.SparkException: Couldn't find leader offsets for Set()
 at

 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createDirectStream$2.apply(KafkaUtils.scala:413)
 at scala.util.Either.fold(Either.scala:97)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:412)
 at

 org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:528)
 at

 org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
 My Spark Streaming client code is very simple: I just create one receiver
 using the following code and try to print the messages it consumes.

 JavaPairInputDStream<String, String> messages =
 KafkaUtils.createDirectStream(jssc,
 String.class,
 String.class,
 StringDecoder.class,
 StringDecoder.class,
 kafkaParams,
 topicSet);

 The only Kafka param I specify is kafka.ofset.reset=largest. The Kafka topic
 has data and I can see it using other Kafka consumers, but the above Spark
 Streaming code throws an exception saying the leader offsets were not found.
 I tried both smallest and largest offsets. I wonder what happened; this code
 used to work earlier. I am using Spark Streaming 1.3.1 as it was working in
 this version; I tried 1.4.1 and got the same exception. Please guide. I am
 new to Spark, thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-could-not-find-leader-offset-for-Set-tp24066.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: How do we control output part files created by Spark job?

2015-07-11 Thread Umesh Kacha
Hi Srikanth, thanks much, it worked when I set spark.sql.shuffle.partitions=10.
Will reducing the shuffle partitions slow down my hiveContext group by query,
or won't it slow it down? Please guide.
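
Whether lowering spark.sql.shuffle.partitions slows the group by depends mostly
on how much data each shuffle partition ends up holding: fewer, larger
partitions mean less task overhead but also less parallelism, so 10 is only an
example value to tune. A short sketch of the two knobs discussed in this
thread, for Spark 1.4 (the query string and paths are placeholders):

import org.apache.spark.sql.DataFrame;

// Fewer shuffle partitions => fewer part files from the aggregation, but also
// fewer parallel reduce tasks.
hiveContext.setConf("spark.sql.shuffle.partitions", "10");

// Alternatively, coalesce the result DataFrame of a query and write that,
// rather than calling coalesce on the DataFrame returned by an
// "insert into ..." statement (by then the insert has already run).
DataFrame result = hiveContext.sql("select ... group by ...");
result.coalesce(6).write().format("orc").save("/path/in/hdfs");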

On Sat, Jul 11, 2015 at 7:41 AM, Srikanth srikanth...@gmail.com wrote:

 Is there a join involved in your sql?
 Have a look at spark.sql.shuffle.partitions?

 Srikanth

 On Wed, Jul 8, 2015 at 1:29 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi Srikanth, thanks for the response. I have the following code:

 hiveContext.sql("insert into... ").coalesce(6)

 The above code does not create 6 part files; it creates around 200 small
 files.

 Please guide. Thanks.
 On Jul 8, 2015 4:07 AM, Srikanth srikanth...@gmail.com wrote:

 Did you do

 yourRdd.coalesce(6).saveAsTextFile()

 or

 yourRdd.coalesce(6)
 yourRdd.saveAsTextFile()
 ?

 Srikanth

 On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi, I tried both approaches, using df.repartition(6) and df.coalesce(6), and
 it doesn't reduce the part-x files. Even after calling the above methods I
 still see around 200 small part files of 20 MB each, which are again ORC
 files.


 On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu 
 vsathishkuma...@gmail.com wrote:

 Try coalesce function to limit no of part files
 On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote:

 Hi, I have a couple of Spark jobs which process thousands of files every day.
 File sizes may vary from MBs to GBs. After finishing a job I usually save
 using the following code:

 finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR
 dataFrame.write.format("orc").save("/path/in/hdfs") // storing as ORC file as
 of Spark 1.4

 The Spark job creates plenty of small part files in the final output
 directory. As far as I understand, Spark creates a part file for each
 partition/task; please correct me if I am wrong. How do we control the number
 of part files Spark creates? Finally I would like to create a Hive table on
 top of these parquet/orc directories, and I heard Hive is slow when we have a
 large number of small files. Please guide, I am new to Spark. Thanks in
 advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: How do we control output part files created by Spark job?

2015-07-07 Thread Umesh Kacha
Hi, I tried both approaches, using df.repartition(6) and df.coalesce(6), and it
doesn't reduce the part-x files. Even after calling the above methods I still
see around 200 small part files of 20 MB each, which are again ORC files.

On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu 
vsathishkuma...@gmail.com wrote:

 Try coalesce function to limit no of part files
 On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote:

 Hi, I have a couple of Spark jobs which process thousands of files every day.
 File sizes may vary from MBs to GBs. After finishing a job I usually save
 using the following code:

 finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR
 dataFrame.write.format("orc").save("/path/in/hdfs") // storing as ORC file as
 of Spark 1.4

 The Spark job creates plenty of small part files in the final output
 directory. As far as I understand, Spark creates a part file for each
 partition/task; please correct me if I am wrong. How do we control the number
 of part files Spark creates? Finally I would like to create a Hive table on
 top of these parquet/orc directories, and I heard Hive is slow when we have a
 large number of small files. Please guide, I am new to Spark. Thanks in
 advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How do we control output part files created by Spark job?

2015-07-07 Thread Umesh Kacha
Hi Srikanth, thanks for the response. I have the following code:

hiveContext.sql("insert into... ").coalesce(6)

The above code does not create 6 part files; it creates around 200 small files.

Please guide. Thanks.
On Jul 8, 2015 4:07 AM, Srikanth srikanth...@gmail.com wrote:

 Did you do

 yourRdd.coalesce(6).saveAsTextFile()

 or

 yourRdd.coalesce(6)
 yourRdd.saveAsTextFile()
 ?

 Srikanth

 On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com
 wrote:

 Hi, I tried both approaches, using df.repartition(6) and df.coalesce(6), and
 it doesn't reduce the part-x files. Even after calling the above methods I
 still see around 200 small part files of 20 MB each, which are again ORC files.


 On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu 
 vsathishkuma...@gmail.com wrote:

 Try coalesce function to limit no of part files
 On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote:

 Hi, I have a couple of Spark jobs which process thousands of files every day.
 File sizes may vary from MBs to GBs. After finishing a job I usually save
 using the following code:

 finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR
 dataFrame.write.format("orc").save("/path/in/hdfs") // storing as ORC file as
 of Spark 1.4

 The Spark job creates plenty of small part files in the final output
 directory. As far as I understand, Spark creates a part file for each
 partition/task; please correct me if I am wrong. How do we control the number
 of part files Spark creates? Finally I would like to create a Hive table on
 top of these parquet/orc directories, and I heard Hive is slow when we have a
 large number of small files. Please guide, I am new to Spark. Thanks in
 advance.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org