I got it running by myself.

On Wed, Aug 5, 2015 at 10:27 PM, Ganelin, Ilya <ilya.gane...@capitalone.com> wrote:
> Have you tried reading the Spark documentation?
>
> http://spark.apache.org/docs/latest/programming-guide.html
>
> Thank you,
> Ilya Ganelin
>
> -----Original Message-----
> *From: *ÐΞ€ρ@Ҝ (๏̯͡๏) [deepuj...@gmail.com]
> *Sent: *Thursday, August 06, 2015 12:41 AM Eastern Standard Time
> *To: *Philip Weaver
> *Cc: *user
> *Subject: *Re: How to read gzip data in Spark - Simple question
>
> How do I persist the RDD to HDFS?
>
> On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver <philip.wea...@gmail.com> wrote:
>
>> This message means that java.util.Date is not supported by Spark
>> DataFrames. You'll need to use java.sql.Date, I believe.
>>
>> On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>
>>> That seems to be working; however, I now see a new exception.
>>>
>>> Code:
>>>
>>> def formatStringAsDate(dateStr: String) = new SimpleDateFormat("yyyy-MM-dd").parse(dateStr)
>>>
>>> //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
>>> val rowStructText = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")
>>>
>>> case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String,
>>>   f6: Integer, f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
>>>   f12: Integer, f13: Integer, f14: String)
>>>
>>> val summary = rowStructText.map(s => s.split(",")).map(
>>>   s => Summary(formatStringAsDate(s(0)),
>>>     s(1).replaceAll("\"", "").toLong,
>>>     s(3).replaceAll("\"", "").toLong,
>>>     s(4).replaceAll("\"", "").toInt,
>>>     s(5).replaceAll("\"", ""),
>>>     s(6).replaceAll("\"", "").toInt,
>>>     formatStringAsDate(s(7)),
>>>     formatStringAsDate(s(8)),
>>>     s(9).replaceAll("\"", "").toInt,
>>>     s(10).replaceAll("\"", "").toInt,
>>>     s(11).replaceAll("\"", "").toFloat,
>>>     s(12).replaceAll("\"", "").toInt,
>>>     s(13).replaceAll("\"", "").toInt,
>>>     s(14).replaceAll("\"", "")
>>>   )
>>> ).toDF()
>>> summary.registerTempTable("summary")
>>>
>>> //Output
>>> import java.text.SimpleDateFormat
>>> import java.util.Calendar
>>> import java.util.Date
>>> formatStringAsDate: (dateStr: String)java.util.Date
>>> rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz MapPartitionsRDD[105] at textFile at <console>:60
>>> defined class Summary
>>> x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at <console>:61
>>> java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported
>>>   at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
>>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
>>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)
>>>
>>> Any suggestions?
>>>
>>> On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver <philip.wea...@gmail.com> wrote:
>>>
>>>> The parallelize method does not read the contents of a file. It simply
>>>> takes a collection and distributes it to the cluster. In this case, the
>>>> String is a collection of 67 characters.
>>>>
>>>> Use sc.textFile instead of sc.parallelize, and it should work as you
>>>> want.
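(For reference, a minimal sketch of the approach suggested above: sc.textFile to read the gzipped CSV instead of sc.parallelize, and java.sql.Date instead of java.util.Date so that toDF() can derive a schema. The path and column positions are taken from the snippets in this thread; the reduced case class and the parseDate helper are illustrative only, and sc/sqlContext are assumed to be predefined, as in a Spark shell or Zeppelin session.)

    import java.sql.Date
    import java.text.SimpleDateFormat

    // java.sql.Date maps to Spark SQL's DateType; java.util.Date does not.
    def parseDate(s: String): Date =
      new Date(new SimpleDateFormat("yyyy-MM-dd").parse(s).getTime)

    // Reduced, hypothetical case class covering only the first few columns.
    case class SummaryRow(day: Date, f2: Long, f4: Int, f5: String)

    // sc.textFile reads the file's contents (the .gz is decompressed transparently);
    // sc.parallelize("path") would only distribute the characters of the path string.
    val lines = sc.textFile(
      "/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")

    import sqlContext.implicits._   // needed for .toDF() on an RDD of case classes

    // split(",", -1) keeps empty fields instead of dropping trailing ones.
    val df = lines.map(_.split(",", -1)).map { s =>
      SummaryRow(parseDate(s(0)),
                 s(1).replaceAll("\"", "").toLong,
                 s(4).replaceAll("\"", "").toInt,
                 s(5).replaceAll("\"", ""))
    }.toDF()

    df.registerTempTable("summary")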
>>>> On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>
>>>>> I have csv data that is embedded in gzip format on HDFS.
>>>>>
>>>>> *With Pig*
>>>>>
>>>>> a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz' using PigStorage();
>>>>> b = limit a 10;
>>>>>
>>>>> (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,,,,,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)
>>>>> (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)
>>>>>
>>>>> However, with Spark:
>>>>>
>>>>> val rowStructText = sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00000.gz")
>>>>> val x = rowStructText.map(s => {
>>>>>   println(s)
>>>>>   s
>>>>> })
>>>>> x.count
>>>>>
>>>>> Questions
>>>>>
>>>>> 1) x.count always shows 67, irrespective of the path I pass to sc.parallelize.
>>>>> 2) It shows x as RDD[Char] instead of RDD[String].
>>>>> 3) println() never emits the rows.
>>>>>
>>>>> Any suggestions?
>>>>>
>>>>> -Deepak
>>>>>
>>>>> --
>>>>> Deepak
>>>>>
>>>
>>> --
>>> Deepak
>>>
>
> --
> Deepak
>

--
Deepak
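(And since the earlier question about persisting the RDD to HDFS was answered only with a pointer to the docs, here is a minimal sketch of two common options, assuming the lines RDD and df DataFrame from the sketch above. The output paths are made-up examples, and the target directories must not already exist:)

    // Option 1: write the raw lines back out as text files under an HDFS directory;
    // each partition becomes one part-XXXXX file.
    lines.saveAsTextFile("hdfs:///user/zeppelin/output/summary-text")

    // Option 2 (Spark 1.4+): write the DataFrame as Parquet, preserving the schema
    // derived from the case class.
    df.write.parquet("hdfs:///user/zeppelin/output/summary-parquet")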