You could do the entire thing in DataFrame world and write the result to disk. All you need is unpivot (to be released in Spark 3.4.0, soon).

Note this is Scala but should be straightforward to translate into Java:

import org.apache.spark.sql.functions.collect_set

val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10, 123)).toDF("a", "b", "c")

df.unpivot(Array.empty, "column", "value")
  .groupBy("column")
  .agg(collect_set("value").as("distinct_values"))

The unpivot operation turns
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1| 10|123|
|  2| 20|124|
|  3| 20|123|
|  4| 10|123|
+---+---+---+

into

+------+-----+
|column|value|
+------+-----+
|     a|    1|
|     b|   10|
|     c|  123|
|     a|    2|
|     b|   20|
|     c|  124|
|     a|    3|
|     b|   20|
|     c|  123|
|     a|    4|
|     b|   10|
|     c|  123|
+------+-----+

The groupBy("column").agg(collect_set("value").as("distinct_values")) collects distinct values per column:
+------+---------------+
|column|distinct_values|
+------+---------------+
|     c|     [123, 124]|
|     b|       [20, 10]|
|     a|   [1, 2, 3, 4]|
+------+---------------+
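
Since the result stays a DataFrame, it can be written to disk directly, and JSON is one of the supported formats. The following is only a minimal sketch, assuming Spark 3.4.0 or later, a local SparkSession, and a placeholder output path (/tmp/distinct_values_json); the object and method names are made up for illustration:

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.collect_set

object DistinctValuesAsJson {
  // One row per input column, holding the set of that column's distinct values.
  // Assumes Spark 3.4.0+ (for unpivot) and that all columns share a common type.
  def distinctValues(df: DataFrame): DataFrame =
    df.unpivot(Array.empty[Column], "column", "value")
      .groupBy("column")
      .agg(collect_set("value").as("distinct_values"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("distinct-values")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10, 123)).toDF("a", "b", "c")
    val result = distinctValues(df)

    // Distributed write, one JSON object per line; the path is a placeholder.
    result.write.mode("overwrite").json("/tmp/distinct_values_json")

    // For small results only: fetch the rows as JSON strings on the driver.
    result.toJSON.collect().foreach(println)

    spark.stop()
  }
}
```

The write variant keeps everything distributed, so it also works when a column has many distinct values. The order of the values inside each array is not guaranteed.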

Note that unpivot only works if all columns have a "common" type. Then all columns are cast to that common type. If you have incompatible types like Integer and String, you would have to cast them all to String first:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType

df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
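
Put together, the cast-then-unpivot variant could look roughly like this. Again just a sketch, assuming Spark 3.4.0 or later and column names without dots or other special characters; the object and method names are made up for illustration:

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_set}
import org.apache.spark.sql.types.StringType

object DistinctValuesAsStrings {
  // Casts every column to String so that unpivot sees one common type,
  // then collects the distinct (string) values per column.
  def distinctValuesAsStrings(df: DataFrame): DataFrame =
    df.select(df.columns.map(c => col(c).cast(StringType).as(c)): _*)
      .unpivot(Array.empty[Column], "column", "value")
      .groupBy("column")
      .agg(collect_set("value").as("distinct_values"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("distinct-values-as-strings")
      .getOrCreate()
    import spark.implicits._

    // Integer and String columns mixed, which unpivot alone would reject.
    val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, "one")).toDF("a", "b", "c")
    distinctValuesAsStrings(df).show(truncate = false)

    spark.stop()
  }
}
```

The price is that distinct_values is an array of strings, so the original value types are lost.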

If you want to preserve the type of the values and have multiple value types, you cannot put everything into a DataFrame with one distinct_values column. You could still have multiple DataFrames, one per data type, and write those, or collect the DataFrame's values into Maps:

import scala.collection.immutable

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_set}

// if all your columns have the same type
def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String, immutable.Seq[Any]] = {
  df.unpivot(Array.empty, "column", "value")
    .groupBy("column")
    .agg(collect_set("value").as("distinct_values"))
    .collect()
    .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
    .toMap
}


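// if your columns have different types
def distinctValuesPerColumn(df: DataFrame): immutable.Map[String, immutable.Seq[Any]] = {
  df.schema.fields
    // group the column names by their data type
    .groupBy(_.dataType)
    .mapValues(_.map(_.name))
    // process the type groups in parallel; on Scala 2.13, .par needs the
    // separate scala-parallel-collections module
    .par
    // one DataFrame per data type, so that unpivot sees a single value type
    .map { case (dataType, columns) => df.select(columns.map(col): _*) }
    .map(distinctValuesPerColumnOneType)
    .flatten
    .toList
    .toMap
}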

val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, "one")).toDF("a", "b", "c")
distinctValuesPerColumn(df)

The result is (list values keep their original type):
Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))
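
The other option mentioned above, one DataFrame per data type written to disk instead of collected into Maps, could be sketched as follows. This assumes Spark 3.4.0 or later, simple column types whose typeName is usable as a directory name, and a placeholder base path (/tmp/distinct_values_by_type); the object and method names are made up for illustration:

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_set}
import org.apache.spark.sql.types.DataType

object DistinctValuesPerType {
  // One result DataFrame per data type; each keeps a typed distinct_values array.
  def distinctValuesPerType(df: DataFrame): Map[DataType, DataFrame] =
    df.schema.fields
      .groupBy(_.dataType)
      .map { case (dataType, fields) =>
        // Only columns of the same type go into one unpivot.
        val sameType = df.select(fields.map(f => col(f.name)): _*)
        dataType -> sameType
          .unpivot(Array.empty[Column], "column", "value")
          .groupBy("column")
          .agg(collect_set("value").as("distinct_values"))
      }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("distinct-values-per-type")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, "one")).toDF("a", "b", "c")

    distinctValuesPerType(df).foreach { case (dataType, result) =>
      // Placeholder layout: one Parquet directory per type, e.g. .../integer, .../string.
      result.write.mode("overwrite").parquet(s"/tmp/distinct_values_by_type/${dataType.typeName}")
    }

    spark.stop()
  }
}
```

Nothing is collected to the driver here, so this variant does not depend on the distinct values fitting into local memory.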

Hope this helps,
Enrico


On 10.02.23 at 22:56, sam smith wrote:
Hi Apostolos,
Can you suggest a better approach while keeping values within a dataframe?

On Fri, 10 Feb 2023 at 22:47, Apostolos N. Papadopoulos <papad...@csd.auth.gr> wrote:

    Dear Sam,

    you are assuming that the data fits in the memory of your local
    machine. You are using a dataframe as a basis, which can
    potentially be very large, and then you are storing the data in
    local lists. Keep in mind that the number of distinct elements in
    a column may be very large (depending on the app). I suggest
    working on a solution that assumes that the number of distinct
    values is also large. Thus, you should keep your data in
    dataframes or RDDs, and store them as csv files, parquet, etc.

    a.p.


    On 10/2/23 23:40, sam smith wrote:
    I want to get the distinct values of each column in a List (is it
    good practice to use List here?) that contains the column name as
    its first element and the column's distinct values as the
    remaining elements, so that for a dataset we get a list of lists.
    I do it this way (in my opinion not so fast):

    List<List<String>> finalList = new ArrayList<List<String>>();
    Dataset<Row> df = spark.read().format("csv").option("header", "true").load("/pathToCSV");
    String[] columnNames = df.columns();
    for (int i = 0; i < columnNames.length; i++) {
        List<String> columnList = new ArrayList<String>();
        columnList.add(columnNames[i]);
        List<Row> columnValues = df
            .filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull())
            .select(columnNames[i])
            .distinct()
            .collectAsList();
        for (int j = 0; j < columnValues.size(); j++)
            columnList.add(columnValues.get(j).apply(0).toString());
        finalList.add(columnList);
    }

    How to improve this?

    Also, can I get the results in JSON format?

-- Apostolos N. Papadopoulos, Associate Professor
    Department of Informatics
    Aristotle University of Thessaloniki
    Thessaloniki, GREECE
    tel: ++0030312310991918
    email:papad...@csd.auth.gr
    twitter: @papadopoulos_ap
    web:http://datalab.csd.auth.gr/~apostol
