[GitHub] spark issue #19335: mapPartitions Api

2017-09-25 Thread caneGuy
Github user caneGuy commented on the issue:

https://github.com/apache/spark/pull/19335
  
Please ask this on the spark-user list:
http://apache-spark-user-list.1001560.n3.nabble.com/

2017-09-25 11:29 GMT+08:00 listenLearning :

> Hello. I've recently run into a problem during development: when I use the
> mapPartitions API to store data into HBase, I get a "partition not found"
> error, immediately followed by a "broadcast variable not found" error.
> Why does this happen? The code and the error are below:
> def ASpan(span: DataFrame, time: String): Unit = {
>   try {
>     span.mapPartitions(iter => {
>       iter.map(line => {
>         // One Put per input row; every column goes into family "CF".
>         val put = new Put(Bytes.toBytes(CreateRowkey.Bit16(line.getString(0)) + "_101301"))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_TIME1PER_30"), Bytes.toBytes(line.getString(1)))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_TIME2PER_30"), Bytes.toBytes(line.getString(2)))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_TIME3PER_30"), Bytes.toBytes(line.getString(3)))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_TIME4PER_30"), Bytes.toBytes(line.getString(4)))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_HASCALL_1"), Bytes.toBytes(line.getLong(5).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_HASCALL_3"), Bytes.toBytes(line.getLong(6).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_HASCALL_6"), Bytes.toBytes(line.getLong(7).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_NOCALL_1"), Bytes.toBytes(line.getLong(8).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_NOCALL_3"), Bytes.toBytes(line.getLong(9).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("CALLDT_NOCALL_6"), Bytes.toBytes(line.getLong(10).toString))
>         put.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("DB_TIME"), Bytes.toBytes(time))
>         (new ImmutableBytesWritable, put)
>       })
>     }).saveAsNewAPIHadoopDataset(shuliStreaming.indexTable)
>   } catch {
>     case e: Exception =>
>       // Log message translates roughly to "silent period & recent-months
>       // call flag: storage error".
>       shuliStreaming.WriteIn.writeLog("shuli", time, "静默期&近几月是否通话储错误", e)
>       e.printStackTrace()
>       println("静默期&近几月是否通话储错误" + e)
>   }
> }
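
For context: saveAsNewAPIHadoopDataset takes a Hadoop Configuration that names
the output format and the key/value classes, so shuliStreaming.indexTable above
is presumably such a configuration built around HBase's TableOutputFormat. A
minimal sketch of what it might wrap (the ZooKeeper quorum and table name here
are hypothetical):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.Job

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")        // hypothetical quorum
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "index_table")  // hypothetical table name

    // saveAsNewAPIHadoopDataset reads the output format and the
    // key/value classes from this configuration when the job runs.
    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    val indexTable = job.getConfiguration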
> error:
> 17/09/24 23:04:17 INFO spark.CacheManager: Partition rdd_11_1 not found, computing it
> 17/09/24 23:04:17 INFO rdd.HadoopRDD: Input split: hdfs://nameservice1/data/input/common/phlibrary/OFFLINEPHONELIBRARY.dat:1146925+1146926
> 17/09/24 23:04:17 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
> 17/09/24 23:04:17 ERROR executor.Executor: Exception in task 1.0 in stage 250804.0 (TID 3190467)
> java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1223)
> at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
> at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:144)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:212)
> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:208)
> at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
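
One reading of the trace, offered tentatively: "Failed to get broadcast_1_piece0"
is raised while HadoopRDD.getJobConf fetches the job configuration Spark
broadcast when the RDD was first created. In a long-running streaming job (note
stage 250804), this often means an RDD built once at startup (here, from the
phone-library file) is being recomputed after its broadcast configuration was
cleaned up, or after the SparkContext that created it was stopped. Rebuilding
the input per batch usually sidesteps it; a sketch under those assumptions
(stream and the downstream join logic are hypothetical):

    // Re-reading per batch creates a fresh HadoopRDD, and thus a fresh
    // broadcast job conf, for every micro-batch, instead of reusing one
    // whose broadcast blocks may have been cleaned.
    stream.foreachRDD { rdd =>
      val library = rdd.sparkContext.textFile(
        "hdfs://nameservice1/data/input/common/phlibrary/OFFLINEPHONELIBRARY.dat")
      // ... build the `span` DataFrame from `rdd` and `library`, then:
      // ASpan(span, time)
    }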
> --
> You can view, comment on, or merge this pull request online at:
>
>   https://github.com/apache/spark/pull/19335
> Commit Summary
>
>    - [SPARK-13969][ML] Add FeatureHasher transformer
>    - [SPARK-21656][CORE] spark dynamic allocation should not idle timeout executors when tasks still to run
>    - [SPARK-21603][SQL] The wholestage codegen will be much slower then that is closed when the function is too long
>    - [SPARK-21738] Thriftserver doesn't cancel jobs when session is closed
>    - [SPARK-21680][ML][MLLIB] optimize Vector compress

[GitHub] spark issue #19335: mapPartitions Api

2017-09-24 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19335
  
ping @listenLearning!


---




[GitHub] spark issue #19335: mapPartitions Api

2017-09-24 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19335
  
@listenLearning, if you'd like to ask a question, please take it to the
mailing list (see https://spark.apache.org/community.html).


---




[GitHub] spark issue #19335: mapPartitions Api

2017-09-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19335
  
Can one of the admins verify this patch?


---




[GitHub] spark issue #19335: mapPartitions Api

2017-09-24 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19335
  
@listenLearning Close this please.


---
