Ricky,

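You probably need map instead of flatMap here. flatMap flattens the result
of split, so each p is a single field (a String) rather than the array of
fields for one line. p(18) then means charAt(18) on that String, which is
the StringIndexOutOfBoundsException in your trace.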

val rowRDD = sc.textFile("/user/spark/short_model").map(_.split("\\t")).map(p => Row(...))
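
Here is a minimal sketch of the difference. It uses plain Scala collections
as a stand-in for the RDD (an assumption so that it runs without a cluster),
and the sample lines and the object name are made up for illustration:

```scala
// Plain Scala Seq stands in for the RDD; the two sample lines are hypothetical.
object MapVsFlatMap {
  val lines: Seq[String] = Seq("a1\tb1\tc1", "a2\tb2\tc2")

  // map keeps one element per input line: each p is an Array[String] of fields.
  val perLine: Seq[Array[String]] = lines.map(_.split("\\t"))

  // flatMap flattens those arrays: each p is one field, i.e. a String.
  val perField: Seq[String] = lines.flatMap(_.split("\\t"))

  def main(args: Array[String]): Unit = {
    // Indexing an Array[String] picks a field.
    println(perLine.map(p => p(1)))

    // Indexing a String picks a character (charAt), so an index past the
    // end of a short field throws StringIndexOutOfBoundsException.
    val attempt = scala.util.Try(perField.map(p => p(18)))
    println(attempt.isFailure)
  }
}
```

With map, p(0) to p(71) index the 72 fields of one line, which is what
Row(...) expects.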


Thanks!

-Terry


On Fri, Aug 28, 2015 at 5:08 PM, our...@cnsuning.com <our...@cnsuning.com>
wrote:

> Hi all,
>
> I have run into a problem when using Spark SQL.
>
> The code is as follows:
>
> val schemaString =
> "visitor_id,cust_num,gds_id,l1_gds_group_cd,l4_gds_group_cd,pc_gds_addcart,pc_gds_collect_num,pc_gds_four_page_pv,pc_gds_four_page_time,pc_gds_four_page_fromsearch_pv,pc_gds_four_page_fromlist_pv,pc_gds_four_page_fromrec_pv,pc_gds_four_page_fromcuxiao_pv,pc_four_page_num,pc_group_gds_addcart,pc_group_gds_collect,pc_group_fourpage_pv,pc_group_fourpage_time,pc_visitor_pv,pc_search_pv,pc_list_pv,pc_is_view1,pc_is_view,pc_view_cycle_days,pc_view_days,wap_gds_addcart,wap_gds_collect_num,wap_gds_four_page_pv,wap_gds_four_page_time,wap_gds_four_page_fromsearch_pv,wap_gds_four_page_fromlist_pv,wap_gds_four_page_fromrec_pv,wap_gds_four_page_fromcuxiao_pv,wap_four_page_num,wap_group_gds_addcart,wap_group_gds_collect,wap_group_fourpage_pv,wap_group_fourpage_time,wap_visitor_pv,wap_search_pv,wap_list_pv,wap_is_view1,wap_is_view,wap_view_cycle_days,wap_view_days,app_gds_addcart,app_gds_collect_num,app_gds_four_page_pv,app_gds_four_page_time,app_gds_four_page_fromsearch_pv,app_gds_four_page_fromlist_pv,app_gds_four_page_fromrec_pv,app_gds_four_page_fromcuxiao_pv,app_four_page_num,app_group_gds_addcart,app_group_gds_collect,app_group_fourpage_pv,app_group_fourpage_time,app_visitor_pv,app_search_pv,app_list_pv,app_is_view1,app_is_view,app_view_cycle_days,app_view_days,gds_score_desc,l4_gds_group_rate_n,decision_cycle_days,decision_days,decision_pv,is_order,statis_date"
> // 72 fields: schemaString.split(",").length = 72
>
> import org.apache.spark.sql.Row
>
> import org.apache.spark.sql.types.{StructType, StructField, StringType}
>
> val schema = StructType(schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true)))
>
> val rowRDD = sc.textFile("/user/spark/short_model").flatMap(_.split("\\t")).map(p =>
>   Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13),p(14),p(15),p(16),p(17),p(18),p(19),p(20),p(21),p(22),p(23),p(24),p(25),p(26),p(27),p(28),p(29),p(30),p(31),p(32),p(33),p(34),p(35),p(36),p(37),p(38),p(39),p(40),p(41),p(42),p(43),p(44),p(45),p(46),p(47),p(48),p(49),p(50),p(51),p(52),p(53),p(54),p(55),p(56),p(57),p(58),p(59),p(60),p(61),p(62),p(63),p(64),p(65),p(66),p(67),p(68),p(69),p(70),p(71)))
>
> val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
>
> peopleDataFrame.registerTempTable("alg")
>
> val results = sqlContext.sql("SELECT count(*) FROM alg")
>
> results.collect()
>
>
> The error log is as follows:
>
> 15/08/28 17:00:54 WARN TaskSetManager: Lost task 56.0 in
> stage 9.0 (TID 71, 10.104.74.8): java.lang.StringIndexOutOfBoundsException:
> String index out of range: 18
> at java.lang.String.charAt(String.java:658)
> at
> scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39)
> at
> $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
> at
> $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at
> org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
> at
> org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.1 in stage 9.0
> (TID 72, 10.104.74.8, NODE_LOCAL, 1415 bytes)
> 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.1 in stage 9.0 (TID
> 72) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException
> (String index out of range: 18) [duplicate 1]
> 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.2 in stage 9.0
> (TID 73, 10.104.74.8, NODE_LOCAL, 1415 bytes)
> 15/08/28 17:00:54 INFO TaskSetManager: Lost task 45.0 in stage 9.0 (TID
> 70) on executor 10.104.74.6: java.lang.StringIndexOutOfBoundsException
> (String index out of range: 18) [duplicate 2]
> 15/08/28 17:00:54 INFO TaskSetManager: Starting task 45.1 in stage 9.0
> (TID 74, 10.104.74.6, NODE_LOCAL, 1415 bytes)
> 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.2 in stage 9.0 (TID
> 73) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException
> (String index out of range: 18) [duplicate 3]
> 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.3 in stage 9.0
> (TID 75, 10.104.74.8, NODE_LOCAL, 1415 bytes)
> 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.3 in stage 9.0 (TID
> 75) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException
> (String index out of range: 18) [duplicate 4]
> 15/08/28 17:00:54 ERROR TaskSetManager: Task 56 in stage 9.0 failed 4
> times; aborting job
> 15/08/28 17:00:54 INFO TaskSetManager: Lost task 45.1 in stage 9.0 (TID
> 74) on executor 10.104.74.6: java.lang.StringIndexOutOfBoundsException
> (String index out of range: 18) [duplicate 5]
> 15/08/28 17:00:54 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
> have all completed, from pool
> 15/08/28 17:00:54 INFO TaskSchedulerImpl: Cancelling stage 9
> 15/08/28 17:00:54 INFO DAGScheduler: ShuffleMapStage 9 (collect at
> <console>:31) failed in 0.206 s
> 15/08/28 17:00:54 INFO DAGScheduler: Job 6 failed: collect at
> <console>:31, took 0.293903 s
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 56
> in stage 9.0 failed 4 times, most recent failure: Lost task 56.3 in stage
> 9.0 (TID 75, 10.104.74.8): java.lang.StringIndexOutOfBoundsException:
> String index out of range: 18
> at java.lang.String.charAt(String.java:658)
> at
> scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:26)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at
> org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
> at
> org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>
>
>
>
>
>
> ------------------------------
> Ricky  Ou(欧   锐)
>
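> Department: Suning Commerce Group, IT Headquarters, Technical Support R&D Center, Big Data Center, Data Platform Development Department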
>
> email  : our...@cnsuning.com <14070...@cnsuning.com>
>
>
