I think you might also need to turn codegen on for the unsafe stuff to take effect.
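For example, something like this should set all three flags before the context starts (just a sketch, assuming the 1.4-era spark.sql.codegen key; spark.unsafe.offHeap is read when executors launch, so if you use the shell, pass these via --conf or spark-defaults.conf instead):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Assumed 1.4-era flag names; set them before creating the context so the
// executors pick up the off-heap setting at startup.
val conf = new SparkConf()
  .setAppName("tungsten-agg-benchmark")
  .set("spark.sql.codegen", "true")        // runtime code generation
  .set("spark.sql.unsafe.enabled", "true") // UnsafeRow-based aggregation
  .set("spark.unsafe.offHeap", "true")     // off-heap memory allocation
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)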
On Thu, Aug 20, 2015 at 4:09 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
> Hi Reynold,
>
> Thank you for the suggestion. This code takes around 30 sec on my setup (5
> workers with 32GB). My issue is that I don't see a change in time if I
> unset the unsafe flags. Could you explain why that might happen?
>
> On Aug 20, 2015, at 15:32, Reynold Xin <r...@databricks.com> wrote:
>
> I didn't wait long enough earlier. Actually it did finish when I raised
> memory to 8g.
>
> In 1.5 with Tungsten (which should be the same as 1.4 with your unsafe
> flags), the query took 40s with 4G of mem.
>
> In 1.4, it took 195s with 8G of mem.
>
> This is not a scientific benchmark and I only ran it once.
>
>
> On Thu, Aug 20, 2015 at 3:22 PM, Reynold Xin <r...@databricks.com> wrote:
> How did you run this? I couldn't run your query with 4G of RAM in 1.4, but
> in 1.5 it ran.
>
> Also, I recommend just dumping the data to Parquet on disk to evaluate,
> rather than using the in-memory cache, which is super slow and we are
> thinking of removing/replacing it with something else.
>
> // generate the data once and write it to Parquet
> val size = 100000000
> val partitions = 10
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions).map(x =>
>   (util.Random.nextInt(size / repetitions),
>    util.Random.nextDouble)).toDF("key", "value")
>
> data.write.parquet("/scratch/rxin/tmp/alex")
>
> // read it back and time the groupBy + sum aggregation
> val df = sqlContext.read.parquet("/scratch/rxin/tmp/alex")
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
>
> On Thu, Aug 20, 2015 at 2:57 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
> Dear Spark developers,
>
> I am trying to benchmark the new DataFrame aggregation implemented under
> project Tungsten and released with Spark 1.4 (I am using the latest Spark
> from the repo, i.e. 1.5):
> https://github.com/apache/spark/pull/5725
> It says that the aggregation should be faster due to unsafe memory
> allocation and in-place updates. It was also presented at Spark Summit
> this summer:
> http://www.slideshare.net/SparkSummit/deep-dive-into-project-tungsten-josh-rosen
> The following enables the new aggregation in the Spark config:
> spark.sql.unsafe.enabled=true
> spark.unsafe.offHeap=true
>
> I wrote a simple piece of code that aggregates values by key. However,
> the time needed to execute it is the same whether the new aggregation is
> on or off. Could you suggest how I can observe the improvement the new
> aggregation provides? Could you write a code snippet that takes advantage
> of it?
>
> case class Counter(key: Int, value: Double)
> val size = 100000000
> val partitions = 5
> val repetitions = 5
> val data = sc.parallelize(1 to size, partitions).map(x =>
>   Counter(util.Random.nextInt(size / repetitions), util.Random.nextDouble))
> val df = sqlContext.createDataFrame(data)
> // cache and materialize the DataFrame before timing the aggregation
> df.persist()
> df.count()
> val t = System.nanoTime()
> val res = df.groupBy("key").agg(sum("value"))
> res.count()
> println((System.nanoTime() - t) / 1e9)
>
> Best regards, Alexander
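One more thing worth checking: print the settings the running context actually sees, to rule out a flag silently staying off (a sketch; the second argument is just a fallback default):

println("spark.sql.codegen        = " + sqlContext.getConf("spark.sql.codegen", "<unset>"))
println("spark.sql.unsafe.enabled = " + sqlContext.getConf("spark.sql.unsafe.enabled", "<unset>"))
println("spark.unsafe.offHeap     = " + sc.getConf.get("spark.unsafe.offHeap", "<unset>"))
// If any of these print "<unset>" or "false", the unsafe code path never ran,
// which would explain identical timings with the flags on and off.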