@Sean: This aggregate function does work without an explicit groupBy():

./spark-3.3.1-bin-hadoop2/bin/spark-shell
Spark context Web UI available at http://*********:4040
Spark context available as 'sc' (master = local[*], app id = local-1676237726079).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, "one")).toDF("a", "b", "c")
scala> df.select(df.columns.map(column => collect_set(col(column)).as(column)): _*).show()
+------------+--------+----------+
|           a|       b|         c|
+------------+--------+----------+
|[1, 2, 3, 4]|[20, 10]|[one, two]|
+------------+--------+----------+

@Sam: I haven't tested the Java code, sorry. I presume you can work it out from the working Scala code.

Enrico


On 12.02.23 at 21:32, Sean Owen wrote:
It doesn't work because it's an aggregate function. You have to groupBy() (group by nothing) to make that work, but you can't assign the result as a column. Folks, those approaches don't make sense semantically in SQL or Spark or anything. They just mean: use threads in your program to collect() the distinct values for each column in parallel. You don't have to, but you could. What else are we looking for here? The answer has been given a number of times, I think.


On Sun, Feb 12, 2023 at 2:28 PM sam smith <qustacksm2123...@gmail.com> wrote:

    OK, what do you mean by "do your outer for loop in parallel"?
    By the way, this didn't work:
    for (String columnName : df.columns()) {
        df = df.withColumn(columnName, collect_set(col(columnName)).as(columnName));
    }


    On Sun, Feb 12, 2023 at 20:36, Enrico Minack
    <i...@enrico.minack.dev> wrote:

        That is unfortunate, but 3.4.0 is around the corner, really!

        Well, then based on your code, I'd suggest two improvements:
        - cache your dataframe after reading, this way, you don't read
        the entire file for each column
        - do your outer for loop in parallel, then you have N parallel
        Spark jobs (only helps if your Spark cluster is not fully
        occupied by a single column)
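
The second suggestion can be sketched in plain Java as follows. This is only an illustration of the threading pattern, not tested against Spark: the class name `ParallelDistinct`, the method `distinctPerColumn`, and the `distinctValuesOf` hook are hypothetical names made up for this sketch. With Spark, the hook would presumably wrap the per-column job on the cached DataFrame (something like `cached.select(c).distinct().collectAsList()` mapped to strings); here an in-memory table stands in for it so the example runs without a cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class ParallelDistinct {

    // Submits one job per column to a fixed thread pool, then waits for all of them.
    // distinctValuesOf is a hypothetical hook; with Spark it would trigger one job per column.
    public static Map<String, List<String>> distinctPerColumn(
            List<String> columns,
            Function<String, List<String>> distinctValuesOf,
            int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit every column first so the jobs can overlap.
            Map<String, Future<List<String>>> pending = new LinkedHashMap<>();
            for (String column : columns) {
                Callable<List<String>> job = () -> distinctValuesOf.apply(column);
                pending.put(column, pool.submit(job));
            }
            // Collect the results in the original column order.
            Map<String, List<String>> result = new LinkedHashMap<>();
            for (Map.Entry<String, Future<List<String>>> entry : pending.entrySet()) {
                result.put(entry.getKey(), entry.getValue().get());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a column", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a column job failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // In-memory stand-in for the DataFrame, using the sample data from this thread.
        Map<String, List<String>> table = new LinkedHashMap<>();
        table.put("a", Arrays.asList("1", "2", "3", "4"));
        table.put("b", Arrays.asList("10", "20", "20", "10"));
        table.put("c", Arrays.asList("one", "two", "one", "one"));

        Map<String, List<String>> distinct = distinctPerColumn(
            new ArrayList<>(table.keySet()),
            column -> new ArrayList<>(new LinkedHashSet<>(table.get(column))),
            3);
        System.out.println(distinct); // {a=[1, 2, 3, 4], b=[10, 20], c=[one, two]}
    }
}
```

As noted in the bullet above, running the columns in parallel only pays off if a single column does not already occupy the whole cluster.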

        Your withColumn-approach does not work because withColumn
        expects a column as the second argument, but
        df.select(columnName).distinct() is a DataFrame and .col is a
        column in *that* DataFrame, it is not a column of the
        dataframe that you call withColumn on.

        It should read:

        Scala:
        df.select(df.columns.map(column => collect_set(col(column)).as(column)): _*).show()

        Java:
        for (String columnName : df.columns()) {
            df = df.withColumn(columnName, collect_set(col(columnName)).as(columnName));
        }

        Then you have a single DataFrame that computes all columns in
        a single Spark job.

        But this reads all distinct values into a single partition,
        which has the same downside as collect, so this is as bad as
        using collect.

        Cheers,
        Enrico


        On 12.02.23 at 18:05, sam smith wrote:
        @Enrico Minack <mailto:enrico-min...@gmx.de> Thanks for
        "unpivot" but I am using version 3.3.0 (you are taking it way
        too far as usual :) )
        @Sean Owen <mailto:sro...@gmail.com> Pls then show me how it
        can be improved by code.

        Also, why doesn't an approach like this (using withColumn()) work?

        for (String columnName : df.columns()) {
            df = df.withColumn(columnName, df.select(columnName).distinct().col(columnName));
        }

        On Sat, Feb 11, 2023 at 13:11, Enrico Minack
        <i...@enrico.minack.dev> wrote:

            You could do the entire thing in DataFrame world and
            write the result to disk. All you need is unpivot (to be
            released in Spark 3.4.0, soon).

            Note this is Scala but should be straightforward to
            translate into Java:

            import org.apache.spark.sql.functions.collect_set

            val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10, 123)).toDF("a", "b", "c")

            df.unpivot(Array.empty, "column", "value")
              .groupBy("column")
              .agg(collect_set("value").as("distinct_values"))

            The unpivot operation turns
            +---+---+---+
            |  a|  b|  c|
            +---+---+---+
            |  1| 10|123|
            |  2| 20|124|
            |  3| 20|123|
            |  4| 10|123|
            +---+---+---+

            into

            +------+-----+
            |column|value|
            +------+-----+
            |     a|    1|
            |     b|   10|
            |     c|  123|
            |     a|    2|
            |     b|   20|
            |     c|  124|
            |     a|    3|
            |     b|   20|
            |     c|  123|
            |     a|    4|
            |     b|   10|
            |     c|  123|
            +------+-----+

            The
            groupBy("column").agg(collect_set("value").as("distinct_values"))
            collects distinct values per column:
            +------+---------------+
            |column|distinct_values|
            +------+---------------+
            |     c|     [123, 124]|
            |     b|       [20, 10]|
            |     a|   [1, 2, 3, 4]|
            +------+---------------+
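
To make the two steps concrete without a Spark 3.4.0 install, here is a minimal plain-Java sketch of the same idea on in-memory rows. It is not Spark code: the class `UnpivotSketch` and its methods are hypothetical names for illustration, and all values are kept as strings. Note that Spark's collect_set gives no ordering guarantee, whereas this sketch keeps first-seen order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class UnpivotSketch {

    // Step 1 ("unpivot"): emit one (column, value) pair per cell, row by row.
    public static List<String[]> unpivot(String[] columns, List<String[]> rows) {
        List<String[]> pairs = new ArrayList<>();
        for (String[] row : rows) {
            for (int i = 0; i < columns.length; i++) {
                pairs.add(new String[] {columns[i], row[i]});
            }
        }
        return pairs;
    }

    // Step 2 ("groupBy + collect_set"): group the pairs by column name
    // and keep each value only once per column.
    public static Map<String, Set<String>> distinctValues(List<String[]> pairs) {
        Map<String, Set<String>> byColumn = new LinkedHashMap<>();
        for (String[] pair : pairs) {
            byColumn.computeIfAbsent(pair[0], k -> new LinkedHashSet<>()).add(pair[1]);
        }
        return byColumn;
    }

    public static void main(String[] args) {
        // Same sample data as the DataFrame above, with values as strings.
        String[] columns = {"a", "b", "c"};
        List<String[]> rows = Arrays.asList(
            new String[] {"1", "10", "123"},
            new String[] {"2", "20", "124"},
            new String[] {"3", "20", "123"},
            new String[] {"4", "10", "123"});

        List<String[]> pairs = unpivot(columns, rows);
        System.out.println(pairs.size());          // 12
        System.out.println(distinctValues(pairs)); // {a=[1, 2, 3, 4], b=[10, 20], c=[123, 124]}
    }
}
```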

            Note that unpivot only works if all columns have a
            "common" type. Then all columns are cast to that common
            type. If you have incompatible types like Integer and
            String, you would have to cast them all to String first:

            import org.apache.spark.sql.functions.col
            import org.apache.spark.sql.types.StringType

            df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)

            If you want to preserve the type of the values and have
            multiple value types, you cannot put everything into a
            DataFrame with one distinct_values column. You could
            still have multiple DataFrames, one per data type, and
            write those, or collect the DataFrame's values into Maps:

            import scala.collection.immutable

            import org.apache.spark.sql.DataFrame
            import org.apache.spark.sql.functions.{col, collect_set}

            // if all your columns have the same type
            def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String, immutable.Seq[Any]] = {
              df.unpivot(Array.empty, "column", "value")
                .groupBy("column")
                .agg(collect_set("value").as("distinct_values"))
                .collect()
                // one row per column: (column name, distinct values)
                .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
                .toMap
            }


            // if your columns have different types
            def distinctValuesPerColumn(df: DataFrame): immutable.Map[String, immutable.Seq[Any]] = {
              df.schema.fields
                .groupBy(_.dataType)
                .mapValues(_.map(_.name))
                .par
                // one DataFrame per data type, processed in parallel
                .map { case (dataType, columns) => df.select(columns.map(col): _*) }
                .map(distinctValuesPerColumnOneType)
                .flatten
                .toList
                .toMap
            }

            val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, "one")).toDF("a", "b", "c")
            distinctValuesPerColumn(df)

            The result is: (list values are of original type)
            Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))

            Hope this helps,
            Enrico


            On 10.02.23 at 22:56, sam smith wrote:
            Hi Apostolos,
            Can you suggest a better approach while keeping values
            within a dataframe?

            On Fri, Feb 10, 2023 at 22:47, Apostolos N. Papadopoulos
            <papad...@csd.auth.gr> wrote:

                Dear Sam,

                you are assuming that the data fits in the memory of
                your local machine. You are using a dataframe as a
                basis, which can potentially be very large, and then
                you are storing the data in local lists. Keep in mind
                that the number of distinct elements in a column may
                be very large (depending on the app). I suggest
                working on a solution that assumes that the number of
                distinct values is also large. Thus, you should keep
                your data in dataframes or RDDs, and store them as
                CSV files, Parquet, etc.

                a.p.


                On 10/2/23 23:40, sam smith wrote:
                I want to get the distinct values of each column in
                a List (is it good practice to use List here?) that
                contains the column name as its first element and
                the column's distinct values as the remaining
                elements, so that for a dataset we get a list of
                lists. I do it this way (in my opinion not so fast):

                List<List<String>> finalList = new ArrayList<List<String>>();
                Dataset<Row> df = spark.read().format("csv").option("header", "true").load("/pathToCSV");
                String[] columnNames = df.columns();
                for (int i = 0; i < columnNames.length; i++) {
                    List<String> columnList = new ArrayList<String>();
                    columnList.add(columnNames[i]);
                    List<Row> columnValues = df
                        .filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull())
                        .select(columnNames[i])
                        .distinct()
                        .collectAsList();
                    for (int j = 0; j < columnValues.size(); j++)
                        columnList.add(columnValues.get(j).apply(0).toString());
                    finalList.add(columnList);
                }

                How to improve this?

                Also, can I get the results in JSON format?

-- Apostolos N. Papadopoulos, Associate Professor
                Department of Informatics
                Aristotle University of Thessaloniki
                Thessaloniki, GREECE
                tel: ++0030312310991918
                email:papad...@csd.auth.gr
                twitter: @papadopoulos_ap
                web:http://datalab.csd.auth.gr/~apostol


