Re: How to make new composite columns by combining rows in the same group?

2016-08-26 Thread Xinh Huynh
That looks like a pivot table. Have you looked into using the pivot method on DataFrames? Xinh > On Aug 26, 2016, at 4:54 AM, Rex X wrote: > > 1. Given following CSV file > $cat data.csv > > ID,City,Zip,Price,Rating > 1,A,95123,100,0 > 1,B,95124,102,1 >
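A minimal sketch of the pivot approach (Scala, Spark 1.6+), assuming the CSV above has been loaded into a DataFrame df with columns ID, City, Zip, Price, Rating; the choice of first("Price") as the aggregate is only illustrative:

    import org.apache.spark.sql.functions._

    val pivoted = df
      .groupBy("ID")          // one output row per ID
      .pivot("City")          // one new column per distinct City value
      .agg(first("Price"))    // the Price for that (ID, City) pair
    pivoted.show()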

Re: How to filter based on a constant value

2016-07-30 Thread Xinh Huynh
Hi Mitch, I think you were missing a step: [your result] maxdate: org.apache.spark.sql.Row = [2015-12-15] Since maxdate is of type Row, you would want to extract the first column of the Row with: >> val maxdateStr = maxdate.getString(0) assuming the column type is String. API doc is here:
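A minimal sketch of the full step (Scala), assuming a DataFrame df with a string column "date"; names are illustrative:

    import org.apache.spark.sql.functions.max

    val maxdate    = df.agg(max("date")).head()   // a Row, e.g. [2015-12-15]
    val maxdateStr = maxdate.getString(0)         // extract the first (and only) column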

Re: DataFrame Min By Column

2016-07-08 Thread Xinh Huynh
Hi Pedro, I could not think of a way using an aggregate. It's possible with a window function, partitioned on user and ordered by time: // Assuming "df" holds your dataframe ... import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions.Window val wSpec =
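A sketch of the window-function approach, assuming df has columns "user", "time", and "value" (column names are illustrative):

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window

    val wSpec = Window.partitionBy("user").orderBy("time")

    val minByUser = df
      .withColumn("rn", row_number().over(wSpec))   // rank rows within each user by time
      .where(col("rn") === 1)                       // keep the earliest row per user
      .drop("rn")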

Re: Using R code as part of a Spark Application

2016-06-29 Thread Xinh Huynh
29, 2016 at 3:29 PM, Sean Owen <so...@cloudera.com> wrote: >> >>> Oh, interesting: does this really mean the return of distributing R >>> code from driver to executors and running it remotely, or do I >>> misunderstand? this would require having R on the executor n

Re: Using R code as part of a Spark Application

2016-06-29 Thread Xinh Huynh
There is some new SparkR functionality coming in Spark 2.0, such as "dapply". You could use SparkR to load a Parquet file and then run "dapply" to apply a function to each partition of a DataFrame. Info about loading Parquet file:

Re: Best way to tranform string label into long label for classification problem

2016-06-28 Thread Xinh Huynh
Hi Jao, Here's one option: http://spark.apache.org/docs/latest/ml-features.html#stringindexer "StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies." Xinh On Tue, Jun 28, 2016 at 12:29 AM, Jaonary Rabarisoa
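A minimal sketch of StringIndexer usage (Scala), assuming a DataFrame df with a string column "label"; column names are illustrative:

    import org.apache.spark.ml.feature.StringIndexer

    val indexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("labelIndex")

    val indexed = indexer.fit(df).transform(df)   // adds a Double column of indices in [0, numLabels)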

Re: What is the explanation of "ConvertToUnsafe" in "Physical Plan"

2016-06-27 Thread Xinh Huynh
I guess it has to do with Tungsten's explicit memory management, which builds on sun.misc.Unsafe. The "ConvertToUnsafe" operator converts Java-object-based rows into UnsafeRows, which use Spark's internal memory-efficient format. Here is the related code in 1.6: ConvertToUnsafe is defined in:
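To see where this shows up, you can print the physical plan of a query in 1.6 (a sketch; the column name "x" is illustrative):

    df.filter(df("x") > 0).explain()   // look for ConvertToUnsafe nodes in the printed physical plan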

Re: DataFrame versus Dataset creation and usage

2016-06-24 Thread Xinh Huynh
Hi Martin, Since your schema is dynamic, how would you use Datasets? Would you know ahead of time the row type T in a Dataset[T]? One option is to start with DataFrames in the beginning of your data pipeline, figure out the field types, and then switch completely over to RDDs or Dataset in the
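A sketch of that hand-off, with a hypothetical case class you would only write once the field types are known (path and fields are illustrative):

    case class Record(id: Long, name: String)   // hypothetical: defined after inspecting the inferred schema
    import sqlContext.implicits._

    val df = sqlContext.read.json("path/to/data.json")   // schema inferred at runtime
    val ds = df.as[Record]                               // typed Dataset[Record] from here on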

Re: Confusing argument of sql.functions.count

2016-06-22 Thread Xinh Huynh
I can see how the linked documentation could be confusing: "Aggregate function: returns the number of items in a group." What it doesn't mention is that it returns the number of rows for which the given column is non-null. Xinh On Wed, Jun 22, 2016 at 9:31 AM, Takeshi Yamamuro
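A quick illustration of the non-null behavior (Scala; data and column names are illustrative, and sqlContext.implicits._ is assumed to be in scope):

    import sqlContext.implicits._
    import org.apache.spark.sql.functions.count

    val df = sc.parallelize(Seq(("a", Some(1)), ("b", None: Option[Int]))).toDF("k", "v")
    df.select(count($"v")).show()   // 1 -- the row where "v" is null is not counted
    println(df.count())             // 2 -- total number of rows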

Re: Dataset Select Function after Aggregate Error

2016-06-17 Thread Xinh Huynh
Hi Pedro, In 1.6.1, you can do: >> ds.groupBy(_.uid).count().map(_._1) or >> ds.groupBy(_.uid).count().select($"value".as[String]) It doesn't have the exact same syntax as for DataFrame. http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset It might be different
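A slightly fuller 1.6.1 sketch, with a hypothetical case class (data and field names are illustrative):

    case class Event(uid: String, value: Double)
    import sqlContext.implicits._

    val ds = Seq(Event("u1", 1.0), Event("u1", 2.0), Event("u2", 3.0)).toDS()
    val uids = ds.groupBy(_.uid).count().map(_._1)   // Dataset[String] of the distinct uids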

Re: Spark 2.0: Unify DataFrames and Datasets question

2016-06-14 Thread Xinh Huynh
Hi Arun, This documentation may be helpful: The 2.0-preview Scala doc for Dataset class: http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Dataset Note that the Dataset API has completely changed from 1.6. In 2.0, there is no separate DataFrame class. Rather,
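For reference, in the 2.0 preview DataFrame is kept only as a type alias in the org.apache.spark.sql package object:

    type DataFrame = Dataset[Row]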

Re: How to set the degree of parallelism in Spark SQL?

2016-05-23 Thread Xinh Huynh
To the original question of parallelism and executors: you can have a parallelism of 200, even with 2 executors. In the Spark UI, you should see that the number of _tasks_ is 200 when your job involves shuffling. Executors vs. tasks: http://spark.apache.org/docs/latest/cluster-overview.html Xinh
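The 200 comes from the default value of spark.sql.shuffle.partitions; a sketch of changing it (the value 400 is only an example):

    sqlContext.setConf("spark.sql.shuffle.partitions", "400")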

Re: Apache Spark Slack

2016-05-16 Thread Xinh Huynh
I just went to IRC. It looks like the correct channel is #apache-spark. So, is this an "official" chat room for Spark? Xinh On Mon, May 16, 2016 at 9:35 AM, Dood@ODDO wrote: > On 5/16/2016 9:30 AM, Paweł Szulc wrote: > >> >> Just realized that people have to be invited

Re: Creating Nested dataframe from flat data.

2016-05-13 Thread Xinh Huynh
Hi Prashant, You can create struct columns using the struct() function in org.apache.spark.sql.functions -- http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$ val df = sc.parallelize(List(("a", "b", "c"))).toDF("A", "B", "C") import
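Continuing that example, a sketch of nesting two of the columns into a struct (the output column name "AB" is illustrative):

    import org.apache.spark.sql.functions.{struct, col}

    val nested = df.select(struct(col("A"), col("B")).as("AB"), col("C"))
    nested.printSchema()   // AB is a struct<A:string,B:string> column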

Re: Joining a RDD to a Dataframe

2016-05-13 Thread Xinh Huynh
Hi Cyril, In the case where there are no documents, it looks like there is a typo in "addresses" (check the number of "d"s): | scala> df.select(explode(df("addresses.id")).as("aid"), df("id")) <== addresses | org.apache.spark.sql.AnalysisException: Cannot resolve column name "id" among

Re: apache spark on gitter?

2016-05-12 Thread Xinh Huynh
s good!) but some times you need more "real-time" > experience. You know, engage in the conversation in the given moment, not > conversation that might last few days :) > > TLDR: It is not a replacement, it's supplement to build the community > around OSS. Worth having for real-time

Re: apache spark on gitter?

2016-05-11 Thread Xinh Huynh
Hi Pawel, I'd like to hear more about your idea. Could you explain more why you would like to have a gitter channel? What are the advantages over a mailing list (like this one)? Have you had good experiences using gitter on other open source projects? Xinh On Wed, May 11, 2016 at 11:10 AM, Sean

Re: Evenly balance the number of items in each RDD partition

2016-05-10 Thread Xinh Huynh
Hi Ayman, Have you looked at this: http://stackoverflow.com/questions/23127329/how-to-define-custom-partitioner-for-spark-rdds-of-equally-sized-partition-where It recommends defining a custom partitioner and (PairRDD) partitionBy method to accomplish this. Xinh On Tue, May 10, 2016 at 1:15 PM,
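A minimal sketch of that custom-partitioner idea, assuming the records can be keyed by a 0-based index (e.g. via zipWithIndex); the partition count 8 is only an example:

    import org.apache.spark.Partitioner

    class ExactPartitioner(partitions: Int, elements: Long) extends Partitioner {
      override def numPartitions: Int = partitions
      override def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[Long]
        (k * partitions / elements).toInt   // spread indices 0..elements-1 evenly
      }
    }

    val n = rdd.count()
    val balanced = rdd.zipWithIndex()        // (value, index)
      .map(_.swap)                           // (index, value)
      .partitionBy(new ExactPartitioner(8, n))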

Re: Spark-csv- partitionBy

2016-05-10 Thread Xinh Huynh
Hi Pradeep, Here is a way to partition your data into different files, by calling repartition() on the dataframe: df.repartition(12, $"Month") .write .format(...) This is assuming you want to partition by a "month" column where there are 12 different values. Each partition will be stored in
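A complete sketch of that write (the output format and path are illustrative, and $ assumes sqlContext.implicits._ is in scope):

    df.repartition(12, $"Month")
      .write
      .format("com.databricks.spark.csv")
      .save("/tmp/output")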

Accessing JSON array in Spark SQL

2016-05-05 Thread Xinh Huynh
Hi, I am having trouble accessing an array element in JSON data with a dataframe. Here is the schema: val json1 = """{"f1":"1", "f1a":[{"f2":"2"}] }""" val rdd1 = sc.parallelize(List(json1)) val df1 = sqlContext.read.json(rdd1) df1.printSchema() root |-- f1: string (nullable = true) |-- f1a:
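One way to get at the array element, continuing the example above (a sketch):

    df1.select(df1("f1a").getItem(0).getField("f2")).show()   // -> 2
    // or flatten the array first:
    import org.apache.spark.sql.functions.explode
    df1.select(explode(df1("f1a")).as("x")).select("x.f2").show()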

Re: groupBy and store in parquet

2016-05-05 Thread Xinh Huynh
000 columns so every partition of that DF will have ~1000 columns, one of > the partitioned columns can have 996 null columns which is big waste of > space (in my case more than 80% in avg) > > for (2) I can't really change anything as the source belongs to the 3rd > party >

Re: groupBy and store in parquet

2016-05-04 Thread Xinh Huynh
Hi Michal, For (1), would it be possible to partitionBy two columns to reduce the size? Something like partitionBy("event_type", "date"). For (2), is there a way to separate the different event types upstream, like on different Kafka topics, and then process them separately? Xinh On Wed, May
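A sketch of option (1), partitioning the Parquet output on both columns (the output path is illustrative):

    df.write
      .partitionBy("event_type", "date")   // one output directory per (event_type, date) combination
      .parquet("/path/to/output")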

Re: How can I join two DataSet of same case class?

2016-03-11 Thread Xinh Huynh
I think you have to use an alias. To provide an alias to a Dataset: val d1 = a.as("d1") val d2 = b.as("d2") Then join, using the alias in the column names: d1.joinWith(d2, $"d1.edid" === $"d2.edid") Finally, please double-check your column names. I did not see "edid" in your case class. Xinh
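A self-contained sketch in 1.6 syntax, with a hypothetical case class (field names are illustrative):

    case class Item(edid: String, value: Int)
    import sqlContext.implicits._

    val a = Seq(Item("e1", 1)).toDS()
    val b = Seq(Item("e1", 2)).toDS()

    val d1 = a.as("d1")
    val d2 = b.as("d2")
    val joined = d1.joinWith(d2, $"d1.edid" === $"d2.edid")   // Dataset[(Item, Item)]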

Re: S3 Zip File Loading Advice

2016-03-09 Thread Xinh Huynh
Could you wrap the ZipInputStream in a List, since a subtype of TraversableOnce[?] is required? case (name, content) => List(new ZipInputStream(content.open)) Xinh On Wed, Mar 9, 2016 at 7:07 AM, Benjamin Kim wrote: > Hi Sabarish, > > I found a similar posting online where
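Sketched in context, assuming the zips were read with sc.binaryFiles (the S3 path is illustrative):

    import java.util.zip.ZipInputStream

    val zipped = sc.binaryFiles("s3n://bucket/path/*.zip")   // RDD[(String, PortableDataStream)]
    val streams = zipped.flatMap {
      case (name, content) => List(new ZipInputStream(content.open()))   // List satisfies TraversableOnce
    }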

Re: reading the parquet file

2016-03-09 Thread Xinh Huynh
You might want to avoid that unionAll(), which seems to be repeated over 1000 times. Could you do a collect() in each iteration, and collect your results in a local Array instead of a DataFrame? How many rows are returned in "temp1"? Xinh On Tue, Mar 8, 2016 at 10:00 PM, Angel Angel
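A sketch of that restructuring, collecting each iteration's rows into a local buffer instead of chaining unionAll (the filter producing "temp1" is illustrative):

    import org.apache.spark.sql.Row
    import scala.collection.mutable.ArrayBuffer

    val results = ArrayBuffer[Row]()
    for (i <- 0 until 1000) {
      val temp1 = df.filter(df("key") === i)   // stand-in for whatever builds temp1 in your loop
      results ++= temp1.collect()
    }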

Re: how to implement and deploy robust streaming apps

2016-03-08 Thread Xinh Huynh
If you would like an overview of Spark Streaming and fault tolerance, these slides are great (Slides 24+ focus on fault tolerance; Slide 52 is on resilience to traffic spikes): http://www.lightbend.com/blog/four-things-to-know-about-reliable-spark-streaming-typesafe-databricks This recent Spark

Re: Do we need schema for Parquet files with Spark?

2016-03-03 Thread Xinh Huynh
Hi Ashok, On the Spark SQL side, when you create a dataframe, it will have a schema (each column has a type such as Int or String). Then when you save that dataframe as parquet format, Spark translates the dataframe schema into Parquet data types. (See spark.sql.execution.datasources.parquet.)
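A small illustration of that round trip (paths are illustrative):

    df.write.parquet("/tmp/out")                     // dataframe schema translated to Parquet types
    val back = sqlContext.read.parquet("/tmp/out")   // schema read back from the Parquet metadata
    back.printSchema()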

Re: rdd cache name

2016-03-02 Thread Xinh Huynh
Hi Charles, You can set the RDD name before using it. Just do before caching: (Scala) myRdd.setName("Charles RDD") (Python) myRdd.setName('Charles RDD') Reference: PySpark doc: http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD Fraction cached is the percentage of partitions
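A tiny sketch showing that setName returns the RDD, so it chains naturally with cache():

    val named = sc.parallelize(1 to 100).setName("Charles RDD").cache()
    named.count()   // materializes the cache; the name appears on the Storage tab of the UI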