[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex gets wrong results

2014-08-20 Thread Guoqiang Li (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14103985#comment-14103985 ]

Guoqiang Li edited comment on SPARK-3098 at 8/20/14 3:20 PM:
-

To solve this bug, we may have to re-implement {{BasicBlockFetcherIterator}}.
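A possible user-level workaround in the meantime (a sketch, assuming the wrong indices come from {{distinct()}}'s shuffle output being recomputed in a different order): materialize the output once before calling {{zipWithIndex()}}, so every later use reads the same partition contents in the same order.
{code}
import org.apache.spark.storage.StorageLevel

// Hypothetical workaround: persist the shuffle output so zipWithIndex and
// all downstream uses read one materialized copy instead of recomputing it.
val distinctValues = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().persist(StorageLevel.MEMORY_AND_DISK)
distinctValues.count()  // force one materialization up front
val indexed = distinctValues.zipWithIndex()
{code}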


was (Author: gq):
To solve this bug, it may be possible to re-implement {{BasicBlockFetcherIterator}}.

  In some cases, operation zipWithIndex gets wrong results
 --

 Key: SPARK-3098
 URL: https://issues.apache.org/jira/browse/SPARK-3098
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Guoqiang Li
Priority: Critical

 Code to reproduce:
 {code}
  val c = sc.parallelize(1 to 7899).flatMap { i =>
    (1 to 1).toSeq.map(p => i * 6000 + p)
  }.distinct().zipWithIndex()
  c.join(c).filter(t => t._2._1 != t._2._2).take(3)
 {code}
  =>
 {code}
  Array[(Int, (Long, Long))] = Array((1732608,(11,12)), (45515264,(12,13)), 
 (36579712,(13,14)))
 {code}





[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex gets wrong results

2014-08-20 Thread Guoqiang Li (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14103985#comment-14103985 ]

Guoqiang Li edited comment on SPARK-3098 at 8/20/14 3:21 PM:
-

To solve this bug, we may have to re-implement {{BasicBlockFetcherIterator}} or {{ZippedWithIndexRDD}}.
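For context, a simplified sketch of how {{zipWithIndex}} assigns indices (a paraphrase of the {{ZippedWithIndexRDD}} idea, not the exact Spark source): one job counts the elements per partition to derive start offsets, and each element then receives startOffset(partition) + its position within the partition. If a recomputation delivers a partition's elements in a different order, the same element receives a different index.
{code}
// Toy model with partitions as in-memory sequences.
def assignIndices[T](partitions: Seq[Seq[T]]): Seq[(T, Long)] = {
  // Start offset of a partition = cumulative size of the partitions before it.
  val starts = partitions.map(_.size.toLong).scanLeft(0L)(_ + _)
  partitions.zipWithIndex.flatMap { case (part, pid) =>
    part.zipWithIndex.map { case (elem, pos) => (elem, starts(pid) + pos) }
  }
}
{code}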


was (Author: gq):
To solve this bug, we may have to re-implement {{BasicBlockFetcherIterator}}.

  In some cases, operation zipWithIndex gets wrong results
 --

 Key: SPARK-3098
 URL: https://issues.apache.org/jira/browse/SPARK-3098
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Guoqiang Li
Priority: Critical

 Code to reproduce:
 {code}
  val c = sc.parallelize(1 to 7899).flatMap { i =>
    (1 to 1).toSeq.map(p => i * 6000 + p)
  }.distinct().zipWithIndex()
  c.join(c).filter(t => t._2._1 != t._2._2).take(3)
 {code}
  =>
 {code}
  Array[(Int, (Long, Long))] = Array((1732608,(11,12)), (45515264,(12,13)), 
 (36579712,(13,14)))
 {code}





[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex gets wrong results

2014-08-19 Thread Guoqiang Li (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102024#comment-14102024 ]

Guoqiang Li edited comment on SPARK-3098 at 8/19/14 8:55 AM:
-

The (id, value) pairs are generated by zipWithIndex.
Code to reproduce:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
val e = c.map(t => (t._1, t._2.toString))
val d = c.filter(t => t._1 % 100 > 5)
e.join(d).filter(t => t._2._1 != t._2._2.toString).take(3)
{code}
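Note that {{e}} and {{d}} are both derived from {{c}}, so the join triggers two independent computations of {{c}}. A toy simulation of what can then go wrong (my illustration, not Spark code): the same logical partition arrives in two different orders, mimicking the randomized remote-fetch order.
{code}
import scala.util.Random

val elems = Seq(10, 20, 30, 40)
// Two "computations" of the same partition, each in a fresh random order.
val firstRun  = Random.shuffle(elems).zipWithIndex.toMap
val secondRun = Random.shuffle(elems).zipWithIndex.toMap
// With high probability some element now carries two different indices.
val mismatches = elems.filter(e => firstRun(e) != secondRun(e))
{code}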


was (Author: gq):
The (id, value) pairs are generated by zipWithIndex.
Code to reproduce:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
val e = c.map(t => (t._1, t._2.toString))
val d = c.filter(t => t._1 % 100 > 5)
e.join(d).filter(t => t._2._1 != t._2._2.toString).take(3)
{code}

  In some cases, operation zipWithIndex gets wrong results
 --

 Key: SPARK-3098
 URL: https://issues.apache.org/jira/browse/SPARK-3098
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Guoqiang Li
Priority: Critical

 I do not know how to reliably reproduce the bug. Here is the case: when I was
 processing 10 billion records with groupByKey, the results were wrong:
 {noformat}
 (4696501, 370568)
 (4696501, 376672)
 (4696501, 374880)
 .
 (4696502, 350264)
 (4696502, 358458)
 (4696502, 398502)
 ..
 {noformat} 
 =>
 {noformat}
 (4696501,ArrayBuffer(350264, 358458, 398502 )), 
 (4696502,ArrayBuffer(376621, ..))
 {noformat}
 Code:
 {code}
 val dealOuts = clickPreferences(sc, dealOutPath, periodTime)
 val dealOrders = orderPreferences(sc, dealOrderPath, periodTime)
 val favorites = favoritePreferences(sc, favoritePath, periodTime)
 val allBehaviors = (dealOrders ++ favorites ++ dealOuts)
 val preferences = allBehaviors.groupByKey().map { ... }
 {code}
 spark-defaults.conf:
 {code}
 spark.default.parallelism 280
 {code}





[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex gets wrong results

2014-08-19 Thread Guoqiang Li (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102024#comment-14102024 ]

Guoqiang Li edited comment on SPARK-3098 at 8/19/14 8:58 AM:
-

The (id, value) pairs are generated by zipWithIndex.
Code to reproduce:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
val e = c.map(t => (t._1, t._2.toString))
val d = c.filter(t => t._1 % 100 > 5)
e.join(d).filter(t => t._2._1 != t._2._2.toString).take(3)
{code}


was (Author: gq):
The (id, value) pairs are generated by zipWithIndex.
Code to reproduce:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
val e = c.map(t => (t._1, t._2.toString))
val d = c.filter(t => t._1 % 100 > 5)
e.join(d).filter(t => t._2._1 != t._2._2.toString).take(3)
{code}

  In some cases, operation zipWithIndex gets wrong results
 --

 Key: SPARK-3098
 URL: https://issues.apache.org/jira/browse/SPARK-3098
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Guoqiang Li
Priority: Critical

 I do not know how to reliably reproduce the bug. Here is the case: when I was
 processing 10 billion records with groupByKey, the results were wrong:
 {noformat}
 (4696501, 370568)
 (4696501, 376672)
 (4696501, 374880)
 .
 (4696502, 350264)
 (4696502, 358458)
 (4696502, 398502)
 ..
 {noformat} 
 =>
 {noformat}
 (4696501,ArrayBuffer(350264, 358458, 398502 )), 
 (4696502,ArrayBuffer(376621, ..))
 {noformat}
 Code:
 {code}
 val dealOuts = clickPreferences(sc, dealOutPath, periodTime)
 val dealOrders = orderPreferences(sc, dealOrderPath, periodTime)
 val favorites = favoritePreferences(sc, favoritePath, periodTime)
 val allBehaviors = (dealOrders ++ favorites ++ dealOuts)
 val preferences = allBehaviors.groupByKey().map { ... }
 {code}
 spark-defaults.conf:
 {code}
 spark.default.parallelism 280
 {code}





[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex gets wrong results

2014-08-19 Thread Guoqiang Li (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102064#comment-14102064 ]

Guoqiang Li edited comment on SPARK-3098 at 8/19/14 9:41 AM:
-

Our cluster runs on YARN.
You can try the following code in cluster mode:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
c.join(c).filter(t => t._2._1 != t._2._2).take(3)
{code}
 =>
{code}
 Array[(Int, (Long, Long))] = Array((1732608,(11,12)), (45515264,(12,13)), 
(36579712,(13,14)))
{code}
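A quick check one could run in the same shell, reusing {{c}} from the snippet above (a sketch, assuming nothing is cached, so each {{collect()}} triggers a fresh computation): collect the indexed pairs twice and diff them; on a multi-node cluster the randomized fetch order can make the two runs disagree.
{code}
val first  = c.collect().toMap
val second = c.collect().toMap
// Elements whose index changed between the two computations:
val diffs = first.collect { case (k, v) if second(k) != v => (k, v, second(k)) }
{code}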


was (Author: gq):
Our cluster runs on YARN.
You can try the following code in cluster mode:
{code}
val c = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct().zipWithIndex()
val e = c.map(t => (t._1, t._2))
e.join(e).filter(t => t._2._1 != t._2._2).take(3)
{code}
 =>
{code}
 Array[(Int, (Long, Long))] = Array((1732608,(11,12)), (45515264,(12,13)), 
(36579712,(13,14)))
{code}

  In some cases, operation zipWithIndex gets wrong results
 --

 Key: SPARK-3098
 URL: https://issues.apache.org/jira/browse/SPARK-3098
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Guoqiang Li
Priority: Critical

 I do not know how to reliably reproduce the bug. Here is the case: when I was
 processing 10 billion records with groupByKey, the results were wrong:
 {noformat}
 (4696501, 370568)
 (4696501, 376672)
 (4696501, 374880)
 .
 (4696502, 350264)
 (4696502, 358458)
 (4696502, 398502)
 ..
 {noformat} 
 =>
 {noformat}
 (4696501,ArrayBuffer(350264, 358458, 398502 )), 
 (4696502,ArrayBuffer(376621, ..))
 {noformat}
 Code:
 {code}
 val dealOuts = clickPreferences(sc, dealOutPath, periodTime)
 val dealOrders = orderPreferences(sc, dealOrderPath, periodTime)
 val favorites = favoritePreferences(sc, favoritePath, periodTime)
 val allBehaviors = (dealOrders ++ favorites ++ dealOuts)
 val preferences = allBehaviors.groupByKey().map { ... }
 {code}
 spark-defaults.conf:
 {code}
 spark.default.parallelism 280
 {code}





[jira] [Comment Edited] (SPARK-3098) In some cases, operation zipWithIndex gets wrong results

2014-08-19 Thread Guoqiang Li (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-3098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14102279#comment-14102279 ]

Guoqiang Li edited comment on SPARK-3098 at 8/19/14 3:02 PM:
-

This issue is caused by the following code path (each step is invoked by the one after it):

[BlockFetcherIterator.scala#L221|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala#L221]
{noformat}fetchRequests ++= Utils.randomize(remoteRequests){noformat}
<=
[ShuffledRDD.scala#L65|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L65]
{noformat}SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser){noformat}
<=
[PairRDDFunctions.scala#L100|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L100]
{noformat}
 val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
   .setSerializer(serializer)
 partitioned.mapPartitionsWithContext((context, iter) => {
   new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
 }, preservesPartitioning = true)
{noformat}
<=
[PairRDDFunctions.scala#L163|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L163]
{noformat}
 def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
   combineByKey[V]((v: V) => v, func, func, partitioner)
 }
{noformat}
<=
[RDD.scala#L288|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L288]
{noformat}
 def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
   map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
{noformat}
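Given this chain, the index an element receives depends on the order in which shuffled blocks happen to arrive. One way to decouple the two (an idea sketch, not a committed fix): impose a total order inside each partition before indexing.
{code}
// Hypothetical deterministic variant: sort each partition's contents so the
// assigned index no longer depends on fetch order.
val base = sc.parallelize(1 to 7899).flatMap { i =>
  (1 to 1).toSeq.map(p => i * 6000 + p)
}.distinct()
val stableIndexed = base
  .mapPartitions(iter => iter.toArray.sorted.iterator, preservesPartitioning = true)
  .zipWithIndex()
{code}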


was (Author: gq):
This issue is caused by the following code path (each step is invoked by the one after it):

[BlockFetcherIterator.scala#L221|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala#L221]
{noformat}fetchRequests ++= Utils.randomize(remoteRequests){noformat}
<=
[ShuffledRDD.scala#L65|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L65]
{noformat}SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser){noformat}
<=
[PairRDDFunctions.scala#L100|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L100]
{noformat}
 val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
   .setSerializer(serializer)
 partitioned.mapPartitionsWithContext((context, iter) => {
   new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
 }, preservesPartitioning = true)
{noformat}
<=
[PairRDDFunctions.scala#L163|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L163]
{noformat}
 def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
   combineByKey[V]((v: V) => v, func, func, partitioner)
 }
{noformat}
<=
[RDD.scala#L288|https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L288]
{noformat}
 def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
   map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
{noformat}

  In some cases, operation zipWithIndex gets wrong results
 --

 Key: SPARK-3098
 URL: https://issues.apache.org/jira/browse/SPARK-3098
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.0.1
Reporter: Guoqiang Li
Priority: Critical

 Code to reproduce:
 {code}
  val c = sc.parallelize(1 to 7899).flatMap { i =>
    (1 to 1).toSeq.map(p => i * 6000 + p)
  }.distinct().zipWithIndex()
  c.join(c).filter(t => t._2._1 != t._2._2).take(3)
 {code}
  =>
 {code}
  Array[(Int, (Long, Long))] = Array((1732608,(11,12)), (45515264,(12,13)), 
 (36579712,(13,14)))
 {code}


