OK, what do you mean by " do your outer for loop in parallel "?
btw this didn't work:
for (String columnName : df.columns()) {
    df= df.withColumn(columnName,
collect_set(col(columnName)).as(columnName));
}


Le dim. 12 févr. 2023 à 20:36, Enrico Minack <i...@enrico.minack.dev> a
écrit :

> That is unfortunate, but 3.4.0 is around the corner, really!
>
> Well, then based on your code, I'd suggest two improvements:
> - cache your dataframe after reading, this way, you don't read the entire
> file for each column
> - do your outer for loop in parallel, then you have N parallel Spark jobs
> (only helps if your Spark cluster is not fully occupied by a single column)
>
> Your withColumn-approach does not work because withColumn expects a column
> as the second argument, but df.select(columnName).distinct() is a DataFrame
> and .col is a column in *that* DataFrame, it is not a column of the
> dataframe that you call withColumn on.
>
> It should read:
>
> Scala:
> df.select(df.columns.map(column => collect_set(col(column)).as(column)):
> _*).show()
>
> Java:
> for (String columnName : df.columns()) {
>     df= df.withColumn(columnName,
> collect_set(col(columnName)).as(columnName));
> }
>
> Then you have a single DataFrame that computes all columns in a single
> Spark job.
>
> But this reads all distinct values into a single partition, which has the
> same downside as collect, so this is as bad as using collect.
>
> Cheers,
> Enrico
>
>
> Am 12.02.23 um 18:05 schrieb sam smith:
>
> @Enrico Minack <enrico-min...@gmx.de> Thanks for "unpivot" but I am using
> version 3.3.0 (you are taking it way too far as usual :) )
> @Sean Owen <sro...@gmail.com> Pls then show me how it can be improved by
> code.
>
> Also, why such an approach (using withColumn() ) doesn't work:
>
> for (String columnName : df.columns()) {
>     df= df.withColumn(columnName,
> df.select(columnName).distinct().col(columnName));
> }
>
> Le sam. 11 févr. 2023 à 13:11, Enrico Minack <i...@enrico.minack.dev> a
> écrit :
>
>> You could do the entire thing in DataFrame world and write the result to
>> disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>>
>> Note this is Scala but should be straightforward to translate into Java:
>>
>> import org.apache.spark.sql.functions.collect_set
>>
>> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
>> 123)).toDF("a", "b", "c")
>>
>> df.unpivot(Array.empty, "column", "value")
>>   .groupBy("column")
>>   .agg(collect_set("value").as("distinct_values"))
>>
>> The unpivot operation turns
>> +---+---+---+
>> |  a|  b|  c|
>> +---+---+---+
>> |  1| 10|123|
>> |  2| 20|124|
>> |  3| 20|123|
>> |  4| 10|123|
>> +---+---+---+
>>
>> into
>>
>> +------+-----+
>> |column|value|
>> +------+-----+
>> |     a|    1|
>> |     b|   10|
>> |     c|  123|
>> |     a|    2|
>> |     b|   20|
>> |     c|  124|
>> |     a|    3|
>> |     b|   20|
>> |     c|  123|
>> |     a|    4|
>> |     b|   10|
>> |     c|  123|
>> +------+-----+
>>
>> The groupBy("column").agg(collect_set("value").as("distinct_values"))
>> collects distinct values per column:
>> +------+---------------+
>>
>> |column|distinct_values|
>> +------+---------------+
>> |     c|     [123, 124]|
>> |     b|       [20, 10]|
>> |     a|   [1, 2, 3, 4]|
>> +------+---------------+
>>
>> Note that unpivot only works if all columns have a "common" type. Then
>> all columns are cast to that common type. If you have incompatible types
>> like Integer and String, you would have to cast them all to String first:
>>
>> import org.apache.spark.sql.types.StringType
>>
>> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>>
>> If you want to preserve the type of the values and have multiple value
>> types, you cannot put everything into a DataFrame with one
>> distinct_values column. You could still have multiple DataFrames, one
>> per data type, and write those, or collect the DataFrame's values into Maps:
>>
>> import scala.collection.immutable
>>
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.functions.collect_set
>>
>> // if all you columns have the same type
>> def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String,
>> immutable.Seq[Any]] = {
>>   df.unpivot(Array.empty, "column", "value")
>>     .groupBy("column")
>>     .agg(collect_set("value").as("distinct_values"))
>>     .collect()
>>     .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
>>     .toMap
>> }
>>
>>
>> // if your columns have different types
>> def distinctValuesPerColumn(df: DataFrame): immutable.Map[String,
>> immutable.Seq[Any]] = {
>>   df.schema.fields
>>     .groupBy(_.dataType)
>>     .mapValues(_.map(_.name))
>>     .par
>>     .map { case (dataType, columns) => df.select(columns.map(col): _*) }
>>     .map(distinctValuesPerColumnOneType)
>>     .flatten
>>     .toList
>>     .toMap
>> }
>>
>> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10,
>> "one")).toDF("a", "b", "c")
>> distinctValuesPerColumn(df)
>>
>> The result is: (list values are of original type)
>> Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))
>>
>> Hope this helps,
>> Enrico
>>
>>
>> Am 10.02.23 um 22:56 schrieb sam smith:
>>
>> Hi Apotolos,
>> Can you suggest a better approach while keeping values within a dataframe?
>>
>> Le ven. 10 févr. 2023 à 22:47, Apostolos N. Papadopoulos <
>> papad...@csd.auth.gr> a écrit :
>>
>>> Dear Sam,
>>>
>>> you are assuming that the data fits in the memory of your local machine.
>>> You are using as a basis a dataframe, which potentially can be very large,
>>> and then you are storing the data in local lists. Keep in mind that that
>>> the number of distinct elements in a column may be very large (depending on
>>> the app). I suggest to work on a solution that assumes that the number of
>>> distinct values is also large. Thus, you should keep your data in
>>> dataframes or RDDs, and store them as csv files, parquet, etc.
>>>
>>> a.p.
>>>
>>>
>>> On 10/2/23 23:40, sam smith wrote:
>>>
>>> I want to get the distinct values of each column in a List (is it good
>>> practice to use List here?), that contains as first element the column
>>> name, and the other element its distinct values so that for a dataset we
>>> get a list of lists, i do it this way (in my opinion no so fast):
>>>
>>> List<List<String>> finalList = new ArrayList<List<String>>();
>>>     Dataset<Row> df = spark.read().format("csv").option("header", 
>>> "true").load("/pathToCSV");
>>>     String[] columnNames = df.columns();
>>>  for (int i=0;i<columnNames.length;i++) {
>>>     List<String> columnList = new ArrayList<String>();
>>>
>>>     columnList.add(columnNames[i]);
>>>
>>>
>>>     List<Row> columnValues = 
>>> df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
>>>     for (int j=0;j<columnValues.size();j++)
>>>         columnList.add(columnValues.get(j).apply(0).toString());
>>>
>>>     finalList.add(columnList);
>>>
>>>
>>> How to improve this?
>>>
>>> Also, can I get the results in JSON format?
>>>
>>> --
>>> Apostolos N. Papadopoulos, Associate Professor
>>> Department of Informatics
>>> Aristotle University of Thessaloniki
>>> Thessaloniki, GREECE
>>> tel: ++0030312310991918
>>> email: papad...@csd.auth.gr
>>> twitter: @papadopoulos_ap
>>> web: http://datalab.csd.auth.gr/~apostol
>>>
>>>
>>
>

Reply via email to