[ https://issues.apache.org/jira/browse/SPARK-2579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14116643#comment-14116643 ]
Chris Fregly commented on SPARK-2579:
-------------------------------------

Interesting and possibly related blog post from Netflix earlier this year: http://techblog.netflix.com/2014/01/s3mper-consistency-in-cloud.html (a hand-rolled listing check in the same spirit is sketched at the bottom of this message).

> Reading from S3 returns an inconsistent number of items with Spark 0.9.1
> -------------------------------------------------------------------------
>
>                 Key: SPARK-2579
>                 URL: https://issues.apache.org/jira/browse/SPARK-2579
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 0.9.1
>            Reporter: Eemil Lagerspetz
>            Priority: Critical
>              Labels: hdfs, read, s3, skipping
>
> I have created a random matrix of 1M rows with 10K semicolon-separated items on each row. While reading it with Spark 0.9.1 and doing a count, I consistently get fewer than 1M rows, and a different number every time at that ( !! ). Example below:
> head -n 1 tool-generate-random-matrix*log
> ==> tool-generate-random-matrix-999158.log <==
> Row item counts: 999158
> ==> tool-generate-random-matrix.log <==
> Row item counts: 997163
> The data is split into 1000 partitions. When I download it using s3cmd sync and run the following AWK on it, I get the correct number of rows in each partition (1000 x 1000 = 1M). What is up?
> {code:title=checkrows.sh|borderStyle=solid}
> for k in part-0*
> do
>   echo "$k"
>   awk -F ";" '
>   NF != 10000 {
>     print "Wrong number of items:", NF
>   }
>   END {
>     if (NR != 1000) {
>       print "Wrong number of rows:", NR
>     }
>   }' "$k"
> done
> {code}
> The matrix generation and counting code is below:
> {code:title=Matrix.scala|borderStyle=solid}
> package fi.helsinki.cs.nodes.matrix
>
> import java.util.Random
> import org.apache.spark._
> import org.apache.spark.SparkContext._
> import org.apache.spark.rdd.RDD
> import org.apache.spark.storage.StorageLevel._
>
> object GenerateRandomMatrix {
>
>   case class Row(id: Int, elements: Array[Double])
>
>   // Deterministically fills one row per input id with `features` random doubles.
>   def NewGeMatrix(rSeed: Int, rdd: RDD[Int], features: Int) = {
>     rdd.mapPartitions(part => part.map(xarr => {
>       val rdm = new Random(rSeed + xarr)
>       val arr = new Array[Double](features)
>       for (i <- 0 until features)
>         arr(i) = rdm.nextDouble()
>       new Row(xarr, arr)
>     }))
>   }
>
>   // Parses "id e1;e2;...;eN" back into a Row.
>   def rowFromText(line: String) = {
>     val idarr = line.split(" ")
>     val arr = idarr(1).split(";")
>     // -1 to fix saved matrix indexing error
>     new Row(idarr(0).toInt - 1, arr.map(_.toDouble))
>   }
>
>   def main(args: Array[String]) {
>     val master = args(0)
>     val tasks = args(1).toInt
>     val savePath = args(2)
>     val read = args.contains("read")
>
>     val datapoints = 1000000
>     val features = 10000
>     val sc = new SparkContext(master, "RandomMatrix")
>     if (read) {
>       val randomMatrix: RDD[Row] = sc.textFile(savePath, tasks).map(rowFromText).persist(MEMORY_AND_DISK)
>       println("Row item counts: " + randomMatrix.count)
>     } else {
>       val rdd = sc.parallelize(0 until datapoints, tasks)
>       val bcSeed = sc.broadcast(128)
>       /* Generating a matrix of random Doubles */
>       val randomMatrix = NewGeMatrix(bcSeed.value, rdd, features).persist(MEMORY_AND_DISK)
>       randomMatrix.map(row => row.id + " " + row.elements.mkString(";")).saveAsTextFile(savePath)
>     }
>
>     sc.stop()
>   }
> }
> {code}
> I run this with:
> appassembler/bin/tool-generate-random-matrix master 1000 s3n://keys@path/to/data 1>matrix.log 2>matrix.err
> Reading from HDFS gives the right count and the right number of items on each row.
> However, I had to run with the full path including the namenode host; a bare /matrix does not work (Spark resolves it as file://):
> p="hdfs://ec2-54-188-6-77.us-west-2.compute.amazonaws.com:9000/matrix"
> appassembler/bin/tool-generate-random-matrix $( cat /root/spark-ec2/cluster-url ) 1000 "$p" read 1>readmatrix.log 2>readmatrix.err
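On the bare-path point just above: a path without a scheme is resolved against Hadoop's default filesystem, which falls back to file:// when nothing is configured. Below is a minimal sketch of a workaround, assuming Spark 0.9.1's SparkContext.hadoopConfiguration and the Hadoop 1.x key fs.default.name (Hadoop 2.x prefers fs.defaultFS); the object and app names are made up for this example.

{code:title=ReadWithDefaultFs.scala|borderStyle=solid}
import org.apache.spark.SparkContext

object ReadWithDefaultFs {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "ReadWithDefaultFs")
    // Hadoop 1.x key; on Hadoop 2.x the preferred key is "fs.defaultFS".
    // The namenode host below is the one from the reproduction above.
    sc.hadoopConfiguration.set("fs.default.name",
      "hdfs://ec2-54-188-6-77.us-west-2.compute.amazonaws.com:9000")
    // A bare path now resolves against HDFS instead of file://.
    println("Row item counts: " + sc.textFile("/matrix", 1000).count)
    sc.stop()
  }
}
{code}

The same effect can be had cluster-wide, with no code change, by setting the key in Hadoop's core-site.xml.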
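On the s3mper comment at the top: s3mper validates S3 listings against a secondary metadata store in DynamoDB. A much cruder spot-check in the same spirit is to compare how many part-* files an S3 listing returns against the 1000 partitions the job wrote; if the listing comes up short, the missing rows point at listing consistency rather than at Spark. This is only a sketch: the path is the redacted one from the reproduction, and it assumes the s3n:// filesystem and AWS credentials are configured the same way as for the job itself.

{code:title=CheckPartFiles.scala|borderStyle=solid}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CheckPartFiles {
  def main(args: Array[String]) {
    // Placeholder path, same redacted form as in the reproduction.
    val dir = new Path("s3n://keys@path/to/data")
    val fs = FileSystem.get(dir.toUri, new Configuration())
    // Count the part files the listing actually returns.
    val parts = fs.listStatus(dir).filter(_.getPath.getName.startsWith("part-"))
    println("part files listed: " + parts.length + " (expected: 1000)")
  }
}
{code}

Run a few times in a row, a fluctuating count here would match the fluctuating row counts reported above.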