Re: How to make new composite columns by combining rows in the same group?

2016-08-26 Thread Xinh Huynh
That looks like a pivot table. Have you looked into using the pivot table 
method with DataFrames?
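
For example, a rough sketch in Scala (Spark 1.6+), assuming "df" is the CSV loaded
as a DataFrame with the columns ID, City, Zip, Price, Rating:

import org.apache.spark.sql.functions._
// import sqlContext.implicits._ for the $-syntax outside the shell

val pivoted = df
  .withColumn("City_Zip", concat_ws("_", $"City", $"Zip"))
  .groupBy("ID")
  .pivot("City_Zip")
  .agg(sum("Price").as("Price"), sum("Rating").as("Rating"))
  .na.fill(0)  // pivot leaves nulls for missing City_Zip combinations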

Xinh

> On Aug 26, 2016, at 4:54 AM, Rex X  wrote:
> 
> 1. Given following CSV file
> $cat data.csv
> 
> ID,City,Zip,Price,Rating
> 1,A,95123,100,0
> 1,B,95124,102,1
> 1,A,95126,100,1
> 2,B,95123,200,0
> 2,B,95124,201,1
> 2,C,95124,203,0
> 3,A,95126,300,1
> 3,C,95124,280,0
> 4,C,95124,400,1
> 
> We want to group by ID, and make new composite columns of Price and Rating 
> based on the value of $City-$Zip. 
> 
> 
> 2. The Expected Result:
> 
> ID  A_95123_Price  A_95123_Rating  A_95126_Price  A_95126_Rating  B_95123_Price  B_95123_Rating  B_95124_Price  B_95124_Rating  C_95124_Price  C_95124_Rating
> 1   100            1               100            2               0              0               102            2               0              0
> 2   0              0               0              0               200            1               201            2               203            1
> 3   0              0               300            2               0              0               0              0               280            1
> 4   0              0               0              0               0              0               0              0               400            2
> 
> Any tips would be greatly appreciated!
> 
> Thank you.
> 
> Regards,
> Rex
> 


Re: How to filter based on a constant value

2016-07-30 Thread Xinh Huynh
Hi Mitch,

I think you were missing a step:
[your result] maxdate: org.apache.spark.sql.Row = [2015-12-15]
Since maxdate is of type Row, you would want to extract the first column of
the Row with:

>> val maxdateStr = maxdate.getString(0)

assuming the column type is String.
API doc is here:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row

Then you can do the query:

>> col("transactiondate") === maxdateStr

Xinh

On Sat, Jul 30, 2016 at 5:20 PM, ayan guha  wrote:

> select *
> from (select *,
>  rank() over (order by transactiondate) r
>from ll_18740868 where transactiondescription='XYZ'
>   ) inner
> where r=1
>
> Hi Mitch,
>
> If using SQL is fine, you can try the code above. You need to register
> ll_18740868  as temp table.
>
> On Sun, Jul 31, 2016 at 6:49 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>>
>> Hi,
>>
>> I would like to find out when it was the last time I paid a company with
>> Debit Card
>>
>>
>> This is the way I do it.
>>
>> 1) Find the date when I paid last
>> 2) Find the rest of details from the row(s)
>>
>> So
>>
>> var HASHTAG = "XYZ"
>> scala> var maxdate =
>> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.apply(0)
>> maxdate: org.apache.spark.sql.Row = [2015-12-15]
>>
>> OK so it was 2015-12-15
>>
>>
>> Now I want to get the rest of the columns. This one works when I hard
>> code the maxdate!
>>
>>
>> scala> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)
>> && col("transactiondate") === "2015-12-15").select("transactiondate",
>> "transactiondescription", "debitamount").show
>> +---------------+----------------------+-----------+
>> |transactiondate|transactiondescription|debitamount|
>> +---------------+----------------------+-----------+
>> |     2015-12-15|      XYZ LTD CD 4636 |      10.95|
>> +---------------+----------------------+-----------+
>>
>> Now if I want to use the var maxdate in place of "2015-12-15", how would
>> I do that?
>>
>> I tried lit(maxdate) etc but they are all giving me error?
>>
>> java.lang.RuntimeException: Unsupported literal type class
>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
>> [2015-12-15]
>>
>>
>> Thanks
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: DataFrame Min By Column

2016-07-08 Thread Xinh Huynh
Hi Pedro,

I could not think of a way using an aggregate. It's possible with a window
function, partitioned on user and ordered by time:

// Assuming "df" holds your dataframe ...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val wSpec = Window.partitionBy("user").orderBy("time")
df.select($"user", $"time", rank().over(wSpec).as("rank"))
  .where($"rank" === 1)

Xinh

On Fri, Jul 8, 2016 at 12:57 PM, Pedro Rodriguez 
wrote:

> Is there a way to on a GroupedData (from groupBy in DataFrame) to have an
> aggregate that returns column A based on a min of column B? For example, I
> have a list of sites visited by a given user and I would like to find the
> event with the minimum time (first event)
>
> Thanks,
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


Re: Using R code as part of a Spark Application

2016-06-29 Thread Xinh Huynh
It looks like it. "DataFrame UDFs in R" is resolved in Spark 2.0:
https://issues.apache.org/jira/browse/SPARK-6817

Here's some of the code:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala

/**
* A function wrapper that applies the given R function to each partition.
*/
private[sql] case class MapPartitionsRWrapper(
func: Array[Byte],
packageNames: Array[Byte],
broadcastVars: Array[Broadcast[Object]],
inputSchema: StructType,
outputSchema: StructType) extends (Iterator[Any] => Iterator[Any])

Xinh

On Wed, Jun 29, 2016 at 2:59 PM, Sean Owen <so...@cloudera.com> wrote:

> Here we (or certainly I) am not talking about R Server, but plain vanilla
> R, as used with Spark and SparkR. Currently, SparkR doesn't distribute R
> code at all (it used to, sort of), so I'm wondering if that is changing
> back.
>
> On Wed, Jun 29, 2016 at 10:53 PM, John Aherne <john.ahe...@justenough.com>
> wrote:
>
>> I don't think R server requires R on the executor nodes. I originally set
>> up a SparkR cluster for our Data Scientist on Azure which required that I
>> install R on each node, but for the R Server set up, there is an extra edge
>> node with R server that they connect to. From what little research I was
>> able to do, it seems that there are some special functions in R Server that
>> can distribute the work to the cluster.
>>
>> Documentation is light, and hard to find but I found this helpful:
>>
>> https://blogs.msdn.microsoft.com/uk_faculty_connection/2016/05/10/r-server-for-hdinsight-running-on-microsoft-azure-cloud-data-science-challenges/
>>
>>
>>
>> On Wed, Jun 29, 2016 at 3:29 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> Oh, interesting: does this really mean the return of distributing R
>>> code from driver to executors and running it remotely, or do I
>>> misunderstand? this would require having R on the executor nodes like
>>> it used to?
>>>
>>> On Wed, Jun 29, 2016 at 5:53 PM, Xinh Huynh <xinh.hu...@gmail.com>
>>> wrote:
>>> > There is some new SparkR functionality coming in Spark 2.0, such as
>>> > "dapply". You could use SparkR to load a Parquet file and then run
>>> "dapply"
>>> > to apply a function to each partition of a DataFrame.
>>> >
>>> > Info about loading Parquet file:
>>> >
>>> http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/sparkr.html#from-data-sources
>>> >
>>> > API doc for "dapply":
>>> >
>>> http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/api/R/index.html
>>> >
>>> > Xinh
>>> >
>>> > On Wed, Jun 29, 2016 at 6:54 AM, sujeet jog <sujeet@gmail.com>
>>> wrote:
>>> >>
>>> >> try Spark pipeRDD's , you can invoke the R script from pipe , push
>>> the
>>> >> stuff you want to do on the Rscript stdin,  p
>>> >>
>>> >>
>>> >> On Wed, Jun 29, 2016 at 7:10 PM, Gilad Landau <
>>> gilad.lan...@clicktale.com>
>>> >> wrote:
>>> >>>
>>> >>> Hello,
>>> >>>
>>> >>>
>>> >>>
>>> >>> I want to use R code as part of spark application (the same way I
>>> would
>>> >>> do with Scala/Python).  I want to be able to run an R syntax as a map
>>> >>> function on a big Spark dataframe loaded from a parquet file.
>>> >>>
>>> >>> Is this even possible or the only way to use R is as part of RStudio
>>> >>> orchestration of our Spark  cluster?
>>> >>>
>>> >>>
>>> >>>
>>> >>> Thanks for the help!
>>> >>>
>>> >>>
>>> >>>
>>> >>> Gilad
>>> >>>
>>> >>>
>>> >>
>>> >>
>>> >
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>> --
>>
>> John Aherne
>> Big Data and SQL Developer
>>
>> Cell: +1 (303) 809-9718
>> Email: john.ahe...@justenough.com
>> Skype: john.aherne.je
>> Web: www.justenough.com
>>
>>
>>
>>
>


Re: Using R code as part of a Spark Application

2016-06-29 Thread Xinh Huynh
There is some new SparkR functionality coming in Spark 2.0, such as
"dapply". You could use SparkR to load a Parquet file and then run "dapply"
to apply a function to each partition of a DataFrame.

Info about loading Parquet file:
http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/sparkr.html#from-data-sources

API doc for "dapply":
http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc1-docs/api/R/index.html

Xinh

On Wed, Jun 29, 2016 at 6:54 AM, sujeet jog  wrote:

> try Spark pipeRDD's , you can invoke the R script from pipe , push  the
> stuff you want to do on the Rscript stdin,  p
>
>
> On Wed, Jun 29, 2016 at 7:10 PM, Gilad Landau 
> wrote:
>
>> Hello,
>>
>>
>>
>> I want to use R code as part of spark application (the same way I would
>> do with Scala/Python).  I want to be able to run an R syntax as a map
>> function on a big Spark dataframe loaded from a parquet file.
>>
>> Is this even possible or the only way to use R is as part of RStudio
>> orchestration of our Spark  cluster?
>>
>>
>>
>> Thanks for the help!
>>
>>
>>
>> Gilad
>>
>>
>>
>
>


Re: Best way to tranform string label into long label for classification problem

2016-06-28 Thread Xinh Huynh
Hi Jao,

Here's one option:
http://spark.apache.org/docs/latest/ml-features.html#stringindexer
"StringIndexer encodes a string column of labels to a column of label
indices. The indices are in [0, numLabels), ordered by label frequencies."
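
A minimal sketch, assuming a DataFrame "df" with a string column named "label"
(adjust the column names to your schema):

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("labelIndex")
val indexed = indexer.fit(df).transform(df)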

Xinh

On Tue, Jun 28, 2016 at 12:29 AM, Jaonary Rabarisoa 
wrote:

> Dear all,
>
> I'm trying to a find a way to transform a DataFrame into a data that is
> more suitable for third party classification algorithm. The DataFrame have
> two columns : "feature" represented by a vector and "label" represented by
> a string. I want the "label" to be a number between [0, number of classes -
> 1].
> Do you have any ideas to do it efficiently ?
>
>  Cheers,
>
> Jao
>


Re: What is the explanation of "ConvertToUnsafe" in "Physical Plan"

2016-06-27 Thread Xinh Huynh
I guess it has to do with Tungsten's explicit memory management, which builds on
sun.misc.Unsafe. The "ConvertToUnsafe" physical operator converts
Java-object-based rows into UnsafeRows, which use Spark's internal
memory-efficient format.

Here is the related code in 1.6:

ConvertToUnsafe is defined in:
https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala

/**
* Converts Java-object-based rows into [[UnsafeRow]]s.
*/
case class ConvertToUnsafe(child: SparkPlan) extends UnaryNode

And, UnsafeRow is defined in:
https://github.com/apache/spark/blob/branch-1.6/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

/**
* An Unsafe implementation of Row which is backed by raw memory instead of
Java objects.
*
* Each tuple has three parts: [null bit set] [values] [variable length
portion]
*
* The bit set is used for null tracking and is aligned to 8-byte word
boundaries. It stores
* one bit per field.
*
* In the `values` region, we store one 8-byte word per field. For fields
that hold fixed-length
* primitive types, such as long, double, or int, we store the value
directly in the word. For
* fields with non-primitive or variable-length values, we store a relative
offset (w.r.t. the
* base address of the row) that points to the beginning of the
variable-length field, and length
* (they are combined into a long).
*
* Instances of `UnsafeRow` act as pointers to row data stored in this
format.
*/
public final class UnsafeRow extends MutableRow implements Externalizable,
KryoSerializable

Xinh

On Sun, Jun 26, 2016 at 1:11 PM, Mich Talebzadeh 
wrote:

>
> Hi,
>
> In Spark's Physical Plan what is the explanation for ConvertToUnsafe?
>
> Example:
>
> scala> sorted.filter($"prod_id" ===13).explain
> == Physical Plan ==
> Filter (prod_id#10L = 13)
> +- Sort [prod_id#10L ASC,cust_id#11L ASC,time_id#12 ASC,channel_id#13L
> ASC,promo_id#14L ASC], true, 0
>+- ConvertToUnsafe
>   +- Exchange rangepartitioning(prod_id#10L ASC,cust_id#11L
> ASC,time_id#12 ASC,channel_id#13L ASC,promo_id#14L ASC,200), None
>  +- HiveTableScan
> [prod_id#10L,cust_id#11L,time_id#12,channel_id#13L,promo_id#14L],
> MetastoreRelation oraclehadoop, sales2, None
>
>
> My inclination is that it is a temporary construct like tempTable created
> as part of Physical Plan?
>
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: DataFrame versus Dataset creation and usage

2016-06-24 Thread Xinh Huynh
Hi Martin,

Since your schema is dynamic, how would you use Datasets? Would you know
ahead of time the row type T in a Dataset[T]?

One option is to start with DataFrames in the beginning of your data
pipeline, figure out the field types, and then switch completely over to
RDDs or Dataset in the next stage of the pipeline.

Also, I'm not sure what the custom Java mappers are doing - could you use
them as UDFs within a DataFrame?
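
For example, a sketch of wrapping custom logic as a UDF in 1.6 (the column name and
the function body are placeholders for whatever your Java mapper does):

import org.apache.spark.sql.functions.udf

val myMapper = udf { (s: String) => s.toUpperCase }  // stand-in for the custom mapping logic
val mapped = df.withColumn("mapped", myMapper(df("someColumn")))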

Xinh

On Fri, Jun 24, 2016 at 11:42 AM, Martin Serrano  wrote:

> Indeed.  But I'm dealing with 1.6 for now unfortunately.
>
>
> On 06/24/2016 02:30 PM, Ted Yu wrote:
>
> In Spark 2.0, Dataset and DataFrame are unified.
>
> Would this simplify your use case ?
>
> On Fri, Jun 24, 2016 at 7:27 AM, Martin Serrano 
> wrote:
>
>> Hi,
>>
>> I'm exposing a custom source to the Spark environment.  I have a question
>> about the best way to approach this problem.
>>
>> I created a custom relation for my source and it creates a
>> DataFrame.  My custom source knows the data types which are
>> *dynamic* so this seemed to be the appropriate return type.  This works
>> fine.
>>
>> The next step I want to take is to expose some custom mapping functions
>> (written in Java).  But when I look at the APIs, the map method for
>> DataFrame returns an RDD (not a DataFrame).  (Should I use
>> SqlContext.createDataFrame on the result? -- does this result in additional
>> processing overhead?)  The Dataset type seems to be more of what I'd be
>> looking for, it's map method returns the Dataset type.  So chaining them
>> together is a natural exercise.
>>
>> But to create the Dataset from a DataFrame, it appears that I have to
>> provide the types of each field in the Row in the DataFrame.as[...]
>> method.  I would think that the DataFrame would be able to do this
>> automatically since it has all the types already.
>>
>> This leads me to wonder how I should be approaching this effort.  As all
>> the fields and types are dynamic, I cannot use beans as my type when
>> passing data around.  Any advice would be appreciated.
>>
>> Thanks,
>> Martin
>>
>>
>>
>>
>
>


Re: Confusing argument of sql.functions.count

2016-06-22 Thread Xinh Huynh
I can see how the linked documentation could be confusing:
"Aggregate function: returns the number of items in a group."

What it doesn't mention is that it returns the number of rows for which the
given column is non-null.
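
A small illustration, assuming a DataFrame "df" with columns "a" and "b":

import org.apache.spark.sql.functions._

df.groupBy("a").agg(count(col("b")))  // rows per group where "b" is non-null
df.groupBy("a").agg(count(lit(1)))    // total rows per group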

Xinh

On Wed, Jun 22, 2016 at 9:31 AM, Takeshi Yamamuro 
wrote:

> Hi,
>
> An argument for `functions.count` is needed for per-column counting;
> df.groupBy($"a").agg(count($"b"))
>
> // maropu
>
> On Thu, Jun 23, 2016 at 1:27 AM, Ted Yu  wrote:
>
>> See the first example in:
>>
>> http://www.w3schools.com/sql/sql_func_count.asp
>>
>> On Wed, Jun 22, 2016 at 9:21 AM, Jakub Dubovsky <
>> spark.dubovsky.ja...@gmail.com> wrote:
>>
>>> Hey Ted,
>>>
>>> thanks for reacting.
>>>
>>> I am refering to both of them. They both take column as parameter
>>> regardless of its type. Intuition here is that count should take no
>>> parameter. Or am I missing something?
>>>
>>> Jakub
>>>
>>> On Wed, Jun 22, 2016 at 6:19 PM, Ted Yu  wrote:
>>>
 Are you referring to the following method in
 sql/core/src/main/scala/org/apache/spark/sql/functions.scala :

   def count(e: Column): Column = withAggregateFunction {

 Did you notice this method ?

   def count(columnName: String): TypedColumn[Any, Long] =

 On Wed, Jun 22, 2016 at 9:06 AM, Jakub Dubovsky <
 spark.dubovsky.ja...@gmail.com> wrote:

> Hey sparkers,
>
> an aggregate function *count* in *org.apache.spark.sql.functions*
> package takes a *column* as an argument. Is this needed for
> something? I find it confusing that I need to supply a column there. It
> feels like it might be distinct count or something. This can be seen in 
> latest
> documentation
> 
> .
>
> I am considering filling this in spark bug tracker. Any opinions on
> this?
>
> Thanks
>
> Jakub
>
>

>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Dataset Select Function after Aggregate Error

2016-06-17 Thread Xinh Huynh
Hi Pedro,

In 1.6.1, you can do:
>> ds.groupBy(_.uid).count().map(_._1)
or
>> ds.groupBy(_.uid).count().select($"value".as[String])

It doesn't have the exact same syntax as for DataFrame.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset

It might be different in 2.0.

Xinh

On Fri, Jun 17, 2016 at 3:33 PM, Pedro Rodriguez 
wrote:

> Hi All,
>
> I am working on using Datasets in 1.6.1 and eventually 2.0 when its
> released.
>
> I am running the aggregate code below where I have a dataset where the row
> has a field uid:
>
> ds.groupBy(_.uid).count()
> // res0: org.apache.spark.sql.Dataset[(String, Long)] = [_1: string, _2:
> bigint]
>
> This works as expected, however, attempts to run select statements after
> fails:
> ds.groupBy(_.uid).count().select(_._1)
> // error: missing parameter type for expanded function ((x$2) => x$2._1)
> ds.groupBy(_.uid).count().select(_._1)
>
> I have tried several variants, but nothing seems to work. Below is the
> equivalent Dataframe code which works as expected:
> df.groupBy("uid").count().select("uid")
>
> Thanks!
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


Re: Spark 2.0: Unify DataFrames and Datasets question

2016-06-14 Thread Xinh Huynh
Hi Arun,

This documentation may be helpful:

The 2.0-preview Scala doc for Dataset class:
http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Dataset
Note that the Dataset API has completely changed from 1.6.

In 2.0, there is no separate DataFrame class. Rather, it is a type alias
defined here:
http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.package@DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
"type DataFrame = Dataset

[Row

]"
Unlike in 1.6, a DataFrame is a specific Dataset[T], where T=Row, so
DataFrame shares the same methods as Dataset.

As mentioned earlier, this unification is only available in Scala and Java.

Xinh

On Tue, Jun 14, 2016 at 10:45 AM, Michael Armbrust 
wrote:

> 1) What does this really mean to an Application developer?
>>
>
> It means there are less concepts to learn.
>
>
>> 2) Why this unification was needed in Spark 2.0?
>>
>
> To simplify the API and reduce the number of concepts that needed to be
> learned.  We only didn't do it in 1.6 because we didn't want to break
> binary compatibility in a minor release.
>
>
>> 3) What changes can be observed in Spark 2.0 vs Spark 1.6?
>>
>
> There is no DataFrame class, all methods are still available, except those
> that returned an RDD (now you can call df.rdd.map if that is still what you
> want)
>
>
>> 4) Compile time safety will be there for DataFrames too?
>>
>
> Slide 7
>
>
>> 5) Python API is supported for Datasets in 2.0?
>>
>
> Slide 10
>


Re: How to set the degree of parallelism in Spark SQL?

2016-05-23 Thread Xinh Huynh
To the original question of parallelism and executors: you can have a
parallelism of 200, even with 2 executors. In the Spark UI, you should see
that the number of _tasks_ is 200 when your job involves shuffling.

Executors vs. tasks:
http://spark.apache.org/docs/latest/cluster-overview.html
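
As for setting it, note the equals sign that Ted points out below, e.g.:

sqlContext.sql("SET spark.sql.shuffle.partitions=200")
// or equivalently:
sqlContext.setConf("spark.sql.shuffle.partitions", "200")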

Xinh

On Mon, May 23, 2016 at 5:48 AM, Mathieu Longtin 
wrote:

> Since the default is 200, I would guess you're only running 2 executors.
> Try to verify how many executor you are actually running with the web
> interface (port 8080 where the master is running).
>
> On Sat, May 21, 2016 at 11:42 PM Ted Yu  wrote:
>
>> Looks like an equal sign is missing between partitions and 200.
>>
>> On Sat, May 21, 2016 at 8:31 PM, SRK  wrote:
>>
>>> Hi,
>>>
>>> How to set the degree of parallelism in Spark SQL? I am using the
>>> following
>>> but it somehow seems to allocate only two executors at a time.
>>>
>>>  sqlContext.sql(" set spark.sql.shuffle.partitions  200  ")
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-the-degree-of-parallelism-in-Spark-SQL-tp26996.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>> --
> Mathieu Longtin
> 1-514-803-8977
>


Re: Apache Spark Slack

2016-05-16 Thread Xinh Huynh
I just went to IRC. It looks like the correct channel is #apache-spark.
So, is this an "official" chat room for Spark?

Xinh


On Mon, May 16, 2016 at 9:35 AM, Dood@ODDO  wrote:

> On 5/16/2016 9:30 AM, Paweł Szulc wrote:
>
>>
>> Just realized that people have to be invited to this thing. You see,
>> that's why Gitter is just simpler.
>>
>> I will try to figure it out ASAP
>>
>>
> You don't need invitations to IRC and it has been around for decades. You
> can just go to webchat.freenode.net and login into the #spark channel (or
> you can use CLI based clients). In addition, Gitter is owned by a private
> entity, it too requires an account and - what does it give you that is
> advantageous? You wanted real-time chat about Spark - IRC has it and the
> channel has already been around for a while :-)
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Creating Nested dataframe from flat data.

2016-05-13 Thread Xinh Huynh
Hi Prashant,

You can create struct columns using the struct() function in
org.apache.spark.sql.functions --
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$

val df = sc.parallelize(List(("a", "b", "c"))).toDF("A", "B", "C")

import org.apache.spark.sql.functions._
df.withColumn("D", struct($"a", $"b", $"c")).show()

+---+---+---+-------+
|  A|  B|  C|      D|
+---+---+---+-------+
|  a|  b|  c|[a,b,c]|
+---+---+---+-------+

You can repeat to get the inner nesting.
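
For instance, a sketch of the exact shape you asked for, assuming the six flat
columns are named a through f:

val nested = df.select(
  struct($"a", $"b").as("nested_obj1"),
  struct($"c", $"d", struct($"e", $"f").as("nested_obj3")).as("nested_obj2"))
nested.printSchema()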

Xinh

On Fri, May 13, 2016 at 4:51 AM, Prashant Bhardwaj <
prashant2006s...@gmail.com> wrote:

> Hi
>
> Let's say I have a flat dataframe with 6 columns like.
> {
> "a": "somevalue",
> "b": "somevalue",
> "c": "somevalue",
> "d": "somevalue",
> "e": "somevalue",
> "f": "somevalue"
> }
>
> Now I want to convert this dataframe to contain nested column like
>
> {
> "nested_obj1": {
> "a": "somevalue",
> "b": "somevalue"
> },
> "nested_obj2": {
> "c": "somevalue",
> "d": "somevalue",
> "nested_obj3": {
> "e": "somevalue",
> "f": "somevalue"
> }
> }
> }
>
> How can I achieve this? I'm using Spark-sql in scala.
>
> Regards
> Prashant
>


Re: Joining a RDD to a Dataframe

2016-05-13 Thread Xinh Huynh
Hi Cyril,

In the case where there are no documents, it looks like there is a typo in
"addresses" (check the number of "d"s):

| scala> df.select(explode(df("addresses.id")).as("aid"), df("id"))  <==
addresses
| org.apache.spark.sql.AnalysisException: Cannot resolve column name "id"
among (adresses); <== adresses

As for your question about joining on a nested array column, I don't know
if it is possible. Is it supported in normal SQL? Exploding seems the right
way because then there is only one join key per row, as opposed to the
array, which could have multiple join keys inside the array.

Xinh

On Thu, May 12, 2016 at 7:32 PM, Cyril Scetbon 
wrote:

> Nobody has the answer ?
>
> Another thing I've seen is that if I have no documents at all :
>
> scala> df.select(explode(df("addresses.id")).as("aid")).collect
> res27: Array[org.apache.spark.sql.Row] = Array()
>
> Then
>
> scala> df.select(explode(df("addresses.id")).as("aid"), df("id"))
> org.apache.spark.sql.AnalysisException: Cannot resolve column name "id"
> among (adresses);
> at
> org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
> at
> org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
>
> Is there a better way to query nested objects and to join between a DF
> containing nested objects and another regular data frame (yes it's the
> current case)
>
> On May 9, 2016, at 00:42, Cyril Scetbon  wrote:
>
> Hi Ashish,
>
> The issue is not related to converting a RDD to a DF. I did it. I was just
> asking if I should do it differently.
>
> The issue regards the exception when using array_contains with a
> sql.Column instead of a value.
>
> I found another way to do it using explode as follows :
>
> df.select(explode(df("addresses.id")).as("aid"), df("id")).join(df_input,
> $"aid" === df_input("id")).select(df("id"))
>
> However, I'm wondering if it does almost the same or if the query is
> different and worst in term of performance.
>
> If someone can comment on it and maybe give me advices.
>
> Thank you.
>
> On May 8, 2016, at 22:12, Ashish Dubey  wrote:
>
> Is there any reason you dont want to convert this - i dont think join b/w
> RDD and DF is supported.
>
> On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon 
> wrote:
>
>> Hi,
>>
>> I have a RDD built during a spark streaming job and I'd like to join it
>> to a DataFrame (E/S input) to enrich it.
>> It seems that I can't join the RDD and the DF without converting first
>> the RDD to a DF (Tell me if I'm wrong). Here are the schemas of both DF :
>>
>> scala> df
>> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses:
>> array>, id: string]
>>
>> scala> df_input
>> res33: org.apache.spark.sql.DataFrame = [id: string]
>>
>> scala> df_input.collect
>> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2],
>> [idaddress12])
>>
>> I can get ids I want if I know the value to look for in addresses.id
>> using :
>>
>> scala> df.filter(array_contains(df("addresses.id"),
>> "idaddress2")).select("id").collect
>> res35: Array[org.apache.spark.sql.Row] = Array([], [YY])
>>
>> However when I try to join df_input and df and to use the previous filter
>> as the join condition I get an exception :
>>
>> scala> df.join(df_input, array_contains(df("adresses.id"),
>> df_input("id")))
>> java.lang.RuntimeException: Unsupported literal type class
>> org.apache.spark.sql.Column id
>> at
>> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
>> at
>> org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
>> ...
>>
>> It seems that array_contains only supports static arguments and does not
>> replace a sql.Column by its value.
>>
>> What's the best way to achieve what I want to do ? (Also speaking in term
>> of performance)
>>
>> Thanks
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
>


Re: apache spark on gitter?

2016-05-12 Thread Xinh Huynh
I agree that it can help build a community and be a place for real-time
conversations.

Xinh

On Thu, May 12, 2016 at 12:28 AM, Paweł Szulc <paul.sz...@gmail.com> wrote:

> Hi,
>
> well I guess the advantage of gitter over maling list is the same as with
> IRC. It's not actually a replacer because mailing list is also important.
> But it is lot easier to build a community around tool with ad-hoc ability
> to connect with each other.
>
> I have gitter running on constantly, I visit my favorite OSS projects on
> it from time to time to read what has recently happened. It allows me to
> stay in touch with the project, help fellow developers to with problems
> they have.
> One might argue that u can achive the same with mailing list, well it's
> hard for me to put this into words.. Malinig list is more of an async
> nature (which is good!) but some times you need more "real-time"
> experience. You know, engage in the conversation in the given moment, not
> conversation that might last few days :)
>
> TLDR: It is not a replacement, it's supplement to build the community
> around OSS. Worth having for real-time conversations.
>
> On Wed, May 11, 2016 at 10:24 PM, Xinh Huynh <xinh.hu...@gmail.com> wrote:
>
>> Hi Pawel,
>>
>> I'd like to hear more about your idea. Could you explain more why you
>> would like to have a gitter channel? What are the advantages over a mailing
>> list (like this one)? Have you had good experiences using gitter on other
>> open source projects?
>>
>> Xinh
>>
>> On Wed, May 11, 2016 at 11:10 AM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> I don't know of a gitter channel and I don't use it myself, FWIW. I
>>> think anyone's welcome to start one.
>>>
>>> I hesitate to recommend this, simply because it's preferable to have
>>> one place for discussion rather than split it over several, and, we
>>> have to keep the @spark.apache.org mailing lists as the "forums of
>>> records" for project discussions.
>>>
>>> If something like gitter doesn't attract any chat, then it doesn't add
>>> any value. If it does though, then suddenly someone needs to subscribe
>>> to user@ and gitter to follow all of the conversations.
>>>
>>> I think there is a bit of a scalability problem on the user@ list at
>>> the moment, just because it covers all of Spark. But adding a
>>> different all-Spark channel doesn't help that.
>>>
>>> Anyway maybe that's "why"
>>>
>>>
>>> On Wed, May 11, 2016 at 6:26 PM, Paweł Szulc <paul.sz...@gmail.com>
>>> wrote:
>>> > no answer, but maybe one more time, a gitter channel for spark users
>>> would
>>> > be a good idea!
>>> >
>>> > On Mon, May 9, 2016 at 1:45 PM, Paweł Szulc <paul.sz...@gmail.com>
>>> wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> I was wondering - why Spark does not have a gitter channel?
>>> >>
>>> >> --
>>> >> Regards,
>>> >> Paul Szulc
>>> >>
>>> >> twitter: @rabbitonweb
>>> >> blog: www.rabbitonweb.com
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > Regards,
>>> > Paul Szulc
>>> >
>>> > twitter: @rabbitonweb
>>> > blog: www.rabbitonweb.com
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Regards,
> Paul Szulc
>
> twitter: @rabbitonweb
> blog: www.rabbitonweb.com
>


Re: apache spark on gitter?

2016-05-11 Thread Xinh Huynh
Hi Pawel,

I'd like to hear more about your idea. Could you explain more why you would
like to have a gitter channel? What are the advantages over a mailing list
(like this one)? Have you had good experiences using gitter on other open
source projects?

Xinh

On Wed, May 11, 2016 at 11:10 AM, Sean Owen  wrote:

> I don't know of a gitter channel and I don't use it myself, FWIW. I
> think anyone's welcome to start one.
>
> I hesitate to recommend this, simply because it's preferable to have
> one place for discussion rather than split it over several, and, we
> have to keep the @spark.apache.org mailing lists as the "forums of
> records" for project discussions.
>
> If something like gitter doesn't attract any chat, then it doesn't add
> any value. If it does though, then suddenly someone needs to subscribe
> to user@ and gitter to follow all of the conversations.
>
> I think there is a bit of a scalability problem on the user@ list at
> the moment, just because it covers all of Spark. But adding a
> different all-Spark channel doesn't help that.
>
> Anyway maybe that's "why"
>
>
> On Wed, May 11, 2016 at 6:26 PM, Paweł Szulc  wrote:
> > no answer, but maybe one more time, a gitter channel for spark users
> would
> > be a good idea!
> >
> > On Mon, May 9, 2016 at 1:45 PM, Paweł Szulc 
> wrote:
> >>
> >> Hi,
> >>
> >> I was wondering - why Spark does not have a gitter channel?
> >>
> >> --
> >> Regards,
> >> Paul Szulc
> >>
> >> twitter: @rabbitonweb
> >> blog: www.rabbitonweb.com
> >
> >
> >
> >
> > --
> > Regards,
> > Paul Szulc
> >
> > twitter: @rabbitonweb
> > blog: www.rabbitonweb.com
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Evenly balance the number of items in each RDD partition

2016-05-10 Thread Xinh Huynh
Hi Ayman,

Have you looked at this:
http://stackoverflow.com/questions/23127329/how-to-define-custom-partitioner-for-spark-rdds-of-equally-sized-partition-where

It recommends defining a custom partitioner and (PairRDD) partitionBy
method to accomplish this.
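
In Scala it would look roughly like the sketch below (the Python version is
similar); this assumes the items are keyed by integers 0 until 50000 and spread
over 10 partitions:

import org.apache.spark.Partitioner

class ExactPartitioner(partitions: Int, elements: Int) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = {
    // consecutive blocks of (elements / partitions) keys share a partition
    key.asInstanceOf[Int] / (elements / partitions)
  }
}

val balanced = rdd
  .map(x => (x, x))  // key the items so partitionBy applies
  .partitionBy(new ExactPartitioner(10, 50000))
  .values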

Xinh

On Tue, May 10, 2016 at 1:15 PM, Ayman Khalil  wrote:

> And btw, I'm using the Python API if this makes any difference.
>
> On Tue, May 10, 2016 at 11:14 PM, Ayman Khalil 
> wrote:
>
>> Hi Don,
>>
>> This didn't help. My original rdd is already created using 10 partitions.
>> As a matter of fact, after trying with rdd.coalesce(10, shuffle =
>> true) out of curiosity, the rdd partitions became even more imbalanced:
>> [(0, 5120), (1, 5120), (2, 5120), (3, 5120), (4, *3920*), (5, 4096), (6,
>> 5120), (7, 5120), (8, 5120), (9, *6144*)]
>>
>>
>> On Tue, May 10, 2016 at 10:16 PM, Don Drake  wrote:
>>
>>> You can call rdd.coalesce(10, shuffle = true) and the returning rdd will
>>> be evenly balanced.  This obviously triggers a shuffle, so be advised it
>>> could be an expensive operation depending on your RDD size.
>>>
>>> -Don
>>>
>>> On Tue, May 10, 2016 at 12:38 PM, Ayman Khalil 
>>> wrote:
>>>
 Hello,

 I have 50,000 items parallelized into an RDD with 10 partitions, I
 would like to evenly split the items over the partitions so:
 50,000/10 = 5,000 in each RDD partition.

 What I get instead is the following (partition index, partition count):
 [(0, 4096), (1, 5120), (2, 5120), (3, 5120), (4, 5120), (5, 5120), (6,
 5120), (7, 5120), (8, 5120), (9, 4944)]

 the total is correct (4096 + 4944 + 8*5120 = 50,000) but the partitions
 are imbalanced.

 Is there a way to do that?

 Thank you,
 Ayman

>>>
>>>
>>>
>>> --
>>> Donald Drake
>>> Drake Consulting
>>> http://www.drakeconsulting.com/
>>> https://twitter.com/dondrake 
>>> 800-733-2143
>>>
>>
>>
>


Re: Spark-csv- partitionBy

2016-05-10 Thread Xinh Huynh
Hi Pradeep,

Here is a way to partition your data into different files, by calling
repartition() on the dataframe:
df.repartition(12, $"Month")
  .write
  .format(...)

This is assuming you want to partition by a "month" column where there are
12 different values. Each partition will be stored in a separate file (but
in the same folder).

Xinh

On Tue, May 10, 2016 at 2:10 AM, Mail.com  wrote:

> Hi,
>
> I don't want to reduce partitions. Should write files depending upon the
> column value.
>
> Trying to understand how reducing partition size will make it work.
>
> Regards,
> Pradeep
>
> On May 9, 2016, at 6:42 PM, Gourav Sengupta 
> wrote:
>
> Hi,
>
> its supported, try to use coalesce(1) (the spelling is wrong) and after
> that do the partitions.
>
> Regards,
> Gourav
>
> On Mon, May 9, 2016 at 7:12 PM, Mail.com  <
> pradeep.mi...@mail.com> wrote:
>
>> Hi,
>>
>> I have to write tab delimited file and need to have one directory for
>> each unique value of a column.
>>
>> I tried using spark-csv with partitionBy and seems it is not supported.
>> Is there any other option available for doing this?
>>
>> Regards,
>> Pradeep
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Accessing JSON array in Spark SQL

2016-05-05 Thread Xinh Huynh
Hi,

I am having trouble accessing an array element in JSON data with a
dataframe. Here is the schema:

val json1 = """{"f1":"1", "f1a":[{"f2":"2"}] } }"""
val rdd1 = sc.parallelize(List(json1))
val df1 = sqlContext.read.json(rdd1)
df1.printSchema()

root
 |-- f1: string (nullable = true)
 |-- f1a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- f2: string (nullable = true)

I would expect to be able to select the first element of "f1a" this way:
df1.select("f1a[0]").show()

org.apache.spark.sql.AnalysisException: cannot resolve 'f1a[0]' given input
columns f1, f1a;

This is with Spark 1.6.0.
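
(For reference, the expression-based forms do seem to work, for example:

df1.selectExpr("f1a[0]").show()
df1.select(df1("f1a").getItem(0)).show()

though I would have expected the string form above to resolve as well.)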

Please help. A follow-up question is: can I access arbitrary levels of
nested JSON array of struct of array of struct?

Thanks,
Xinh


Re: groupBy and store in parquet

2016-05-05 Thread Xinh Huynh
Hi Michal,

Why is your solution so slow? Is it from the file IO caused by storing in a
temp file as JSON and then reading it back in and writing it as Parquet?
How are you getting "events" in the first place?

Do you have the original Kafka messages as an RDD[String]? Then how about:

1. Start with eventsAsRDD : RDD[String] (before converting to DF)
2. eventsAsRDD.map() --> use a RegEx to parse out the event_type of each
event
 For example, search the string for {"event_type"="[.*]"}
3. Now, filter by each event_type to create a separate RDD for each type,
and convert those to DF. You only convert to DF for events of the same
type, so you avoid the NULLs.

Xinh


On Thu, May 5, 2016 at 2:52 AM, Michal Vince <vince.mic...@gmail.com> wrote:

> Hi Xinh
>
> For (1) the biggest problem are those null columns. e.g. DF will have
> ~1000 columns so every partition of that DF will have ~1000 columns, one of
> the partitioned columns can have 996 null columns which is big waste of
> space (in my case more than 80% in avg)
>
> for (2) I can`t really change anything as the source belongs to the 3rd
> party
>
>
> Miso
>
> On 05/04/2016 05:21 PM, Xinh Huynh wrote:
>
> Hi Michal,
>
> For (1), would it be possible to partitionBy two columns to reduce the
> size? Something like partitionBy("event_type", "date").
>
> For (2), is there a way to separate the different event types upstream,
> like on different Kafka topics, and then process them separately?
>
> Xinh
>
> On Wed, May 4, 2016 at 7:47 AM, Michal Vince <vince.mic...@gmail.com>
> wrote:
>
>> Hi guys
>>
>> I`m trying to store kafka stream with ~5k events/s as efficiently as
>> possible in parquet format to hdfs.
>>
>> I can`t make any changes to kafka (belongs to 3rd party)
>>
>>
>> Events in kafka are in json format, but the problem is there are many
>> different event types (from different subsystems with different number of
>> fields, different size etc..) so it doesn`t make any sense to store them in
>> the same file
>>
>>
>> I was trying to read data to DF and then repartition it by event_type and
>> store
>>
>> events.write.partitionBy("event_type").format("parquet").mode(org.apache.spark.sql.SaveMode.Append).save(tmpFolder)
>>
>> which is quite fast but have 2 drawbacks that I`m aware of
>>
>> 1. output folder has only one partition which can be huge
>>
>> 2. all DFs created like this share the same schema, so even dfs with few
>> fields have tons of null fields
>>
>>
>> My second try is bit naive and really really slow (you can see why in
>> code) - filter DF by event type and store them temporarily as json (to get
>> rid of null fields)
>>
>> val event_types = events.select($"event_type").distinct().collect() // get 
>> event_types in this batch
>> for (row <- event_types) {
>>   val currDF = events.filter($"event_type" === row.get(0))
>>   val tmpPath = tmpFolder + row.get(0)
>>   
>> currDF.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).save(tmpPath)
>>   sqlContext.read.json(tmpPath).write.format("parquet").save(basePath)
>>
>> }hdfs.delete(new Path(tmpFolder), true)
>>
>>
>> Do you have any suggestions for any better solution to this?
>>
>> thanks
>>
>>
>>
>
>


Re: groupBy and store in parquet

2016-05-04 Thread Xinh Huynh
Hi Michal,

For (1), would it be possible to partitionBy two columns to reduce the
size? Something like partitionBy("event_type", "date").

For (2), is there a way to separate the different event types upstream,
like on different Kafka topics, and then process them separately?

Xinh

On Wed, May 4, 2016 at 7:47 AM, Michal Vince  wrote:

> Hi guys
>
> I`m trying to store kafka stream with ~5k events/s as efficiently as
> possible in parquet format to hdfs.
>
> I can`t make any changes to kafka (belongs to 3rd party)
>
>
> Events in kafka are in json format, but the problem is there are many
> different event types (from different subsystems with different number of
> fields, different size etc..) so it doesn`t make any sense to store them in
> the same file
>
>
> I was trying to read data to DF and then repartition it by event_type and
> store
>
> events.write.partitionBy("event_type").format("parquet").mode(org.apache.spark.sql.SaveMode.Append).save(tmpFolder)
>
> which is quite fast but have 2 drawbacks that I`m aware of
>
> 1. output folder has only one partition which can be huge
>
> 2. all DFs created like this share the same schema, so even dfs with few
> fields have tons of null fields
>
>
> My second try is bit naive and really really slow (you can see why in
> code) - filter DF by event type and store them temporarily as json (to get
> rid of null fields)
>
> val event_types = events.select($"event_type").distinct().collect() // get 
> event_types in this batch
> for (row <- event_types) {
>   val currDF = events.filter($"event_type" === row.get(0))
>   val tmpPath = tmpFolder + row.get(0)
>   
> currDF.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).save(tmpPath)
>   sqlContext.read.json(tmpPath).write.format("parquet").save(basePath)
>
> }hdfs.delete(new Path(tmpFolder), true)
>
>
> Do you have any suggestions for any better solution to this?
>
> thanks
>
>
>


Re: How can I join two DataSet of same case class?

2016-03-11 Thread Xinh Huynh
I think you have to use an alias. To provide an alias to a Dataset:

val d1 = a.as("d1")
val d2 = b.as("d2")

Then join, using the alias in the column names:
d1.joinWith(d2, $"d1.edid" === $"d2.edid")

Finally, please doublecheck your column names. I did not see "edid" in your
case class.

Xinh

On Thu, Mar 10, 2016 at 9:09 PM, 박주형  wrote:

> Hi. I want to join two DataSet. but below stderr is shown
>
> 16/03/11 13:55:51 WARN ColumnName: Constructing trivially true equals
> predicate, ''edid = 'edid'. Perhaps you need to use aliases.
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot
> resolve 'edid' given input columns dataType, avg, sigma, countUnique,
> numRows, recentEdid, categoryId, accCount, statType, categoryId, max,
> accCount, firstQuarter, recentEdid, replicationRateAvg, numRows, min,
> countNotNull, countNotNull, dcid, numDistinctRows, max, firstQuarter, min,
> replicationRateAvg, dcid, statType, avg, sigma, dataType, median,
> thirdQuarter, numDistinctRows, median, countUnique, thirdQuarter;
>
>
> my case class is
> case class Stat(statType: Int, dataType: Int, dcid: Int,
> categoryId: Int, recentEdid: Int, countNotNull: Int, countUnique:
> Int, accCount: Int, replicationRateAvg: Double,
> numDistinctRows: Double, numRows: Double,
> min: Double, max: Double, sigma: Double, avg: Double,
> firstQuarter: Double, thirdQuarter: Double, median: Double)
>
> and my code is
> a.joinWith(b, $"edid" === $"edid").show()
>
> If i use DataFrame, renaming a’s column could solve it. How can I join two
> DataSet of same case class?
>


Re: S3 Zip File Loading Advice

2016-03-09 Thread Xinh Huynh
Could you wrap the ZipInputStream in a List, since a subtype of
TraversableOnce[?] is required?

case (name, content) => List(new ZipInputStream(content.open))
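
Or, if the goal is an RDD of the CSV lines themselves, something along these lines
(assuming each archive holds a single CSV entry, as in your case):

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream

val lines = sc.binaryFiles(zipFile).flatMap {
  case (name: String, content: PortableDataStream) =>
    val zis = new ZipInputStream(content.open)
    zis.getNextEntry  // position at the single CSV entry
    val reader = new BufferedReader(new InputStreamReader(zis))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
}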

Xinh

On Wed, Mar 9, 2016 at 7:07 AM, Benjamin Kim  wrote:

> Hi Sabarish,
>
> I found a similar posting online where I should use the S3 listKeys.
> http://stackoverflow.com/questions/24029873/how-to-read-multiple-text-files-into-a-single-rdd.
> Is this what you were thinking?
>
> And, your assumption is correct. The zipped CSV file contains only a
> single file. I found this posting.
> http://stackoverflow.com/questions/28969757/zip-support-in-apache-spark.
> I see how to do the unzipping, but I cannot get it to work when running the
> code directly.
>
> ...
> import java.io.{ IOException, FileOutputStream, FileInputStream, File }
> import java.util.zip.{ ZipEntry, ZipInputStream }
> import org.apache.spark.input.PortableDataStream
>
>
> sc.hadoopConfiguration.set("fs.s3n.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem")
> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", accessKey)
> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", secretKey)
>
> val zipFile = "
> s3n://events/2016/03/01/00/event-20160301.00-4877ff81-928f-4da4-89b6-6d40a28d61c7.csv.zip
> "
> val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name: String,
> content: PortableDataStream) => new ZipInputStream(content.open) }
>
> :95: error: type mismatch;
>  found   : java.util.zip.ZipInputStream
>  required: TraversableOnce[?]
>  val zipFileRDD = sc.binaryFiles(zipFile).flatMap { case (name,
> content) => new ZipInputStream(content.open) }
>
>   ^
>
> Thanks,
> Ben
>
> On Mar 9, 2016, at 12:03 AM, Sabarish Sasidharan 
> wrote:
>
> You can use S3's listKeys API and do a diff between consecutive listKeys
> to identify what's new.
>
> Are there multiple files in each zip? Single file archives are processed
> just like text as long as it is one of the supported compression formats.
>
> Regards
> Sab
>
> On Wed, Mar 9, 2016 at 10:33 AM, Benjamin Kim  wrote:
>
>> I am wondering if anyone can help.
>>
>> Our company stores zipped CSV files in S3, which has been a big headache
>> from the start. I was wondering if anyone has created a way to iterate
>> through several subdirectories (s3n://events/2016/03/01/00,
>> s3n://2016/03/01/01, etc.) in S3 to find the newest files and load them.
>> It would be a big bonus to include the unzipping of the file in the process
>> so that the CSV can be loaded directly into a dataframe for further
>> processing. I’m pretty sure that the S3 part of this request is not
>> uncommon. I would think the file being zipped is uncommon. If anyone can
>> help, I would truly be grateful for I am new to Scala and Spark. This would
>> be a great help in learning.
>>
>> Thanks,
>> Ben
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: reading the parquet file

2016-03-09 Thread Xinh Huynh
You might want to avoid that unionAll(), which seems to be repeated over
1000 times. Could you do a collect() in each iteration, and collect your
results in a local Array instead of a DataFrame? How many rows are returned
in "temp1"?

Xinh

On Tue, Mar 8, 2016 at 10:00 PM, Angel Angel 
wrote:

> Hello Sir/Madam,
>
>
> I writing the spark application in spark 1.4.0.
>
> I have one text file with the size of 8 GB.
> I save that file in parquet format
>
>
> val df2 =
> sc.textFile("/root/Desktop/database_200/database_200.txt").map(_.split(",")).map(p
> => Table(p(0),p(1).trim.toInt, p(2).trim.toInt, p(3)))toDF
>
>
> df2.write.parquet("hdfs://hadoopm0:8020/tmp/input1/database4.parquet")
>
> After that i did the following operations
>
>
> val df1 =
> sqlContext.read.parquet("dfs://hadoopm0:8020/tmp/input1/database4.parquet")
>
>
> var a=0
>
> var k = df1.filter(df1("Address").equalTo(Array_Ele(0) ))
>
>
> for( a <-2 until 2720 by 2){
>
>
> var temp= df1.filter(df1("Address").equalTo(Array_Ele(a)))
>
>
> var temp1 =
> temp.select(temp("Address"),temp("Couple_time")-Array_Ele(a+1),temp("WT_ID"),temp("WT_Name"))
>
>
> k =k.unionAll(temp1) }
>
>
> val WT_ID_Sort  = k.groupBy("WT_ID").count().sort(desc("count"))
>
>
>
> WT_ID_Sort.show()
>
>
>
> after that I am getting the following warning and my task is disconnected
> again and again.
>
>
>
>
>
> [image: Inline image 1]
>
>
>
>
> I need to do many iterative operations on that df1 file.
>
>
> So can any one help me to solve this problem?
>
> thanks in advance.
>
>
> Thanks.
>
>
>


Re: how to implement and deploy robust streaming apps

2016-03-08 Thread Xinh Huynh
If you would like an overview of Spark Stream and fault tolerance, these
slides are great (Slides 24+ focus on fault tolerance; Slide 52 is on
resilience to traffic spikes):
http://www.lightbend.com/blog/four-things-to-know-about-reliable-spark-streaming-typesafe-databricks

This recent Spark Summit talk is all about backpressure and dynamic
scaling:
https://spark-summit.org/east-2016/events/building-robust-scalable-and-adaptive-applications-on-spark-streaming/

From the Spark docs, backpressure works by placing a limit on the receiving
rate, and this limit is adjusted dynamically based on processing times. If
there is a burst and the data source generates events at a higher rate,
those extra events will get backed up in the data source. So, how much
buffering is available in the data source? For instance, Kafka can use HDFS
as a huge buffer, with capacity to buffer traffic spikes. Spark itself
doesn't handle the buffering of unprocessed events, so in some cases, Kafka
(or some other storage) is placed between the data source and Spark to
provide a buffer.
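
The relevant settings look like this (the rate values are just examples):

import org.apache.spark.SparkConf

// set in the driver before creating the StreamingContext
val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.receiver.maxRate", "10000")           // cap per receiver, events/sec
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")  // cap for the direct Kafka API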

Xinh


On Mon, Mar 7, 2016 at 2:10 PM, Andy Davidson  wrote:

> One of the challenges we need to prepare for with streaming apps is bursty
> data. Typically we need to estimate our worst case data load and make sure
> we have enough capacity
>
>
> It not obvious what best practices are with spark streaming.
>
>
>- we have implemented check pointing as described in the prog guide
>- Use stand alone cluster manager and spark-submit
>- We use the mgmt console to kill drives when needed
>
>
>- we plan to configure write ahead spark.streaming.backpressure.enabled
> to true.
>
>
>- our application runs a single unreliable receive
>   - We run multiple implementation configured to partition the input
>
>
> As long as our processing time is < our windowing time everything is fine
>
> In the streaming systems I have worked on in the past we scaled out by
> using load balancers and proxy farms to create buffering capacity. Its not
> clear how to scale out spark
>
> In our limited testing it seems like we have a single app configure to
> receive a predefined portion of the data. Once it is stated we can not add
> additional resources. Adding cores and memory does not seem increase our
> capacity
>
>
> Kind regards
>
> Andy
>
>
>


Re: Do we need schema for Parquet files with Spark?

2016-03-03 Thread Xinh Huynh
Hi Ashok,

On the Spark SQL side, when you create a dataframe, it will have a schema (each 
column has a type such as Int or String). Then when you save that dataframe as 
parquet format, Spark translates the dataframe schema into Parquet data types. 
(See spark.sql.execution.datasources.parquet.) Then Parquet does the dictionary 
encoding automatically (if applicable) based on the data values; this encoding 
is not specified by the user. Parquet figures out the right encoding to use for 
you.

Xinh

> On Mar 3, 2016, at 7:32 PM, ashokkumar rajendran 
>  wrote:
> 
> Hi, 
> 
> I am exploring to use Apache Parquet with Spark SQL in our project. I notice 
> that Apache Parquet uses different encoding for different columns. The 
> dictionary encoding in Parquet will be one of the good ones for our 
> performance. I do not see much documentation in Spark or Parquet on how to 
> configure this. For example, how would Parquet know dictionary of words if 
> there is no schema provided by user? Where/how to specify my schema / config 
> for Parquet format?
> 
> Could not find Apache Parquet mailing list in the official site. It would be 
> great if anyone could share it as well.
> 
> Regards
> Ashok

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: rdd cache name

2016-03-02 Thread Xinh Huynh
Hi Charles,

You can set the RDD name before using it. Just do before caching:
(Scala) myRdd.setName("Charles RDD")
(Python) myRdd.setName('Charles RDD')
Reference: PySpark doc:
http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

Fraction cached is the percentage of partitions of an RDD that are cached.
From the code:
(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)
Code is here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
Fraction cached will be less than 100% if there isn't enough room for all
cached RDDs to fit in the cache. If it's a problem, you may want to
increase your in-memory cache size or cache off-heap or to disk.

Xinh

On Wed, Mar 2, 2016 at 1:48 AM, charles li  wrote:

> hi, there, I feel a little confused about the *cache* in spark.
>
> first, is there any way to *customize the cached RDD name*, it's not
> convenient for me when looking at the storage page, there are the kind of
> RDD in the RDD Name column, I hope to make it as my customized name, kinds
> of 'rdd 1', 'rrd of map', 'rdd of groupby' and so on.
>
> second, can some one tell me what exactly the '*Fraction Cached*' mean
> under the hood?
>
> great thanks
>
>
>
> ​
>
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>