I encourage you to find the answer to this on your own :).

On Wed, Aug 5, 2015 at 9:43 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
> Code:
>
> val summary = rowStructText.map(s => s.split(",")).map(
>   {
>     s =>
>       Summary(formatStringAsDate(s(0)),
>         s(1).replaceAll("\"", "").toLong,
>         s(3).replaceAll("\"", "").toLong,
>         s(4).replaceAll("\"", "").toInt,
>         s(5).replaceAll("\"", ""),
>         s(6).replaceAll("\"", "").toInt,
>         formatStringAsDate(s(7)),
>         formatStringAsDate(s(8)),
>         s(9).replaceAll("\"", "").toInt,
>         s(10).replaceAll("\"", "").toInt,
>         s(11).replaceAll("\"", "").toFloat,
>         s(12).replaceAll("\"", "").toInt,
>         s(13).replaceAll("\"", "").toInt,
>         s(14).replaceAll("\"", "")
>       )
>   }
> )
> summary.saveAsTextFile("sparkO")
>
> Exception:
>
> import java.text.SimpleDateFormat
> import java.util.Calendar
> import java.sql.Date
> import org.apache.spark.storage.StorageLevel
> formatStringAsDate: (dateStr: String)java.sql.Date
> rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz MapPartitionsRDD[263] at textFile at <console>:154
> defined class Summary
> summary: org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[265] at map at <console>:159
> sumDF: org.apache.spark.sql.DataFrame = [f1: date, f2: bigint, f3: bigint, f4: int, f5: string, f6: int, f7: date, f8: date, f9: int, f10: int, f11: float, f12: int, f13: int, f14: string]
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 45.0 failed 4 times, most recent failure: Lost task 0.3 in stage 45.0 (TID 1872, datanode-6-3486.phx01.dev.ebayc3.com): java.lang.ArrayIndexOutOfBoundsException: 1
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:163)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:161)
>   at scala.collection.Iterator$$anon
>
> On Wed, Aug 5, 2015 at 9:40 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>
>> How do I persist the RDD to HDFS?
>>
>> On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver <philip.wea...@gmail.com> wrote:
>>
>>> This message means that java.util.Date is not supported by Spark
>>> DataFrame. You'll need to use java.sql.Date, I believe.
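>>>
>>> For example, something along these lines should work (a rough, untested
>>> sketch, reusing the "yyyy-MM-dd" format from your formatStringAsDate):
>>>
>>> import java.text.SimpleDateFormat
>>>
>>> // Parse with SimpleDateFormat as before, then wrap the epoch millis in
>>> // java.sql.Date, which Spark SQL can map to a DateType column.
>>> def formatStringAsDate(dateStr: String): java.sql.Date =
>>>   new java.sql.Date(new SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime)
>>>
>>> Your Summary case class would then declare the date fields as
>>> java.sql.Date instead of java.util.Date.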
>>>
>>> On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>
>>>> That seems to be working; however, I see a new exception.
>>>>
>>>> Code:
>>>>
>>>> def formatStringAsDate(dateStr: String) = new SimpleDateFormat("yyyy-MM-dd").parse(dateStr)
>>>>
>>>> //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
>>>> val rowStructText = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz")
>>>> case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String, f6: Integer, f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: Integer, f13: Integer, f14: String)
>>>>
>>>> val summary = rowStructText.map(s => s.split(",")).map(
>>>>   s => Summary(formatStringAsDate(s(0)),
>>>>     s(1).replaceAll("\"", "").toLong,
>>>>     s(3).replaceAll("\"", "").toLong,
>>>>     s(4).replaceAll("\"", "").toInt,
>>>>     s(5).replaceAll("\"", ""),
>>>>     s(6).replaceAll("\"", "").toInt,
>>>>     formatStringAsDate(s(7)),
>>>>     formatStringAsDate(s(8)),
>>>>     s(9).replaceAll("\"", "").toInt,
>>>>     s(10).replaceAll("\"", "").toInt,
>>>>     s(11).replaceAll("\"", "").toFloat,
>>>>     s(12).replaceAll("\"", "").toInt,
>>>>     s(13).replaceAll("\"", "").toInt,
>>>>     s(14).replaceAll("\"", "")
>>>>   )
>>>> ).toDF()
>>>> bank.registerTempTable("summary")
>>>>
>>>> //Output
>>>> import java.text.SimpleDateFormat
>>>> import java.util.Calendar
>>>> import java.util.Date
>>>> formatStringAsDate: (dateStr: String)java.util.Date
>>>> rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz MapPartitionsRDD[105] at textFile at <console>:60
>>>> defined class Summary
>>>> x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at <console>:61
>>>> java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported
>>>>   at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
>>>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
>>>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)
>>>>
>>>> Any suggestions?
>>>>
>>>> On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver <philip.wea...@gmail.com> wrote:
>>>>
>>>>> The parallelize method does not read the contents of a file. It simply
>>>>> takes a collection and distributes it to the cluster. In this case, the
>>>>> String is a collection of 67 characters.
>>>>>
>>>>> Use sc.textFile instead of sc.parallelize, and it should work as you want.
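>>>>>
>>>>> Roughly (an untested sketch, assuming the gzipped file is plain CSV text):
>>>>>
>>>>> // textFile reads the file from HDFS (and decompresses the .gz transparently),
>>>>> // producing an RDD[String] with one element per line.
>>>>> val rowStructText = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00000.gz")
>>>>> rowStructText.take(5).foreach(println)  // rows print here, on the driver
>>>>> println(rowStructText.count())          // the actual number of lines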
>>>>>
>>>>> On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:
>>>>>
>>>>>> I have CSV data that is embedded in gzip format on HDFS.
>>>>>>
>>>>>> *With Pig*
>>>>>>
>>>>>> a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00003.gz' using PigStorage();
>>>>>>
>>>>>> b = limit a 10
>>>>>>
>>>>>> (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,,,,,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)
>>>>>>
>>>>>> (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)
>>>>>>
>>>>>> However with Spark
>>>>>>
>>>>>> val rowStructText = sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-00000.gz")
>>>>>>
>>>>>> val x = rowStructText.map(s => {
>>>>>>   println(s)
>>>>>>   s
>>>>>> })
>>>>>>
>>>>>> x.count
>>>>>>
>>>>>> Questions:
>>>>>>
>>>>>> 1) x.count always shows 67, irrespective of the path I change in sc.parallelize.
>>>>>>
>>>>>> 2) It shows x as RDD[Char] instead of RDD[String].
>>>>>>
>>>>>> 3) println() never emits the rows.
>>>>>>
>>>>>> Any suggestions?
>>>>>>
>>>>>> -Deepak
>>>>>>
>>>>>> --
>>>>>> Deepak
>>>>>>
>
> --
> Deepak
>