OK, what do you mean by "do your outer for loop in parallel"? Btw, this didn't work:

for (String columnName : df.columns()) {
    df = df.withColumn(columnName, collect_set(col(columnName)).as(columnName));
}
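"Do your outer for loop in parallel" means submitting one Spark job per column from separate driver threads: Spark's scheduler is thread-safe, so jobs submitted concurrently can share idle executors. A minimal Java sketch of the idea, assuming df is the DataFrame from the original post; the helper name and pool size are illustrative:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;

// One Spark job per column, each submitted from its own driver thread.
static List<List<Row>> distinctPerColumnInParallel(Dataset<Row> df) throws Exception {
    Dataset<Row> cached = df.cache();                       // read the input once, not once per column
    ExecutorService pool = Executors.newFixedThreadPool(8); // illustrative pool size
    List<Future<List<Row>>> jobs = new ArrayList<>();
    for (String c : cached.columns()) {
        jobs.add(pool.submit(() ->
            cached.filter(col(c).isNotNull())
                  .select(c)
                  .distinct()
                  .collectAsList()));                       // one Spark job per column
    }
    List<List<Row>> results = new ArrayList<>();
    for (Future<List<Row>> job : jobs) {
        results.add(job.get());                             // block until that column's job finishes
    }
    pool.shutdown();
    return results;
}

This only shortens wall-clock time when the cluster has capacity left over while a single column's job runs; each collectAsList() still pulls one column's distinct values to the driver.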
Le dim. 12 févr. 2023 à 20:36, Enrico Minack <i...@enrico.minack.dev> a écrit :

> That is unfortunate, but 3.4.0 is around the corner, really!
>
> Well, then based on your code, I'd suggest two improvements:
> - cache your dataframe after reading; this way you don't read the entire
>   file for each column
> - do your outer for loop in parallel; then you have N parallel Spark jobs
>   (this only helps if your Spark cluster is not fully occupied by a single
>   column)
>
> Your withColumn approach does not work because withColumn expects a column
> as the second argument, but df.select(columnName).distinct() is a DataFrame,
> and .col is a column in *that* DataFrame; it is not a column of the
> DataFrame that you call withColumn on.
>
> It should read:
>
> Scala:
>
> df.select(df.columns.map(column => collect_set(col(column)).as(column)): _*).show()
>
> Java:
>
> df.select(Arrays.stream(df.columns())
>     .map(column -> collect_set(col(column)).as(column))
>     .toArray(Column[]::new))
>   .show();
>
> Then you have a single DataFrame that computes all columns in a single
> Spark job.
>
> But this reads all distinct values into a single partition, which has the
> same downside as collect, so this is as bad as using collect.
>
> Cheers,
> Enrico
>
>
> Am 12.02.23 um 18:05 schrieb sam smith:
>
> @Enrico Minack <enrico-min...@gmx.de> Thanks for "unpivot", but I am using
> version 3.3.0 (you are taking it way too far, as usual :) )
> @Sean Owen <sro...@gmail.com> Please then show me how it can be improved in
> code.
>
> Also, why doesn't such an approach (using withColumn()) work:
>
> for (String columnName : df.columns()) {
>     df = df.withColumn(columnName,
>         df.select(columnName).distinct().col(columnName));
> }
>
> Le sam. 11 févr. 2023 à 13:11, Enrico Minack <i...@enrico.minack.dev> a écrit :
>
>> You could do the entire thing in DataFrame world and write the result to
>> disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>>
>> Note this is Scala, but it should be straightforward to translate into Java:
>>
>> import org.apache.spark.sql.functions.collect_set
>>
>> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10, 123)).toDF("a", "b", "c")
>>
>> df.unpivot(Array.empty, "column", "value")
>>   .groupBy("column")
>>   .agg(collect_set("value").as("distinct_values"))
>>
>> The unpivot operation turns
>>
>> +---+---+---+
>> |  a|  b|  c|
>> +---+---+---+
>> |  1| 10|123|
>> |  2| 20|124|
>> |  3| 20|123|
>> |  4| 10|123|
>> +---+---+---+
>>
>> into
>>
>> +------+-----+
>> |column|value|
>> +------+-----+
>> |     a|    1|
>> |     b|   10|
>> |     c|  123|
>> |     a|    2|
>> |     b|   20|
>> |     c|  124|
>> |     a|    3|
>> |     b|   20|
>> |     c|  123|
>> |     a|    4|
>> |     b|   10|
>> |     c|  123|
>> +------+-----+
>>
>> The groupBy("column").agg(collect_set("value").as("distinct_values"))
>> collects the distinct values per column:
>>
>> +------+---------------+
>> |column|distinct_values|
>> +------+---------------+
>> |     c|     [123, 124]|
>> |     b|       [20, 10]|
>> |     a|   [1, 2, 3, 4]|
>> +------+---------------+
>>
>> Note that unpivot only works if all columns have a "common" type, to which
>> all columns are then cast. If you have incompatible types like Integer and
>> String, you would have to cast them all to String first:
>>
>> import org.apache.spark.sql.types.StringType
>>
>> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>>
>> If you want to preserve the type of the values and have multiple value
>> types, you cannot put everything into a DataFrame with one
>> distinct_values column.
>> You could still have multiple DataFrames, one per data type, and write
>> those, or collect the DataFrame's values into Maps:
>>
>> import scala.collection.immutable
>>
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.functions.{col, collect_set}
>>
>> // if all your columns have the same type
>> def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String, immutable.Seq[Any]] = {
>>   df.unpivot(Array.empty, "column", "value")
>>     .groupBy("column")
>>     .agg(collect_set("value").as("distinct_values"))
>>     .collect()
>>     .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
>>     .toMap
>> }
>>
>> // if your columns have different types
>> def distinctValuesPerColumn(df: DataFrame): immutable.Map[String, immutable.Seq[Any]] = {
>>   df.schema.fields
>>     .groupBy(_.dataType)
>>     .mapValues(_.map(_.name))
>>     .par
>>     .map { case (dataType, columns) => df.select(columns.map(col): _*) }
>>     .map(distinctValuesPerColumnOneType)
>>     .flatten
>>     .toList
>>     .toMap
>> }
>>
>> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, "one")).toDF("a", "b", "c")
>> distinctValuesPerColumn(df)
>>
>> The result is (list values are of the original type):
>>
>> Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))
>>
>> Hope this helps,
>> Enrico
>>
>>
>> Am 10.02.23 um 22:56 schrieb sam smith:
>>
>> Hi Apostolos,
>> Can you suggest a better approach while keeping the values within a
>> dataframe?
>>
>> Le ven. 10 févr. 2023 à 22:47, Apostolos N. Papadopoulos <papad...@csd.auth.gr> a écrit :
>>
>>> Dear Sam,
>>>
>>> you are assuming that the data fits in the memory of your local machine.
>>> You are using as a basis a dataframe, which potentially can be very large,
>>> and then you are storing the data in local lists. Keep in mind that the
>>> number of distinct elements in a column may be very large (depending on
>>> the app). I suggest working on a solution that assumes that the number of
>>> distinct values can also be large. Thus, you should keep your data in
>>> dataframes or RDDs, and store them as CSV files, parquet, etc.
>>>
>>> a.p.
>>>
>>>
>>> On 10/2/23 23:40, sam smith wrote:
>>>
>>> I want to get the distinct values of each column in a List (is it good
>>> practice to use List here?) that contains the column name as its first
>>> element and its distinct values as the other elements, so that for a
>>> dataset we get a list of lists. I do it this way (in my opinion not so
>>> fast):
>>>
>>> List<List<String>> finalList = new ArrayList<List<String>>();
>>> Dataset<Row> df = spark.read().format("csv").option("header", "true").load("/pathToCSV");
>>> String[] columnNames = df.columns();
>>> for (int i = 0; i < columnNames.length; i++) {
>>>     List<String> columnList = new ArrayList<String>();
>>>     columnList.add(columnNames[i]);
>>>     List<Row> columnValues = df
>>>         .filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull())
>>>         .select(columnNames[i]).distinct().collectAsList();
>>>     for (int j = 0; j < columnValues.size(); j++)
>>>         columnList.add(columnValues.get(j).apply(0).toString());
>>>     finalList.add(columnList);
>>> }
>>>
>>> How to improve this? Also, can I get the results in JSON format?
>>>
>>> --
>>> Apostolos N. Papadopoulos, Associate Professor
>>> Department of Informatics
>>> Aristotle University of Thessaloniki
>>> Thessaloniki, GREECE
>>> tel: ++0030312310991918
>>> email: papad...@csd.auth.gr
>>> twitter: @papadopoulos_ap
>>> web: http://datalab.csd.auth.gr/~apostol
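As for the JSON question at the end of the thread: once every column's distinct set is computed in one job (the select over collect_set shown above), Spark can emit the result as JSON directly, and collect_set already skips nulls, so no isNotNull filter is needed. A minimal Java sketch, assuming df is the CSV DataFrame from the original post; the output path is illustrative:

import java.util.Arrays;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_set;

// One collect_set aggregate per column, all computed in a single Spark job.
Column[] aggs = Arrays.stream(df.columns())
        .map(c -> collect_set(col(c)).as(c))
        .toArray(Column[]::new);
Dataset<Row> distincts = df.select(aggs);

// The single result row serializes as {"a":[...],"b":[...],"c":[...]}.
distincts.toJSON().show(false);
// or write it to disk instead of collecting:
// distincts.write().json("/pathToOutput");

As with the collect-based variants above, this gathers all distinct values into a single partition, so it is only advisable when the per-column distinct sets are small.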