>Let me try this on my own cluster and see if I can reproduce. Are you using text files or have you gotten HBase working? How large is the input?

Using HBase as input (it's the TFIDF vector generated by seq2sparse). If I recall correctly, about 30k documents with a dimensionality of around 500k. I don't know how to get the table size from HBase, but the TFIDF-vector hfile is a bit over 20 MB.


On 13.10.2014 19:11, Pat Ferrel wrote:
But if it doesn’t work in the Spark config it probably won’t work here either.
There may be something else causing only one task for the non-graph ABt.

Dmitriy? Any ideas?

Let me try this on my own cluster and see if I can reproduce. Are you using 
text files or have you gotten HBase working? How large is the input?


On Oct 13, 2014, at 10:03 AM, Patrick Ferrel <p...@occamsmachete.com> wrote:

Best guess is, as I said, to put it in RowSimilarityDriver.start like this:
    sparkConf.set("spark.kryo.referenceTracking", "false")
      .set("spark.kryoserializer.buffer.mb", "200")
      .set("spark.executor.memory",
parser.opts("sparkExecutorMem").asInstanceOf[String])
      .set("spark.default.parallelism", 400)

The multiply is evaluated lazily by the blas optimizer, only when the result is
required. This means that many failures will appear to happen in the write,
since that is the first place where the values are accessed in a non-lazy
manner.
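
A minimal sketch of what that looks like with the Mahout Spark-bindings DSL (the paths are placeholders; drmDfsRead/dfsWrite are the standard DRM I/O calls, adjust to however you actually load the data):

    val drmA = drmDfsRead("/path/to/tfidf-vectors")  // lazy: only a logical operator is built
    val drmAAt = drmA %*% drmA.t                     // still lazy: the optimizer has not run yet
    drmAAt.dfsWrite("/path/to/output")               // action: ABt is planned and executed here,
                                                     // so a multiply failure surfaces in the write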

Actually we should probably not be hard coding the
spark.kryoserializer.buffer.mb either.
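
For example (sketch only; "kryoBufferMb" is a hypothetical option name, not an existing driver flag), the buffer size could be read from the parsed options the same way the executor memory already is:

    sparkConf.set("spark.kryoserializer.buffer.mb",
      parser.opts("kryoBufferMb").asInstanceOf[String])  // hypothetical option with a default in the parser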

On Mon, Oct 13, 2014 at 9:54 AM, Reinis Vicups <mah...@orbit-x.de> wrote:

Hello,

> When you set the Spark config as below do you still get one task?
Unfortunately yes.

Currently I am looking for the very first shuffle stage in
SimilarityAnalysis#rowSimilarity but cannot find it. There is a lot of
mapping, wrapping and caching during SimilarityAnalysis#sampleDownAndBinarize,
and I don't see where to look for the code behind "%*%" in:

// Compute row similarity cooccurrence matrix AA'
val drmAAt = drmA %*% drmA.t

I would like to hard-code the partition number in that first shuffle, just for
the sake of an experiment.
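
If your Mahout build exposes the par() operator (the logical counterpart of the Par.scala physical operator Pat mentions further down in the thread), one way to run that experiment is to force the partitioning on drmA before the multiply. A sketch, assuming par() is available in your version:

    val drmAPar = drmA.par(exact = 400)   // push ~400 partitions into the plan
    val drmAAt  = drmAPar %*% drmAPar.t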


On 13.10.2014 18:29, Pat Ferrel wrote:

I see no place where spark.default.parallelism is set, so your config can set
it to whatever you wish. When you set the Spark config as below, do you still
get one task? The test suite sets spark.default.parallelism to 10 before the
context is initialized. To do this with SimilarityAnalysis.rowSimilarity (here
I assume you are modifying the driver), put .set("spark.default.parallelism", "400")
in RowSimilarityDriver.start and see if that changes things.
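
The key point from the test suite is that the value has to be on the conf before the Spark context is created. A rough sketch (mahoutSparkContext is the Spark-bindings helper; the master URL and app name are placeholders):

    val conf = new SparkConf()
      .set("spark.default.parallelism", "400")
    implicit val mc = mahoutSparkContext(masterUrl = "local[4]",
      appName = "RowSimilarityDriver", sparkConf = conf)  // parallelism is fixed from here on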

If this doesn’t work, it may be that the blas optimizer is doing something
with the value, but I’m lost in that code. There is only one place the value
is read, which is in Par.scala:

        // auto adjustment, try to scale up to either x1Size or x2Size.
        val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt

        val x1Size = (clusterSize * .95).ceil.toInt
        val x2Size = (clusterSize * 1.9).ceil.toInt

        if (rdd.partitions.size <= x1Size)
          rdd.coalesce(numPartitions = x1Size, shuffle = true)
        else if (rdd.partitions.size <= x2Size)
          rdd.coalesce(numPartitions = x2Size, shuffle = true)
        else
          rdd.coalesce(numPartitions = rdd.partitions.size)
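
For example, with spark.default.parallelism = 400 this gives x1Size = ceil(400 * 0.95) = 380 and x2Size = ceil(400 * 1.9) = 760, so an RDD with up to 380 partitions is shuffled up to 380, one with 381–760 partitions is shuffled up to 760, and anything larger keeps its current partition count.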


Dmitriy, can you shed any light on the use of spark.default.parallelism,
how to increase it, or how to get more than one task created when performing
ABt?


On Oct 13, 2014, at 8:56 AM, Reinis Vicups <mah...@orbit-x.de> wrote:

Hi,

I am currently testing SimilarityAnalysis.rowSimilarity and I am wondering
how I could increase the number of tasks used for the distributed shuffle.

What I currently observe is that SimilarityAnalysis requires almost
20 minutes for my dataset on this stage alone:

combineByKey at ABt.scala:126

When I view the details for the stage, I see that only one task is spawned,
running on one node.

I have my own implementation of SimilarityAnalysis, and by tuning the number
of tasks I have achieved HUGE performance gains.

Since I couldn't find how to pass the number of tasks to the shuffle
operations directly, I have set the following in the Spark config:

configuration = new SparkConf().setAppName(jobConfig.jobName)
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
        .set("spark.kryo.referenceTracking", "false")
        .set("spark.kryoserializer.buffer.mb", "200")
        .set("spark.default.parallelism", "400") // <- supposed to set default parallelism to a high number
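
As a quick sanity check (sketch only), the value can be read back from the running context to confirm it actually took effect:

    val sc = new SparkContext(configuration)
    println(sc.getConf.get("spark.default.parallelism"))  // should print "400"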

Thank you for your help
reinis


