k-anonymity with Spark in Java

2022-05-28 Thread marc nicole
Hi Spark devs,

Is anybody willing to check my code implementing *k-anonymity* by suppression?


// Assumed imports for the snippet below:
import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.functions;
import static org.apache.spark.sql.functions.*;

public static Dataset<Row> kAnonymizeBySuppression(SparkSession sparksession,
        Dataset<Row> initDataset, List<String> qidAtts, Integer k_anonymity_constant) {

    Dataset<Row> anonymizedDF = sparksession.emptyDataFrame();
    Dataset<Row> tmpDF = sparksession.emptyDataFrame();

    List<Column> groupByQidAttributes =
            qidAtts.stream().map(functions::col).collect(Collectors.toList());
    Column[] qidColumns = groupByQidAttributes.toArray(new Column[0]);

    // Group by the quasi-identifier attributes and count each combination's occurrences.
    Dataset<Row> groupedRowsDF = initDataset.withColumn("qidsFreqs",
            count("*").over(Window.partitionBy(qidColumns)));

    // Rows whose quasi-identifier group is smaller than k need suppression ...
    Dataset<Row> rowsDeleteDF =
            groupedRowsDF.where("qidsFreqs < " + k_anonymity_constant);
    // ... while rows whose group already reaches k are kept as-is.
    tmpDF = groupedRowsDF.where("qidsFreqs >= " + k_anonymity_constant);

    for (String qidAtt : qidAtts) {
        // Approximate number of distinct values of this attribute within each group.
        Dataset<Row> groupedRowsProcDF = rowsDeleteDF.withColumn("attFreq",
                approx_count_distinct(qidAtt).over(Window.partitionBy(qidColumns)));

        Dataset<Row> rowsDeleteDFUpdate =
                groupedRowsProcDF.where("attFreq < " + k_anonymity_constant);

        if (anonymizedDF.count() == 0)
            anonymizedDF = rowsDeleteDFUpdate;
        if (rowsDeleteDF.count() != 0) {
            // Suppress the attribute by replacing its values with "*".
            anonymizedDF = anonymizedDF.drop("attFreq").withColumn(qidAtt, lit("*"));
        }
    }

    return tmpDF.drop("qidsFreqs").union(anonymizedDF.drop("qidsFreqs"));
}
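
For completeness, a minimal calling sketch (the CSV path, the quasi-identifier
column names and the k value below are just illustrative placeholders):

SparkSession spark = SparkSession.builder()
        .appName("kAnonymityDemo")
        .master("local[*]")
        .getOrCreate();

// Hypothetical input with a header row and quasi-identifier columns.
Dataset<Row> people = spark.read()
        .option("header", "true")
        .csv("people.csv");

Dataset<Row> anonymized = kAnonymizeBySuppression(
        spark, people, java.util.Arrays.asList("age", "zipcode", "gender"), 3);

anonymized.show(false);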



Thanks in advance for your comments and improvements.

Marc.


Re: Spark Push-Based Shuffle causing multiple stage failures

2022-05-28 Thread Ye Zhou
Hi, Han.
The External Shuffle Service (ESS) in YARN has to be configured in
yarn-site.xml on the NodeManagers, since it runs as an auxiliary service
inside the NodeManager; a sketch of the usual entries follows below. We will
try to improve the documentation for enabling push-based shuffle. Thanks for
the feedback.
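
For reference, the NodeManager auxiliary-service entries usually look roughly
like this (a sketch of the standard Spark-on-YARN external shuffle service
setup; if other auxiliary services such as mapreduce_shuffle are already
configured, append spark_shuffle to the existing list instead of replacing it):

  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>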

For the straggler issue, is the shuffle data heavily skewed? Is the 20x
longer-running task generating shuffle data or reading shuffle data? If
applicable, you can share the application log file with us in some way, and we
can take a look through a local History Server.

Thanks.
Ye.
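
To summarize the two halves that need to be in place (a minimal sketch; the
keys are taken from the Spark 3.2 configuration docs, and the server-side
property goes into yarn-site.xml on the NodeManagers as in the snippet quoted
further below, not into spark-defaults):

  # client side, e.g. spark-defaults.conf or --conf
  spark.shuffle.service.enabled=true
  spark.shuffle.push.enabled=true

  # server side, NodeManager yarn-site.xml
  spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.RemoteBlockPushResolver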

On Wed, May 25, 2022 at 7:15 PM Han Altae-Tran  wrote:

> Hi Ye,
>
> This is super helpful! It wasn't obvious to me from the documentation that
> this property needed to be set in the yarn-site.xml file, as all other
> configurations on the main Spark configuration page are set through Spark
> conf. It was particularly confusing because this property, like most other
> Spark-related properties, also begins with spark (i.e. spark.shuffle as
> opposed to yarn.spark.shuffle). Perhaps it would be useful to add some
> extra documentation explaining this there? In either case, thank you! I
> will test out the push-based service more thoroughly. From my initial
> tests, I can see that the median task time finishes faster during the
> reduce stage when using pull-based shuffle, but unlike with the standard
> shuffle, there was an oddly large tail end, with the longest tasks taking
> up to 20x longer. Is there a suggestion for what may be causing this, and
> is there a way to reduce it? It is somewhat odd, as the pull-based shuffle
> also has the capability of reading data from the original blocks, so for
> some reason the choice of reading from merged pull-blocks appears to be
> slower. I am planning on running more systematic tests to understand these
> behaviors further. Thank you!
>
> Best,
> Han
>
> On Tue, May 24, 2022 at 8:07 PM Ye Zhou  wrote:
>
>> Hi, Han.
>> Thanks for trying out push-based shuffle.
>> Please make sure you set both the Spark client side and server side
>> configurations.
>> The client side configuration looks good, but from the error message it
>> looks like you are missing the server side configurations.
>> Please refer to this blog post about how to try out push-based shuffle:
>> https://engineering.linkedin.com/blog/2021/push-based-shuffle-in-apache-spark.
>> Also see this documentation about how to properly configure the External
>> Shuffle Service in a YARN environment:
>> https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service
>> In our environment, we added this property to enable server side push-based
>> shuffle in yarn-site.xml for the NodeManager configurations:
>>   <property>
>>     <name>spark.shuffle.push.server.mergedShuffleFileManagerImpl</name>
>>     <value>org.apache.spark.network.shuffle.RemoteBlockPushResolver</value>
>>   </property>
>>
>> On Tue, May 24, 2022 at 3:30 PM Mridul Muralidharan 
>> wrote:
>>
>>> +CC zhouye...@gmail.com
>>>
>>>
>>> On Mon, May 23, 2022 at 7:11 AM Han Altae-Tran  wrote:
>>>
 Hi,

 First of all, I am very thankful for all of the amazing work that goes
 into this project! It has opened up so many doors for me! I am a long-time
 Spark user, and was very excited to start working with the push-based
 shuffle service for an academic paper we are working on, but I encountered
 some difficulties along the way and am wondering if someone could help me
 resolve an issue with this new feature. I was able to get the push-based
 shuffle running on my YARN setup (I am using Dataproc, but I added an
 additional Spark 3.2 installation on top of the Dataproc base installation
 using a custom image, and then removed the old 3.1.2 Spark shuffle YARN jar
 and replaced it with the new one for Spark 3.2). However, the main issue is
 that when I actually run shuffles using the push-based shuffle, I
 consistently encounter errors of the following sort:

 22/05/23 05:45:01 WARN org.apache.spark.scheduler.TaskSetManager: Lost
 task 163.0 in stage 3.1 (TID 16729)
 (cluster-fast-w-0.c.altaeth-biolux.internal executor 1):
 FetchFailed(BlockManagerId(2, cluster-fast-w-1.c.altaeth-biolux.internal,
 7337, None), shuffleId=0, mapIndex=171, mapId=11287, reduceId=808, message=
 org.apache.spark.shuffle.FetchFailedException
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
 at
 org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
 at
 org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
 at
 scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
 at
 scala.collection.Iterator$$anon$11.hasNext(Iter