Re: how to add a column for percent

2022-05-23 Thread Raghavendra Ganesh
withColumn takes a Column as its second argument, not a String.
If you want the value formatted before show(), you can use the round() function.
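For example, something along these lines should work (an untested sketch, assuming df2 is the DataFrame from your example):

  import org.apache.spark.sql.functions.{col, lit, round}

  // total row count, used as the denominator for the percentage
  val total = df2.count()

  df2.groupBy("_c1")
    .count()
    // count / total gives the fraction; * 100 turns it into a percent,
    // and round(..., 2) keeps two decimal places for display
    .withColumn("percent", round(col("count") / lit(total) * 100, 2))
    .show()

If you want the raw fraction rather than a value out of 100, just drop the * 100.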
--
Raghavendra


On Mon, May 23, 2022 at 11:35 AM wilson  wrote:

> hello
>
> how do I add a column with the percent for the current row of the counted data?
>
> scala>
> df2.groupBy("_c1").count.withColumn("percent",f"${col(count)/df2.count}%.2f").show
>
> :30: error: type mismatch;
>
>
> This doesn't work.
>
> so please help. thanks.
>
>
>
>


Spark Push-Based Shuffle causing multiple stage failures

2022-05-23 Thread Han Altae-Tran
Hi,

First of all, I am very thankful for all of the amazing work that goes into
this project! It has opened up so many doors for me! I am a long-time Spark
user and was very excited to start working with the push-based shuffle
service for an academic paper we are working on, but I ran into some
difficulties along the way and am hoping someone can help me get this new
feature working.

I was able to get push-based shuffle running on my YARN setup: I am using
Dataproc, but I installed Spark 3.2 on top of the Dataproc base installation
via a custom image, then removed the old Spark 3.1.2 YARN shuffle jar and
replaced it with the Spark 3.2 one. For reference, the push-based shuffle
settings this setup relies on are roughly the following (a sketch of the
standard Spark 3.2 properties from the docs, not a verbatim copy of my
cluster config):
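  # driver/executor side (spark-defaults.conf or --conf)
  spark.shuffle.service.enabled  true
  spark.shuffle.push.enabled     true

  # YARN shuffle service side (the Spark 3.2 yarn shuffle jar deployed on each NodeManager)
  spark.shuffle.push.server.mergedShuffleFileManagerImpl  org.apache.spark.network.shuffle.RemoteBlockPushResolver

The main issue is that whenever a job actually performs a shuffle with
push-based shuffle enabled, I consistently encounter errors of the following
sort: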

22/05/23 05:45:01 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 163.0 in stage 3.1 (TID 16729) (cluster-fast-w-0.c.altaeth-biolux.internal executor 1): FetchFailed(BlockManagerId(2, cluster-fast-w-1.c.altaeth-biolux.internal, 7337, None), shuffleId=0, mapIndex=171, mapId=11287, reduceId=808, message=
org.apache.spark.shuffle.FetchFailedException
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
    at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Failed to send RPC StreamChunkId[streamId=1514743314249,chunkIndex=59] to cluster-fast-w-1.c.altaeth-biolux.internal/10.128.0.39:7337: java.io.IOException: Connection reset by peer
    at org.apache.spark.network.client.TransportClient$1.handleFailure(TransportClient.java:146)
    at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
    at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
    at org.apache.spark.network.client.TransportClient.fetchChunk(TransportClient.java:151)
    at org.apache.spark.network.shuffle.OneForOneBlockFetcher$1.onSuccess(OneForOneBlockFetcher.java:297)
    at org.apache.spark.network.client.Tr