Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-13 Thread sam smith
Alright, this is the working Java version of it:

List<Column> listCols = new ArrayList<>();
Arrays.asList(dataset.columns()).forEach(column -> {
    listCols.add(org.apache.spark.sql.functions.collect_set(column)); });
Column[] arrCols = listCols.toArray(new Column[listCols.size()]);
dataset = dataset.select(arrCols);


But then, when I tried to explode the sets of values into rows with
explode(), the column values repeat to fill the size of the largest
column.

How can I set the repeated values to null instead (thus keeping only one
exploded set of column values in each column)?

Thanks.
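(For reference, one possible way around the repetition, as a rough, untested sketch: it assumes Spark 2.4+, where arrays_zip pads shorter arrays with null, and that `dataset` is the original Dataset<Row> before the select above.)

import static org.apache.spark.sql.functions.*;
import java.util.Arrays;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

String[] cols = dataset.columns();

// one collect_set per column, aliased back to the original column name
Column[] sets = Arrays.stream(cols)
        .map(c -> collect_set(col(c)).as(c))
        .toArray(Column[]::new);
Dataset<Row> collected = dataset.select(sets);   // a single row of arrays

// arrays_zip pads the shorter arrays with null, so a single explode()
// yields null instead of repeated values for the smaller sets
Column[] arrays = Arrays.stream(cols).map(c -> col(c)).toArray(Column[]::new);
Dataset<Row> exploded = collected
        .select(explode(arrays_zip(arrays)).as("z"))
        .select(Arrays.stream(cols)
                .map(c -> col("z." + c).as(c))
                .toArray(Column[]::new));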

On Sun, 12 Feb 2023 at 22:43, Enrico Minack wrote:

> @Sean: This aggregate function does work without an explicit groupBy():
>
> ./spark-3.3.1-bin-hadoop2/bin/spark-shell
> Spark context Web UI available at http://*:4040
> Spark context available as 'sc' (master = local[*], app id =
> local-1676237726079).
> Spark session available as 'spark'.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 3.3.1
>   /_/
>
> Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4,
> 10, "one")).toDF("a", "b", "c")
> scala> df.select(df.columns.map(column =>
> collect_set(col(column)).as(column)): _*).show()
> +------------+--------+----------+
> |           a|       b|         c|
> +------------+--------+----------+
> |[1, 2, 3, 4]|[20, 10]|[one, two]|
> +------------+--------+----------+
>
> @Sam: I haven't tested the Java code, sorry. I presume you can work it out
> from the working Scala code.
>
> Enrico
>
>
> On 12.02.23 at 21:32, Sean Owen wrote:
>
> It doesn't work because it's an aggregate function. You have to groupBy()
> (group by nothing) to make that work, but, you can't assign that as a
> column. Folks those approaches don't make sense semantically in SQL or
> Spark or anything.
> They just mean use threads to collect() distinct values for each col in
> parallel using threads in your program. You don't have to but you could.
> What else are we looking for here, the answer has been given a number of
> times I think.
>
>
> On Sun, Feb 12, 2023 at 2:28 PM sam smith 
> wrote:
>
>> OK, what do you mean by " do your outer for loop in parallel "?
>> btw this didn't work:
>> for (String columnName : df.columns()) {
>> df= df.withColumn(columnName,
>> collect_set(col(columnName)).as(columnName));
>> }
>>
>>
>> On Sun, 12 Feb 2023 at 20:36, Enrico Minack wrote:
>>
>>> That is unfortunate, but 3.4.0 is around the corner, really!
>>>
>>> Well, then based on your code, I'd suggest two improvements:
>>> - cache your dataframe after reading, this way, you don't read the
>>> entire file for each column
>>> - do your outer for loop in parallel, then you have N parallel Spark
>>> jobs (only helps if your Spark cluster is not fully occupied by a single
>>> column)
>>>
>>> Your withColumn-approach does not work because withColumn expects a
>>> column as the second argument, but df.select(columnName).distinct() is a
>>> DataFrame and .col is a column in *that* DataFrame, it is not a column
>>> of the dataframe that you call withColumn on.
>>>
>>> It should read:
>>>
>>> Scala:
>>> df.select(df.columns.map(column => collect_set(col(column)).as(column)):
>>> _*).show()
>>>
>>> Java:
>>> for (String columnName : df.columns()) {
>>> df= df.withColumn(columnName,
>>> collect_set(col(columnName)).as(columnName));
>>> }
>>>
>>> Then you have a single DataFrame that computes all columns in a single
>>> Spark job.
>>>
>>> But this reads all distinct values into a single partition, which has
>>> the same downside as collect, so this is as bad as using collect.
>>>
>>> Cheers,
>>> Enrico
>>>
>>>
>>> On 12.02.23 at 18:05, sam smith wrote:
>>>
>>> @Enrico Minack  Thanks for "unpivot" but I am
>>> using version 3.3.0 (you are taking it way too far as usual :) )
>>> @Sean Owen  Pls then show me how it can be improved
>>> by code.
>>>
>>> Also, why such an approach (using withColumn() ) doesn't work:
>>>
>>> for (String columnName : df.columns()) {
>>> df= df.withColumn(columnName,
>>> df.select(columnName).distinct().col(columnName));
>>> }
>>>
>>> On Sat, 11 Feb 2023 at 13:11, Enrico Minack wrote:
>>>
 You could do the entire thing in DataFrame world and write the result
 to disk. All you need is unpivot (to be released in Spark 3.4.0, soon).

 Note this is Scala but should be straightforward to translate into Java:

 import org.apache.spark.sql.functions.collect_set

 val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
 123)).toDF("a", "b", "c")

 df.unpivot(Array.empty, "column", "value")
   .groupBy("column")
   .agg(collect_set("value").as("distinct_values"))

 The unpivot 

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread Enrico Minack

@Sean: This aggregate function does work without an explicit groupBy():

./spark-3.3.1-bin-hadoop2/bin/spark-shell
Spark context Web UI available at http://*:4040
Spark context available as 'sc' (master = local[*], app id = 
local-1676237726079).

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 
10, "one")).toDF("a", "b", "c")
scala> df.select(df.columns.map(column => 
collect_set(col(column)).as(column)): _*).show()

+------------+--------+----------+
|           a|       b|         c|
+------------+--------+----------+
|[1, 2, 3, 4]|[20, 10]|[one, two]|
+------------+--------+----------+

@Sam: I haven't tested the Java code, sorry. I presume you can work it 
out from the working Scala code.
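For reference, an untested Java rendering of that Scala one-liner might look like this (same df as above):

import static org.apache.spark.sql.functions.*;
import java.util.Arrays;
import org.apache.spark.sql.Column;

Column[] aggs = Arrays.stream(df.columns())
        .map(c -> collect_set(col(c)).as(c))
        .toArray(Column[]::new);
df.select(aggs).show();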


Enrico


On 12.02.23 at 21:32, Sean Owen wrote:
It doesn't work because it's an aggregate function. You have to 
groupBy() (group by nothing) to make that work, but, you can't assign 
that as a column. Folks those approaches don't make sense semantically 
in SQL or Spark or anything.
They just mean use threads to collect() distinct values for each col 
in parallel using threads in your program. You don't have to but you 
could.
What else are we looking for here, the answer has been given a number 
of times I think.



On Sun, Feb 12, 2023 at 2:28 PM sam smith  
wrote:


OK, what do you mean by " do your outer for loop in parallel "?
btw this didn't work:
for (String columnName : df.columns()) {
    df= df.withColumn(columnName,
collect_set(col(columnName)).as(columnName));
}


On Sun, 12 Feb 2023 at 20:36, Enrico Minack wrote:

That is unfortunate, but 3.4.0 is around the corner, really!

Well, then based on your code, I'd suggest two improvements:
- cache your dataframe after reading, this way, you don't read
the entire file for each column
- do your outer for loop in parallel, then you have N parallel
Spark jobs (only helps if your Spark cluster is not fully
occupied by a single column)

Your withColumn-approach does not work because withColumn
expects a column as the second argument, but
df.select(columnName).distinct() is a DataFrame and .col is a
column in *that* DataFrame, it is not a column of the
dataframe that you call withColumn on.

It should read:

Scala:
df.select(df.columns.map(column =>
collect_set(col(column)).as(column)): _*).show()

Java:
for (String columnName : df.columns()) {
    df= df.withColumn(columnName,
collect_set(col(columnName)).as(columnName));
}

Then you have a single DataFrame that computes all columns in
a single Spark job.

But this reads all distinct values into a single partition,
which has the same downside as collect, so this is as bad as
using collect.

Cheers,
Enrico


On 12.02.23 at 18:05, sam smith wrote:

@Enrico Minack  Thanks for
"unpivot" but I am using version 3.3.0 (you are taking it way
too far as usual :) )
@Sean Owen  Pls then show me how it
can be improved by code.

Also, why such an approach (using withColumn() ) doesn't work:

for (String columnName : df.columns()) {
    df= df.withColumn(columnName,
df.select(columnName).distinct().col(columnName));
}

On Sat, 11 Feb 2023 at 13:11, Enrico Minack wrote:

You could do the entire thing in DataFrame world and
write the result to disk. All you need is unpivot (to be
released in Spark 3.4.0, soon).

Note this is Scala but should be straightforward to
translate into Java:

import org.apache.spark.sql.functions.collect_set

val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123),
(4, 10, 123)).toDF("a", "b", "c")

df.unpivot(Array.empty, "column", "value")
  .groupBy("column")
.agg(collect_set("value").as("distinct_values"))

The unpivot operation turns
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1| 10|123|
|  2| 20|124|
|  3| 20|123|
|  4| 10|123|
+---+---+---+

into

+--+-+
|column|value|
+--+-+
| a|    1|
| b|   10|
| c|  123|
| a|    2|
| b|   20|

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread Sean Owen
It doesn't work because it's an aggregate function. You have to groupBy()
(group by nothing) to make that work, but, you can't assign that as a
column. Folks those approaches don't make sense semantically in SQL or
Spark or anything.
They just mean use threads to collect() distinct values for each col in
parallel using threads in your program. You don't have to but you could.
What else are we looking for here, the answer has been given a number of
times I think.


On Sun, Feb 12, 2023 at 2:28 PM sam smith 
wrote:

> OK, what do you mean by " do your outer for loop in parallel "?
> btw this didn't work:
> for (String columnName : df.columns()) {
> df= df.withColumn(columnName,
> collect_set(col(columnName)).as(columnName));
> }
>
>
> On Sun, 12 Feb 2023 at 20:36, Enrico Minack wrote:
>
>> That is unfortunate, but 3.4.0 is around the corner, really!
>>
>> Well, then based on your code, I'd suggest two improvements:
>> - cache your dataframe after reading, this way, you don't read the entire
>> file for each column
>> - do your outer for loop in parallel, then you have N parallel Spark jobs
>> (only helps if your Spark cluster is not fully occupied by a single column)
>>
>> Your withColumn-approach does not work because withColumn expects a
>> column as the second argument, but df.select(columnName).distinct() is a
>> DataFrame and .col is a column in *that* DataFrame, it is not a column
>> of the dataframe that you call withColumn on.
>>
>> It should read:
>>
>> Scala:
>> df.select(df.columns.map(column => collect_set(col(column)).as(column)):
>> _*).show()
>>
>> Java:
>> for (String columnName : df.columns()) {
>> df= df.withColumn(columnName,
>> collect_set(col(columnName)).as(columnName));
>> }
>>
>> Then you have a single DataFrame that computes all columns in a single
>> Spark job.
>>
>> But this reads all distinct values into a single partition, which has the
>> same downside as collect, so this is as bad as using collect.
>>
>> Cheers,
>> Enrico
>>
>>
>> On 12.02.23 at 18:05, sam smith wrote:
>>
>> @Enrico Minack  Thanks for "unpivot" but I am
>> using version 3.3.0 (you are taking it way too far as usual :) )
>> @Sean Owen  Pls then show me how it can be improved by
>> code.
>>
>> Also, why such an approach (using withColumn() ) doesn't work:
>>
>> for (String columnName : df.columns()) {
>> df= df.withColumn(columnName,
>> df.select(columnName).distinct().col(columnName));
>> }
>>
>> On Sat, 11 Feb 2023 at 13:11, Enrico Minack wrote:
>>
>>> You could do the entire thing in DataFrame world and write the result to
>>> disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>>>
>>> Note this is Scala but should be straightforward to translate into Java:
>>>
>>> import org.apache.spark.sql.functions.collect_set
>>>
>>> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
>>> 123)).toDF("a", "b", "c")
>>>
>>> df.unpivot(Array.empty, "column", "value")
>>>   .groupBy("column")
>>>   .agg(collect_set("value").as("distinct_values"))
>>>
>>> The unpivot operation turns
>>> +---+---+---+
>>> |  a|  b|  c|
>>> +---+---+---+
>>> |  1| 10|123|
>>> |  2| 20|124|
>>> |  3| 20|123|
>>> |  4| 10|123|
>>> +---+---+---+
>>>
>>> into
>>>
>>> +--+-+
>>> |column|value|
>>> +--+-+
>>> | a|1|
>>> | b|   10|
>>> | c|  123|
>>> | a|2|
>>> | b|   20|
>>> | c|  124|
>>> | a|3|
>>> | b|   20|
>>> | c|  123|
>>> | a|4|
>>> | b|   10|
>>> | c|  123|
>>> +--+-+
>>>
>>> The groupBy("column").agg(collect_set("value").as("distinct_values"))
>>> collects distinct values per column:
>>> +--+---+
>>>
>>> |column|distinct_values|
>>> +--+---+
>>> | c| [123, 124]|
>>> | b|   [20, 10]|
>>> | a|   [1, 2, 3, 4]|
>>> +--+---+
>>>
>>> Note that unpivot only works if all columns have a "common" type. Then
>>> all columns are cast to that common type. If you have incompatible types
>>> like Integer and String, you would have to cast them all to String first:
>>>
>>> import org.apache.spark.sql.types.StringType
>>>
>>> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>>>
>>> If you want to preserve the type of the values and have multiple value
>>> types, you cannot put everything into a DataFrame with one
>>> distinct_values column. You could still have multiple DataFrames, one
>>> per data type, and write those, or collect the DataFrame's values into Maps:
>>>
>>> import scala.collection.immutable
>>>
>>> import org.apache.spark.sql.DataFrame
>>> import org.apache.spark.sql.functions.collect_set
>>>
>>> // if all you columns have the same type
>>> def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String,
>>> immutable.Seq[Any]] = {
>>>   df.unpivot(Array.empty, "column", "value")
>>> .groupBy("column")
>>> .agg(collect_set("value").as("distinct_values"))
>>> .collect()
>>> .map(row => 

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread sam smith
OK, what do you mean by " do your outer for loop in parallel "?
btw this didn't work:
for (String columnName : df.columns()) {
df= df.withColumn(columnName,
collect_set(col(columnName)).as(columnName));
}


On Sun, 12 Feb 2023 at 20:36, Enrico Minack wrote:

> That is unfortunate, but 3.4.0 is around the corner, really!
>
> Well, then based on your code, I'd suggest two improvements:
> - cache your dataframe after reading, this way, you don't read the entire
> file for each column
> - do your outer for loop in parallel, then you have N parallel Spark jobs
> (only helps if your Spark cluster is not fully occupied by a single column)
>
> Your withColumn-approach does not work because withColumn expects a column
> as the second argument, but df.select(columnName).distinct() is a DataFrame
> and .col is a column in *that* DataFrame, it is not a column of the
> dataframe that you call withColumn on.
>
> It should read:
>
> Scala:
> df.select(df.columns.map(column => collect_set(col(column)).as(column)):
> _*).show()
>
> Java:
> for (String columnName : df.columns()) {
> df= df.withColumn(columnName,
> collect_set(col(columnName)).as(columnName));
> }
>
> Then you have a single DataFrame that computes all columns in a single
> Spark job.
>
> But this reads all distinct values into a single partition, which has the
> same downside as collect, so this is as bad as using collect.
>
> Cheers,
> Enrico
>
>
> On 12.02.23 at 18:05, sam smith wrote:
>
> @Enrico Minack  Thanks for "unpivot" but I am using
> version 3.3.0 (you are taking it way too far as usual :) )
> @Sean Owen  Pls then show me how it can be improved by
> code.
>
> Also, why such an approach (using withColumn() ) doesn't work:
>
> for (String columnName : df.columns()) {
> df= df.withColumn(columnName,
> df.select(columnName).distinct().col(columnName));
> }
>
> On Sat, 11 Feb 2023 at 13:11, Enrico Minack wrote:
>
>> You could do the entire thing in DataFrame world and write the result to
>> disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>>
>> Note this is Scala but should be straightforward to translate into Java:
>>
>> import org.apache.spark.sql.functions.collect_set
>>
>> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
>> 123)).toDF("a", "b", "c")
>>
>> df.unpivot(Array.empty, "column", "value")
>>   .groupBy("column")
>>   .agg(collect_set("value").as("distinct_values"))
>>
>> The unpivot operation turns
>> +---+---+---+
>> |  a|  b|  c|
>> +---+---+---+
>> |  1| 10|123|
>> |  2| 20|124|
>> |  3| 20|123|
>> |  4| 10|123|
>> +---+---+---+
>>
>> into
>>
>> +--+-+
>> |column|value|
>> +--+-+
>> | a|1|
>> | b|   10|
>> | c|  123|
>> | a|2|
>> | b|   20|
>> | c|  124|
>> | a|3|
>> | b|   20|
>> | c|  123|
>> | a|4|
>> | b|   10|
>> | c|  123|
>> +--+-+
>>
>> The groupBy("column").agg(collect_set("value").as("distinct_values"))
>> collects distinct values per column:
>> +--+---+
>>
>> |column|distinct_values|
>> +--+---+
>> | c| [123, 124]|
>> | b|   [20, 10]|
>> | a|   [1, 2, 3, 4]|
>> +--+---+
>>
>> Note that unpivot only works if all columns have a "common" type. Then
>> all columns are cast to that common type. If you have incompatible types
>> like Integer and String, you would have to cast them all to String first:
>>
>> import org.apache.spark.sql.types.StringType
>>
>> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>>
>> If you want to preserve the type of the values and have multiple value
>> types, you cannot put everything into a DataFrame with one
>> distinct_values column. You could still have multiple DataFrames, one
>> per data type, and write those, or collect the DataFrame's values into Maps:
>>
>> import scala.collection.immutable
>>
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.functions.collect_set
>>
>> // if all you columns have the same type
>> def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String,
>> immutable.Seq[Any]] = {
>>   df.unpivot(Array.empty, "column", "value")
>> .groupBy("column")
>> .agg(collect_set("value").as("distinct_values"))
>> .collect()
>> .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
>> .toMap
>> }
>>
>>
>> // if your columns have different types
>> def distinctValuesPerColumn(df: DataFrame): immutable.Map[String,
>> immutable.Seq[Any]] = {
>>   df.schema.fields
>> .groupBy(_.dataType)
>> .mapValues(_.map(_.name))
>> .par
>> .map { case (dataType, columns) => df.select(columns.map(col): _*) }
>> .map(distinctValuesPerColumnOneType)
>> .flatten
>> .toList
>> .toMap
>> }
>>
>> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10,
>> "one")).toDF("a", "b", "c")
>> distinctValuesPerColumn(df)
>>
>> The result is: (list values are of original type)
>> Map(b 

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread Enrico Minack

That is unfortunate, but 3.4.0 is around the corner, really!

Well, then based on your code, I'd suggest two improvements:
- cache your dataframe after reading; that way you don't read the
entire file again for each column
- do your outer for loop in parallel, then you have N parallel Spark
jobs (this only helps if your Spark cluster is not fully occupied by a
single column); see the sketch right after this list
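A rough Java sketch of both improvements (variable names are illustrative; each column becomes its own small Spark job, submitted concurrently from a parallel stream):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> cached = df.cache();   // read the file only once

Map<String, List<Row>> distinctPerColumn = new ConcurrentHashMap<>();
Arrays.stream(cached.columns()).parallel().forEach(c ->
        // one independent Spark job per column, collected to the driver
        distinctPerColumn.put(c, cached.select(c).distinct().collectAsList()));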


Your withColumn-approach does not work because withColumn expects a 
column as the second argument, but df.select(columnName).distinct() is a 
DataFrame and .col is a column in *that* DataFrame, it is not a column 
of the dataframe that you call withColumn on.


It should read:

Scala:
df.select(df.columns.map(column => collect_set(col(column)).as(column)): 
_*).show()


Java:
for (String columnName : df.columns()) {
    df= df.withColumn(columnName, 
collect_set(col(columnName)).as(columnName));

}

Then you have a single DataFrame that computes all columns in a single 
Spark job.


But this reads all distinct values into a single partition, which has 
the same downside as collect, so this is as bad as using collect.


Cheers,
Enrico


On 12.02.23 at 18:05, sam smith wrote:
@Enrico Minack  Thanks for "unpivot" but 
I am using version 3.3.0 (you are taking it way too far as usual :) )
@Sean Owen  Pls then show me how it can be 
improved by code.


Also, why such an approach (using withColumn() ) doesn't work:

for (String columnName : df.columns()) {
    df= df.withColumn(columnName, 
df.select(columnName).distinct().col(columnName));

}

On Sat, 11 Feb 2023 at 13:11, Enrico Minack wrote:


You could do the entire thing in DataFrame world and write the
result to disk. All you need is unpivot (to be released in Spark
3.4.0, soon).

Note this is Scala but should be straightforward to translate into
Java:

import org.apache.spark.sql.functions.collect_set

val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
123)).toDF("a", "b", "c")

df.unpivot(Array.empty, "column", "value")
  .groupBy("column")
  .agg(collect_set("value").as("distinct_values"))

The unpivot operation turns
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1| 10|123|
|  2| 20|124|
|  3| 20|123|
|  4| 10|123|
+---+---+---+

into

+--+-+
|column|value|
+--+-+
| a|    1|
| b|   10|
| c|  123|
| a|    2|
| b|   20|
| c|  124|
| a|    3|
| b|   20|
| c|  123|
| a|    4|
| b|   10|
| c|  123|
+--+-+

The
groupBy("column").agg(collect_set("value").as("distinct_values"))
collects distinct values per column:
+--+---+
|column|distinct_values|
+--+---+
| c| [123, 124]|
| b|   [20, 10]|
| a|   [1, 2, 3, 4]|
+--+---+

Note that unpivot only works if all columns have a "common" type.
Then all columns are cast to that common type. If you have
incompatible types like Integer and String, you would have to cast
them all to String first:

import org.apache.spark.sql.types.StringType

df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)

If you want to preserve the type of the values and have multiple
value types, you cannot put everything into a DataFrame with one
distinct_values column. You could still have multiple DataFrames,
one per data type, and write those, or collect the DataFrame's
values into Maps:

import scala.collection.immutable

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.collect_set

// if all you columns have the same type
def distinctValuesPerColumnOneType(df: DataFrame):
immutable.Map[String, immutable.Seq[Any]] = {
  df.unpivot(Array.empty, "column", "value")
    .groupBy("column")
    .agg(collect_set("value").as("distinct_values"))
    .collect()
    .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
    .toMap
}


// if your columns have different types
def distinctValuesPerColumn(df: DataFrame): immutable.Map[String,
immutable.Seq[Any]] = {
  df.schema.fields
    .groupBy(_.dataType)
    .mapValues(_.map(_.name))
    .par
    .map { case (dataType, columns) => df.select(columns.map(col):
_*) }
    .map(distinctValuesPerColumnOneType)
    .flatten
    .toList
    .toMap
}

val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4,
10, "one")).toDF("a", "b", "c")
distinctValuesPerColumn(df)

The result is: (list values are of original type)
Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))

Hope this helps,
Enrico


On 10.02.23 at 22:56, sam smith wrote:

Hi Apotolos,
Can you suggest a better approach while keeping 

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread Mich Talebzadeh
Hi Sam,


I am curious to know the business use case for this solution if any?


HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 12 Feb 2023 at 17:37, sam smith  wrote:

> @Sean Correct. But I was hoping to improve my solution even more.
>
> On Sun, 12 Feb 2023 at 18:03, Sean Owen wrote:
>
>> That's the answer, except, you can never select a result set into a
>> column right? you just collect() each of those results. Or, what do you
>> want? I'm not clear.
>>
>> On Sun, Feb 12, 2023 at 10:59 AM sam smith 
>> wrote:
>>
>>> @Enrico Minack  Thanks for "unpivot" but I am
>>> using version 3.3.0 (you are taking it way too far as usual :) )
>>> @Sean Owen  Pls then show me how it can be improved
>>> by code.
>>>
>>> Also, why such an approach (using withColumn() ) doesn't work:
>>>
>>> for (String columnName : df.columns()) {
>>> df= df.withColumn(columnName,
>>> df.select(columnName).distinct().col(columnName));
>>> }
>>>
>>> On Sat, 11 Feb 2023 at 13:11, Enrico Minack wrote:
>>>
 You could do the entire thing in DataFrame world and write the result
 to disk. All you need is unpivot (to be released in Spark 3.4.0, soon).

 Note this is Scala but should be straightforward to translate into Java:

 import org.apache.spark.sql.functions.collect_set

 val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
 123)).toDF("a", "b", "c")

 df.unpivot(Array.empty, "column", "value")
   .groupBy("column")
   .agg(collect_set("value").as("distinct_values"))

 The unpivot operation turns
 +---+---+---+
 |  a|  b|  c|
 +---+---+---+
 |  1| 10|123|
 |  2| 20|124|
 |  3| 20|123|
 |  4| 10|123|
 +---+---+---+

 into

 +--+-+
 |column|value|
 +--+-+
 | a|1|
 | b|   10|
 | c|  123|
 | a|2|
 | b|   20|
 | c|  124|
 | a|3|
 | b|   20|
 | c|  123|
 | a|4|
 | b|   10|
 | c|  123|
 +--+-+

 The groupBy("column").agg(collect_set("value").as("distinct_values"))
 collects distinct values per column:
 +--+---+

 |column|distinct_values|
 +--+---+
 | c| [123, 124]|
 | b|   [20, 10]|
 | a|   [1, 2, 3, 4]|
 +--+---+

 Note that unpivot only works if all columns have a "common" type. Then
 all columns are cast to that common type. If you have incompatible types
 like Integer and String, you would have to cast them all to String first:

 import org.apache.spark.sql.types.StringType

 df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)

 If you want to preserve the type of the values and have multiple value
 types, you cannot put everything into a DataFrame with one
 distinct_values column. You could still have multiple DataFrames, one
 per data type, and write those, or collect the DataFrame's values into 
 Maps:

 import scala.collection.immutable

 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions.collect_set

 // if all you columns have the same type
 def distinctValuesPerColumnOneType(df: DataFrame):
 immutable.Map[String, immutable.Seq[Any]] = {
   df.unpivot(Array.empty, "column", "value")
 .groupBy("column")
 .agg(collect_set("value").as("distinct_values"))
 .collect()
 .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
 .toMap
 }


 // if your columns have different types
 def distinctValuesPerColumn(df: DataFrame): immutable.Map[String,
 immutable.Seq[Any]] = {
   df.schema.fields
 .groupBy(_.dataType)
 .mapValues(_.map(_.name))
 .par
 .map { case (dataType, columns) => df.select(columns.map(col): _*) }
 .map(distinctValuesPerColumnOneType)
 .flatten
 .toList
 .toMap
 }

 val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10,
 "one")).toDF("a", "b", "c")
 distinctValuesPerColumn(df)

 The result is: (list values are of original type)
 Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))

 Hope this helps,
 Enrico


 On 10.02.23 at 22:56, sam smith wrote:

 Hi Apotolos,
 Can you suggest a better approach while keeping values within a
 dataframe?

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread sam smith
@Sean Correct. But I was hoping to improve my solution even more.

On Sun, 12 Feb 2023 at 18:03, Sean Owen wrote:

> That's the answer, except, you can never select a result set into a column
> right? you just collect() each of those results. Or, what do you want? I'm
> not clear.
>
> On Sun, Feb 12, 2023 at 10:59 AM sam smith 
> wrote:
>
>> @Enrico Minack  Thanks for "unpivot" but I am
>> using version 3.3.0 (you are taking it way too far as usual :) )
>> @Sean Owen  Pls then show me how it can be improved by
>> code.
>>
>> Also, why such an approach (using withColumn() ) doesn't work:
>>
>> for (String columnName : df.columns()) {
>> df= df.withColumn(columnName,
>> df.select(columnName).distinct().col(columnName));
>> }
>>
>> On Sat, 11 Feb 2023 at 13:11, Enrico Minack wrote:
>>
>>> You could do the entire thing in DataFrame world and write the result to
>>> disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>>>
>>> Note this is Scala but should be straightforward to translate into Java:
>>>
>>> import org.apache.spark.sql.functions.collect_set
>>>
>>> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
>>> 123)).toDF("a", "b", "c")
>>>
>>> df.unpivot(Array.empty, "column", "value")
>>>   .groupBy("column")
>>>   .agg(collect_set("value").as("distinct_values"))
>>>
>>> The unpivot operation turns
>>> +---+---+---+
>>> |  a|  b|  c|
>>> +---+---+---+
>>> |  1| 10|123|
>>> |  2| 20|124|
>>> |  3| 20|123|
>>> |  4| 10|123|
>>> +---+---+---+
>>>
>>> into
>>>
>>> +--+-+
>>> |column|value|
>>> +--+-+
>>> | a|1|
>>> | b|   10|
>>> | c|  123|
>>> | a|2|
>>> | b|   20|
>>> | c|  124|
>>> | a|3|
>>> | b|   20|
>>> | c|  123|
>>> | a|4|
>>> | b|   10|
>>> | c|  123|
>>> +--+-+
>>>
>>> The groupBy("column").agg(collect_set("value").as("distinct_values"))
>>> collects distinct values per column:
>>> +--+---+
>>>
>>> |column|distinct_values|
>>> +--+---+
>>> | c| [123, 124]|
>>> | b|   [20, 10]|
>>> | a|   [1, 2, 3, 4]|
>>> +--+---+
>>>
>>> Note that unpivot only works if all columns have a "common" type. Then
>>> all columns are cast to that common type. If you have incompatible types
>>> like Integer and String, you would have to cast them all to String first:
>>>
>>> import org.apache.spark.sql.types.StringType
>>>
>>> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>>>
>>> If you want to preserve the type of the values and have multiple value
>>> types, you cannot put everything into a DataFrame with one
>>> distinct_values column. You could still have multiple DataFrames, one
>>> per data type, and write those, or collect the DataFrame's values into Maps:
>>>
>>> import scala.collection.immutable
>>>
>>> import org.apache.spark.sql.DataFrame
>>> import org.apache.spark.sql.functions.collect_set
>>>
>>> // if all you columns have the same type
>>> def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String,
>>> immutable.Seq[Any]] = {
>>>   df.unpivot(Array.empty, "column", "value")
>>> .groupBy("column")
>>> .agg(collect_set("value").as("distinct_values"))
>>> .collect()
>>> .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
>>> .toMap
>>> }
>>>
>>>
>>> // if your columns have different types
>>> def distinctValuesPerColumn(df: DataFrame): immutable.Map[String,
>>> immutable.Seq[Any]] = {
>>>   df.schema.fields
>>> .groupBy(_.dataType)
>>> .mapValues(_.map(_.name))
>>> .par
>>> .map { case (dataType, columns) => df.select(columns.map(col): _*) }
>>> .map(distinctValuesPerColumnOneType)
>>> .flatten
>>> .toList
>>> .toMap
>>> }
>>>
>>> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10,
>>> "one")).toDF("a", "b", "c")
>>> distinctValuesPerColumn(df)
>>>
>>> The result is: (list values are of original type)
>>> Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))
>>>
>>> Hope this helps,
>>> Enrico
>>>
>>>
>>> On 10.02.23 at 22:56, sam smith wrote:
>>>
>>> Hi Apotolos,
>>> Can you suggest a better approach while keeping values within a
>>> dataframe?
>>>
>>> On Fri, 10 Feb 2023 at 22:47, Apostolos N. Papadopoulos <papad...@csd.auth.gr> wrote:
>>>
 Dear Sam,

 you are assuming that the data fits in the memory of your local
 machine. You are using as a basis a dataframe, which potentially can be
 very large, and then you are storing the data in local lists. Keep in mind
 that that the number of distinct elements in a column may be very large
 (depending on the app). I suggest to work on a solution that assumes that
 the number of distinct values is also large. Thus, you should keep your
 data in dataframes or RDDs, and store them as csv files, parquet, etc.

 a.p.


 On 10/2/23 23:40, sam smith wrote:

 I want to get the 

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread Sean Owen
That's the answer, except, you can never select a result set into a column
right? you just collect() each of those results. Or, what do you want? I'm
not clear.

On Sun, Feb 12, 2023 at 10:59 AM sam smith 
wrote:

> @Enrico Minack  Thanks for "unpivot" but I am using
> version 3.3.0 (you are taking it way too far as usual :) )
> @Sean Owen  Pls then show me how it can be improved by
> code.
>
> Also, why such an approach (using withColumn() ) doesn't work:
>
> for (String columnName : df.columns()) {
> df= df.withColumn(columnName,
> df.select(columnName).distinct().col(columnName));
> }
>
> On Sat, 11 Feb 2023 at 13:11, Enrico Minack wrote:
>
>> You could do the entire thing in DataFrame world and write the result to
>> disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>>
>> Note this is Scala but should be straightforward to translate into Java:
>>
>> import org.apache.spark.sql.functions.collect_set
>>
>> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
>> 123)).toDF("a", "b", "c")
>>
>> df.unpivot(Array.empty, "column", "value")
>>   .groupBy("column")
>>   .agg(collect_set("value").as("distinct_values"))
>>
>> The unpivot operation turns
>> +---+---+---+
>> |  a|  b|  c|
>> +---+---+---+
>> |  1| 10|123|
>> |  2| 20|124|
>> |  3| 20|123|
>> |  4| 10|123|
>> +---+---+---+
>>
>> into
>>
>> +--+-+
>> |column|value|
>> +--+-+
>> | a|1|
>> | b|   10|
>> | c|  123|
>> | a|2|
>> | b|   20|
>> | c|  124|
>> | a|3|
>> | b|   20|
>> | c|  123|
>> | a|4|
>> | b|   10|
>> | c|  123|
>> +--+-+
>>
>> The groupBy("column").agg(collect_set("value").as("distinct_values"))
>> collects distinct values per column:
>> +--+---+
>>
>> |column|distinct_values|
>> +--+---+
>> | c| [123, 124]|
>> | b|   [20, 10]|
>> | a|   [1, 2, 3, 4]|
>> +--+---+
>>
>> Note that unpivot only works if all columns have a "common" type. Then
>> all columns are cast to that common type. If you have incompatible types
>> like Integer and String, you would have to cast them all to String first:
>>
>> import org.apache.spark.sql.types.StringType
>>
>> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>>
>> If you want to preserve the type of the values and have multiple value
>> types, you cannot put everything into a DataFrame with one
>> distinct_values column. You could still have multiple DataFrames, one
>> per data type, and write those, or collect the DataFrame's values into Maps:
>>
>> import scala.collection.immutable
>>
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.functions.collect_set
>>
>> // if all you columns have the same type
>> def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String,
>> immutable.Seq[Any]] = {
>>   df.unpivot(Array.empty, "column", "value")
>> .groupBy("column")
>> .agg(collect_set("value").as("distinct_values"))
>> .collect()
>> .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
>> .toMap
>> }
>>
>>
>> // if your columns have different types
>> def distinctValuesPerColumn(df: DataFrame): immutable.Map[String,
>> immutable.Seq[Any]] = {
>>   df.schema.fields
>> .groupBy(_.dataType)
>> .mapValues(_.map(_.name))
>> .par
>> .map { case (dataType, columns) => df.select(columns.map(col): _*) }
>> .map(distinctValuesPerColumnOneType)
>> .flatten
>> .toList
>> .toMap
>> }
>>
>> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10,
>> "one")).toDF("a", "b", "c")
>> distinctValuesPerColumn(df)
>>
>> The result is: (list values are of original type)
>> Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))
>>
>> Hope this helps,
>> Enrico
>>
>>
>> On 10.02.23 at 22:56, sam smith wrote:
>>
>> Hi Apotolos,
>> Can you suggest a better approach while keeping values within a dataframe?
>>
>> On Fri, 10 Feb 2023 at 22:47, Apostolos N. Papadopoulos <papad...@csd.auth.gr> wrote:
>>
>>> Dear Sam,
>>>
>>> you are assuming that the data fits in the memory of your local machine.
>>> You are using as a basis a dataframe, which potentially can be very large,
>>> and then you are storing the data in local lists. Keep in mind that that
>>> the number of distinct elements in a column may be very large (depending on
>>> the app). I suggest to work on a solution that assumes that the number of
>>> distinct values is also large. Thus, you should keep your data in
>>> dataframes or RDDs, and store them as csv files, parquet, etc.
>>>
>>> a.p.
>>>
>>>
>>> On 10/2/23 23:40, sam smith wrote:
>>>
>>> I want to get the distinct values of each column in a List (is it good
>>> practice to use List here?), that contains as first element the column
>>> name, and the other element its distinct values so that for a dataset we
>>> get a list of lists, i do it this way (in my opinion no so fast):
>>>
>>> 

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-12 Thread sam smith
@Enrico Minack  Thanks for "unpivot" but I am using
version 3.3.0 (you are taking it way too far as usual :) )
@Sean Owen  Pls then show me how it can be improved by
code.

Also, why such an approach (using withColumn() ) doesn't work:

for (String columnName : df.columns()) {
df= df.withColumn(columnName,
df.select(columnName).distinct().col(columnName));
}

On Sat, 11 Feb 2023 at 13:11, Enrico Minack wrote:

> You could do the entire thing in DataFrame world and write the result to
> disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>
> Note this is Scala but should be straightforward to translate into Java:
>
> import org.apache.spark.sql.functions.collect_set
>
> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
> 123)).toDF("a", "b", "c")
>
> df.unpivot(Array.empty, "column", "value")
>   .groupBy("column")
>   .agg(collect_set("value").as("distinct_values"))
>
> The unpivot operation turns
> +---+---+---+
> |  a|  b|  c|
> +---+---+---+
> |  1| 10|123|
> |  2| 20|124|
> |  3| 20|123|
> |  4| 10|123|
> +---+---+---+
>
> into
>
> +--+-+
> |column|value|
> +--+-+
> | a|1|
> | b|   10|
> | c|  123|
> | a|2|
> | b|   20|
> | c|  124|
> | a|3|
> | b|   20|
> | c|  123|
> | a|4|
> | b|   10|
> | c|  123|
> +--+-+
>
> The groupBy("column").agg(collect_set("value").as("distinct_values"))
> collects distinct values per column:
> +--+---+
>
> |column|distinct_values|
> +--+---+
> | c| [123, 124]|
> | b|   [20, 10]|
> | a|   [1, 2, 3, 4]|
> +--+---+
>
> Note that unpivot only works if all columns have a "common" type. Then all
> columns are cast to that common type. If you have incompatible types like
> Integer and String, you would have to cast them all to String first:
>
> import org.apache.spark.sql.types.StringType
>
> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>
> If you want to preserve the type of the values and have multiple value
> types, you cannot put everything into a DataFrame with one distinct_values
> column. You could still have multiple DataFrames, one per data type, and
> write those, or collect the DataFrame's values into Maps:
>
> import scala.collection.immutable
>
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.functions.collect_set
>
> // if all you columns have the same type
> def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String,
> immutable.Seq[Any]] = {
>   df.unpivot(Array.empty, "column", "value")
> .groupBy("column")
> .agg(collect_set("value").as("distinct_values"))
> .collect()
> .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
> .toMap
> }
>
>
> // if your columns have different types
> def distinctValuesPerColumn(df: DataFrame): immutable.Map[String,
> immutable.Seq[Any]] = {
>   df.schema.fields
> .groupBy(_.dataType)
> .mapValues(_.map(_.name))
> .par
> .map { case (dataType, columns) => df.select(columns.map(col): _*) }
> .map(distinctValuesPerColumnOneType)
> .flatten
> .toList
> .toMap
> }
>
> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10,
> "one")).toDF("a", "b", "c")
> distinctValuesPerColumn(df)
>
> The result is: (list values are of original type)
> Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))
>
> Hope this helps,
> Enrico
>
>
> On 10.02.23 at 22:56, sam smith wrote:
>
> Hi Apotolos,
> Can you suggest a better approach while keeping values within a dataframe?
>
> On Fri, 10 Feb 2023 at 22:47, Apostolos N. Papadopoulos <papad...@csd.auth.gr> wrote:
>
>> Dear Sam,
>>
>> you are assuming that the data fits in the memory of your local machine.
>> You are using as a basis a dataframe, which potentially can be very large,
>> and then you are storing the data in local lists. Keep in mind that that
>> the number of distinct elements in a column may be very large (depending on
>> the app). I suggest to work on a solution that assumes that the number of
>> distinct values is also large. Thus, you should keep your data in
>> dataframes or RDDs, and store them as csv files, parquet, etc.
>>
>> a.p.
>>
>>
>> On 10/2/23 23:40, sam smith wrote:
>>
>> I want to get the distinct values of each column in a List (is it good
>> practice to use List here?), that contains as first element the column
>> name, and the other element its distinct values so that for a dataset we
>> get a list of lists, i do it this way (in my opinion no so fast):
>>
>> List> finalList = new ArrayList>();
>> Dataset df = spark.read().format("csv").option("header", 
>> "true").load("/pathToCSV");
>> String[] columnNames = df.columns();
>>  for (int i=0;i> List columnList = new ArrayList();
>>
>> columnList.add(columnNames[i]);
>>
>>
>> List columnValues = 
>> 

Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-11 Thread Enrico Minack
You could do the entire thing in DataFrame world and write the result to 
disk. All you need is unpivot (to be released in Spark 3.4.0, soon).


Note this is Scala but should be straightforward to translate into Java:

import org.apache.spark.sql.functions.collect_set

val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10, 
123)).toDF("a", "b", "c")


df.unpivot(Array.empty, "column", "value")
  .groupBy("column")
  .agg(collect_set("value").as("distinct_values"))

The unpivot operation turns
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1| 10|123|
|  2| 20|124|
|  3| 20|123|
|  4| 10|123|
+---+---+---+

into

+------+-----+
|column|value|
+------+-----+
|     a|    1|
|     b|   10|
|     c|  123|
|     a|    2|
|     b|   20|
|     c|  124|
|     a|    3|
|     b|   20|
|     c|  123|
|     a|    4|
|     b|   10|
|     c|  123|
+------+-----+

The groupBy("column").agg(collect_set("value").as("distinct_values")) 
collects distinct values per column:

+------+---------------+
|column|distinct_values|
+------+---------------+
|     c|     [123, 124]|
|     b|       [20, 10]|
|     a|   [1, 2, 3, 4]|
+------+---------------+
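A hedged Java translation of that pipeline (untested; if I read the 3.4.0 API right, Dataset.unpivot takes a Column[] of id columns, empty here):

import static org.apache.spark.sql.functions.collect_set;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> distinctValues = df
        .unpivot(new Column[0], "column", "value")   // no id columns
        .groupBy("column")
        .agg(collect_set("value").as("distinct_values"));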

Note that unpivot only works if all columns have a "common" type. Then 
all columns are cast to that common type. If you have incompatible types 
like Integer and String, you would have to cast them all to String first:


import org.apache.spark.sql.types.StringType

df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
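In Java, the same cast-to-String trick could look roughly like this (untested; the alias keeps the original column name):

import static org.apache.spark.sql.functions.col;
import java.util.Arrays;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;

Column[] asStrings = Arrays.stream(df.columns())
        .map(c -> col(c).cast(DataTypes.StringType).as(c))
        .toArray(Column[]::new);
Dataset<Row> stringDf = df.select(asStrings);
// then stringDf.unpivot(...) as above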

If you want to preserve the type of the values and have multiple value 
types, you cannot put everything into a DataFrame with one 
distinct_values column. You could still have multiple DataFrames, one 
per data type, and write those, or collect the DataFrame's values into Maps:


import scala.collection.immutable

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.collect_set

// if all you columns have the same type
def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String, 
immutable.Seq[Any]] = {

  df.unpivot(Array.empty, "column", "value")
    .groupBy("column")
    .agg(collect_set("value").as("distinct_values"))
    .collect()
    .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
    .toMap
}


// if your columns have different types
def distinctValuesPerColumn(df: DataFrame): immutable.Map[String, 
immutable.Seq[Any]] = {

  df.schema.fields
    .groupBy(_.dataType)
    .mapValues(_.map(_.name))
    .par
    .map { case (dataType, columns) => df.select(columns.map(col): _*) }
    .map(distinctValuesPerColumnOneType)
    .flatten
    .toList
    .toMap
}

val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, 
"one")).toDF("a", "b", "c")

distinctValuesPerColumn(df)

The result is: (list values are of original type)
Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))

Hope this helps,
Enrico


On 10.02.23 at 22:56, sam smith wrote:

Hi Apotolos,
Can you suggest a better approach while keeping values within a dataframe?

On Fri, 10 Feb 2023 at 22:47, Apostolos N. Papadopoulos wrote:


Dear Sam,

you are assuming that the data fits in the memory of your local
machine. You are using as a basis a dataframe, which potentially
can be very large, and then you are storing the data in local
lists. Keep in mind that that the number of distinct elements in a
column may be very large (depending on the app). I suggest to work
on a solution that assumes that the number of distinct values is
also large. Thus, you should keep your data in dataframes or RDDs,
and store them as csv files, parquet, etc.

a.p.


On 10/2/23 23:40, sam smith wrote:

I want to get the distinct values of each column in a List (is it
good practice to use List here?), that contains as first element
the column name, and the other element its distinct values so
that for a dataset we get a list of lists, i do it this way (in
my opinion no so fast):

List<List<String>> finalList = new ArrayList<List<String>>();
Dataset<Row> df = spark.read().format("csv").option("header",
"true").load("/pathToCSV");
String[] columnNames = df.columns();
for (int i = 0; i < columnNames.length; i++) {
    List<String> columnList = new ArrayList<String>();
    columnList.add(columnNames[i]);
    List<Row> columnValues =
        df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull())
          .select(columnNames[i]).distinct().collectAsList();
    for (int j = 0; j < columnValues.size(); j++)
        columnList.add(columnValues.get(j).apply(0).toString());
    finalList.add(columnList);
}

-- 
Apostolos N. Papadopoulos, Associate Professor

Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email:papad...@csd.auth.gr
twitter: @papadopoulos_ap
web:http://datalab.csd.auth.gr/~apostol



Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread Sean Owen
Why would csv or a temp table change anything here? You don't need
windowing for distinct values either

On Fri, Feb 10, 2023, 6:01 PM Mich Talebzadeh 
wrote:

> on top of my head, create a dataframe reading CSV file.
>
> This is python
>
>  listing_df =
> spark.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load(csv_file)
>  listing_df.printSchema()
>  listing_df.createOrReplaceTempView("temp")
>
> ## do your distinct columns using windowing functions on temp table with
> SQL
>
>  HTH
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 10 Feb 2023 at 21:59, sam smith 
> wrote:
>
>> I am not sure i understand well " Just need to do the cols one at a
>> time". Plus I think Apostolos is right, this needs a dataframe approach not
>> a list approach.
>>
>> On Fri, 10 Feb 2023 at 22:47, Sean Owen wrote:
>>
>>> For each column, select only that call and get distinct values. Similar
>>> to what you do here. Just need to do the cols one at a time. Your current
>>> code doesn't do what you want.
>>>
>>> On Fri, Feb 10, 2023, 3:46 PM sam smith 
>>> wrote:
>>>
 Hi Sean,

 "You need to select the distinct values of each col one at a time", how
 ?

 On Fri, 10 Feb 2023 at 22:40, Sean Owen wrote:

> That gives you all distinct tuples of those col values. You need to
> select the distinct values of each col one at a time. Sure just collect()
> the result as you do here.
>
> On Fri, Feb 10, 2023, 3:34 PM sam smith 
> wrote:
>
>> I want to get the distinct values of each column in a List (is it
>> good practice to use List here?), that contains as first element the 
>> column
>> name, and the other element its distinct values so that for a dataset we
>> get a list of lists, i do it this way (in my opinion no so fast):
>>
>> List> finalList = new ArrayList>();
>> Dataset df = spark.read().format("csv").option("header", 
>> "true").load("/pathToCSV");
>> String[] columnNames = df.columns();
>>  for (int i=0;i> List columnList = new ArrayList();
>>
>> columnList.add(columnNames[i]);
>>
>>
>> List columnValues = 
>> df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
>> for (int j=0;j> columnList.add(columnValues.get(j).apply(0).toString());
>>
>> finalList.add(columnList);
>>
>>
>> How to improve this?
>>
>> Also, can I get the results in JSON format?
>>
>


Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread Mich Talebzadeh
on top of my head, create a dataframe reading CSV file.

This is python

 listing_df =
spark.read.format("com.databricks.spark.csv").option("inferSchema",
"true").option("header", "true").load(csv_file)
 listing_df.printSchema()
 listing_df.createOrReplaceTempView("temp")

## do your distinct columns using windowing functions on temp table with SQL

 HTH



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 10 Feb 2023 at 21:59, sam smith  wrote:

> I am not sure i understand well " Just need to do the cols one at a time".
> Plus I think Apostolos is right, this needs a dataframe approach not a list
> approach.
>
> On Fri, 10 Feb 2023 at 22:47, Sean Owen wrote:
>
>> For each column, select only that call and get distinct values. Similar
>> to what you do here. Just need to do the cols one at a time. Your current
>> code doesn't do what you want.
>>
>> On Fri, Feb 10, 2023, 3:46 PM sam smith 
>> wrote:
>>
>>> Hi Sean,
>>>
>>> "You need to select the distinct values of each col one at a time", how
>>> ?
>>>
>>> On Fri, 10 Feb 2023 at 22:40, Sean Owen wrote:
>>>
 That gives you all distinct tuples of those col values. You need to
 select the distinct values of each col one at a time. Sure just collect()
 the result as you do here.

 On Fri, Feb 10, 2023, 3:34 PM sam smith 
 wrote:

> I want to get the distinct values of each column in a List (is it good
> practice to use List here?), that contains as first element the column
> name, and the other element its distinct values so that for a dataset we
> get a list of lists, i do it this way (in my opinion no so fast):
>
> List> finalList = new ArrayList>();
> Dataset df = spark.read().format("csv").option("header", 
> "true").load("/pathToCSV");
> String[] columnNames = df.columns();
>  for (int i=0;i List columnList = new ArrayList();
>
> columnList.add(columnNames[i]);
>
>
> List columnValues = 
> df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
> for (int j=0;j columnList.add(columnValues.get(j).apply(0).toString());
>
> finalList.add(columnList);
>
>
> How to improve this?
>
> Also, can I get the results in JSON format?
>



Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread sam smith
I am not sure i understand well " Just need to do the cols one at a time".
Plus I think Apostolos is right, this needs a dataframe approach not a list
approach.

On Fri, 10 Feb 2023 at 22:47, Sean Owen wrote:

> For each column, select only that call and get distinct values. Similar to
> what you do here. Just need to do the cols one at a time. Your current code
> doesn't do what you want.
>
> On Fri, Feb 10, 2023, 3:46 PM sam smith 
> wrote:
>
>> Hi Sean,
>>
>> "You need to select the distinct values of each col one at a time", how ?
>>
>> On Fri, 10 Feb 2023 at 22:40, Sean Owen wrote:
>>
>>> That gives you all distinct tuples of those col values. You need to
>>> select the distinct values of each col one at a time. Sure just collect()
>>> the result as you do here.
>>>
>>> On Fri, Feb 10, 2023, 3:34 PM sam smith 
>>> wrote:
>>>
 I want to get the distinct values of each column in a List (is it good
 practice to use List here?), that contains as first element the column
 name, and the other element its distinct values so that for a dataset we
 get a list of lists, i do it this way (in my opinion no so fast):

 List> finalList = new ArrayList>();
 Dataset df = spark.read().format("csv").option("header", 
 "true").load("/pathToCSV");
 String[] columnNames = df.columns();
  for (int i=0;i>>> List columnList = new ArrayList();

 columnList.add(columnNames[i]);


 List columnValues = 
 df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
 for (int j=0;j>>> columnList.add(columnValues.get(j).apply(0).toString());

 finalList.add(columnList);


 How to improve this?

 Also, can I get the results in JSON format?

>>>


Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread sam smith
Hi Apotolos,
Can you suggest a better approach while keeping values within a dataframe?

On Fri, 10 Feb 2023 at 22:47, Apostolos N. Papadopoulos <papad...@csd.auth.gr> wrote:

> Dear Sam,
>
> you are assuming that the data fits in the memory of your local machine.
> You are using as a basis a dataframe, which potentially can be very large,
> and then you are storing the data in local lists. Keep in mind that that
> the number of distinct elements in a column may be very large (depending on
> the app). I suggest to work on a solution that assumes that the number of
> distinct values is also large. Thus, you should keep your data in
> dataframes or RDDs, and store them as csv files, parquet, etc.
>
> a.p.
>
>
> On 10/2/23 23:40, sam smith wrote:
>
> I want to get the distinct values of each column in a List (is it good
> practice to use List here?), that contains as first element the column
> name, and the other element its distinct values so that for a dataset we
> get a list of lists, i do it this way (in my opinion no so fast):
>
> List> finalList = new ArrayList>();
> Dataset df = spark.read().format("csv").option("header", 
> "true").load("/pathToCSV");
> String[] columnNames = df.columns();
>  for (int i=0;i List columnList = new ArrayList();
>
> columnList.add(columnNames[i]);
>
>
> List columnValues = 
> df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
> for (int j=0;j columnList.add(columnValues.get(j).apply(0).toString());
>
> finalList.add(columnList);
>
>
> How to improve this?
>
> Also, can I get the results in JSON format?
>
> --
> Apostolos N. Papadopoulos, Associate Professor
> Department of Informatics
> Aristotle University of Thessaloniki
> Thessaloniki, GREECE
> tel: ++0030312310991918
> email: papad...@csd.auth.gr
> twitter: @papadopoulos_ap
> web: http://datalab.csd.auth.gr/~apostol
>
>


Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread sam smith
Hi Sean,

"You need to select the distinct values of each col one at a time", how ?

On Fri, 10 Feb 2023 at 22:40, Sean Owen wrote:

> That gives you all distinct tuples of those col values. You need to select
> the distinct values of each col one at a time. Sure just collect() the
> result as you do here.
>
> On Fri, Feb 10, 2023, 3:34 PM sam smith 
> wrote:
>
>> I want to get the distinct values of each column in a List (is it good
>> practice to use List here?), that contains as first element the column
>> name, and the other element its distinct values so that for a dataset we
>> get a list of lists, i do it this way (in my opinion no so fast):
>>
>> List> finalList = new ArrayList>();
>> Dataset df = spark.read().format("csv").option("header", 
>> "true").load("/pathToCSV");
>> String[] columnNames = df.columns();
>>  for (int i=0;i> List columnList = new ArrayList();
>>
>> columnList.add(columnNames[i]);
>>
>>
>> List columnValues = 
>> df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
>> for (int j=0;j> columnList.add(columnValues.get(j).apply(0).toString());
>>
>> finalList.add(columnList);
>>
>>
>> How to improve this?
>>
>> Also, can I get the results in JSON format?
>>
>


Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread Apostolos N. Papadopoulos

Dear Sam,

you are assuming that the data fits in the memory of your local machine. 
You are using as a basis a dataframe, which potentially can be very 
large, and then you are storing the data in local lists. Keep in mind 
that that the number of distinct elements in a column may be very large 
(depending on the app). I suggest to work on a solution that assumes 
that the number of distinct values is also large. Thus, you should keep 
your data in dataframes or RDDs, and store them as csv files, parquet, etc.
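Regarding the JSON question in the quoted message below: once the distinct values are kept in a DataFrame as suggested here, writing them out as JSON is a one-liner (a sketch; the path and the `result` variable are illustrative):

// `result` is assumed to be the DataFrame holding the distinct values
result.coalesce(1)                 // optional: a single output file
      .write()
      .mode("overwrite")
      .json("/path/to/distinct_values_json");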


a.p.


On 10/2/23 23:40, sam smith wrote:
I want to get the distinct values of each column in a List (is it good 
practice to use List here?), that contains as first element the column 
name, and the other element its distinct values so that for a dataset 
we get a list of lists, i do it this way (in my opinion no so fast):


List<List<String>> finalList = new ArrayList<List<String>>();
Dataset<Row> df = spark.read().format("csv").option("header", "true").load("/pathToCSV");
String[] columnNames = df.columns();
for (int i = 0; i < columnNames.length; i++) {
    List<String> columnList = new ArrayList<String>();
    columnList.add(columnNames[i]);
    List<Row> columnValues =
        df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull())
          .select(columnNames[i]).distinct().collectAsList();
    for (int j = 0; j < columnValues.size(); j++)
        columnList.add(columnValues.get(j).apply(0).toString());
    finalList.add(columnList);
}


How to improve this?

Also, can I get the results in JSON format?


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email:papad...@csd.auth.gr
twitter: @papadopoulos_ap
web:http://datalab.csd.auth.gr/~apostol


Re: How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread Sean Owen
That gives you all distinct tuples of those col values. You need to select
the distinct values of each col one at a time. Sure just collect() the
result as you do here.

On Fri, Feb 10, 2023, 3:34 PM sam smith  wrote:

> I want to get the distinct values of each column in a List (is it good
> practice to use List here?), that contains as first element the column
> name, and the other element its distinct values so that for a dataset we
> get a list of lists, i do it this way (in my opinion no so fast):
>
> List> finalList = new ArrayList>();
> Dataset df = spark.read().format("csv").option("header", 
> "true").load("/pathToCSV");
> String[] columnNames = df.columns();
>  for (int i=0;i List columnList = new ArrayList();
>
> columnList.add(columnNames[i]);
>
>
> List columnValues = 
> df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
> for (int j=0;j columnList.add(columnValues.get(j).apply(0).toString());
>
> finalList.add(columnList);
>
>
> How to improve this?
>
> Also, can I get the results in JSON format?
>


How to improve efficiency of this piece of code (returning distinct column values)

2023-02-10 Thread sam smith
I want to get the distinct values of each column in a List (is it good
practice to use List here?), a list that contains as its first element the
column name and as its other elements that column's distinct values, so that
for a dataset we get a list of lists. I do it this way (in my opinion not so
fast):

List<List<String>> finalList = new ArrayList<List<String>>();
Dataset<Row> df = spark.read().format("csv").option("header",
"true").load("/pathToCSV");
String[] columnNames = df.columns();
for (int i = 0; i < columnNames.length; i++) {
    List<String> columnList = new ArrayList<String>();
    columnList.add(columnNames[i]);
    List<Row> columnValues =
        df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull())
          .select(columnNames[i]).distinct().collectAsList();
    for (int j = 0; j < columnValues.size(); j++)
        columnList.add(columnValues.get(j).apply(0).toString());
    finalList.add(columnList);
}

How to improve this?

Also, can I get the results in JSON format?