Github user caneGuy commented on the issue:
https://github.com/apache/spark/pull/19335
Please ask this on the spark-user list instead:
http://apache-spark-user-list.1001560.n3.nabble.com/
2017-09-25 11:29 GMT+08:00 listenLearning :
>
> Hello, I recently ran into a problem while developing: when I use the
> mapPartitions API to write data to HBase, I first get a "partition not
> found" error, immediately followed by a "failed to get broadcast
> variable" error. Why does this happen? The code and the error are below.
> def ASpan(span: DataFrame, time: String): Unit = {
>   try {
>     span.mapPartitions(iter => {
>       iter.map(line => {
>         val put = new Put(Bytes.toBytes(CreateRowkey.Bit16(line.getString(0)) + "_101301"))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_TIME1PER_30"), Bytes.toBytes(line.getString(1)))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_TIME2PER_30"), Bytes.toBytes(line.getString(2)))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_TIME3PER_30"), Bytes.toBytes(line.getString(3)))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_TIME4PER_30"), Bytes.toBytes(line.getString(4)))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_HASCALL_1"), Bytes.toBytes(line.getLong(5).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_HASCALL_3"), Bytes.toBytes(line.getLong(6).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_HASCALL_6"), Bytes.toBytes(line.getLong(7).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_NOCALL_1"), Bytes.toBytes(line.getLong(8).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_NOCALL_3"), Bytes.toBytes(line.getLong(9).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_NOCALL_6"), Bytes.toBytes(line.getLong(10).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("DB_TIME"), Bytes.toBytes(time))
>         (new ImmutableBytesWritable, put)
>       })
>     }).saveAsNewAPIHadoopDataset(shuliStreaming.indexTable)
>   } catch {
>     case e: Exception =>
>       // Log message translated from the original Chinese:
>       // "silent period & recent-months call-flag error"
>       shuliStreaming.WriteIn.writeLog("shuli", time, "silent period & recent-months call-flag error", e)
>       e.printStackTrace()
>       println("silent period & recent-months call-flag error" + e)
>   }
> }
> error:
> 17/09/24 23:04:17 INFO spark.CacheManager: Partition rdd_11_1 not found, computing it
> 17/09/24 23:04:17 INFO rdd.HadoopRDD: Input split: hdfs://nameservice1/data/input/common/phlibrary/OFFLINEPHONELIBRARY.dat:1146925+1146926
> 17/09/24 23:04:17 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
> 17/09/24 23:04:17 ERROR executor.Executor: Exception in task 1.0 in stage 250804.0 (TID 3190467)
> java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
>     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1223)
>     at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>     at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>     at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>     at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>     at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>     at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:144)
>     at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:212)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:208)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> --
> You can view, comment on, or merge this pull request online at:
>
>   https://github.com/apache/spark/pull/19335
>
> Commit Summary
>
> - [SPARK-13969][ML] Add FeatureHasher transformer
> - [SPARK-21656][CORE] spark dynamic allocation should not idle timeout executors when tasks still to run
> - [SPARK-21603][SQL] The wholestage codegen will be much slower then that is closed when the function is too long
> - [SPARK-21738] Thriftserver doesn't cancel jobs when session is closed
> - [SPARK-21680][ML][MLLIB] optimize Vector compress