[jira] [Created] (SPARK-22241) Apache spark giving InvalidSchemaException: Cannot write a schema with an empty group: optional group element {

2017-10-10 Thread Ritika Maheshwari (JIRA)
Ritika Maheshwari created SPARK-22241:
-

 Summary: Apache spark giving InvalidSchemaException: Cannot write 
a schema with an empty group: optional group element {
 Key: SPARK-22241
 URL: https://issues.apache.org/jira/browse/SPARK-22241
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: Ritika Maheshwari
Priority: Minor









[jira] [Commented] (SPARK-22241) Apache spark giving InvalidSchemaException: Cannot write a schema with an empty group: optional group element {

2017-10-10 Thread Ritika Maheshwari (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199724#comment-16199724
 ] 

Ritika Maheshwari commented on SPARK-22241:
---

I know Parquet does not allow empty struct types, but is this something the Encoder should handle when generating the schema?

> Apache spark giving InvalidSchemaException: Cannot write a schema with an 
> empty group: optional group element {
> ---
>
> Key: SPARK-22241
> URL: https://issues.apache.org/jira/browse/SPARK-22241
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Ritika Maheshwari
>Priority: Minor
>
> I have a bean which has a field of type ArrayList of Doubles. Then I do the following:
> Encoder<T> beanEncoder = Encoders.bean(jClass);
> Dataset<T> df = spark.createDataset(Collections.singletonList((T) extractedObj), beanEncoder);
> The generated schema says:
> |-- pixelSpacing: array (nullable = true)
> |    |-- element: struct (containsNull = true)
> Now I try to save this Dataset as Parquet:
> df.write().mode(SaveMode.Append).parquet(jClass.getName() + "_parquet");
> and I get the error: Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: optional group element {}
> Kindly advise how to specify an ArrayList of Strings or Doubles in the bean passed to the encoder so that the generated schema can be saved in Parquet.






[jira] [Updated] (SPARK-22241) Apache spark giving InvalidSchemaException: Cannot write a schema with an empty group: optional group element {

2017-10-10 Thread Ritika Maheshwari (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Maheshwari updated SPARK-22241:
--
Description: 
I have a bean which has a field of type ArrayList of Doubles. Then I do the following:

Encoder<T> beanEncoder = Encoders.bean(jClass);
Dataset<T> df = spark.createDataset(Collections.singletonList((T) extractedObj), beanEncoder);

The generated schema says:

|-- pixelSpacing: array (nullable = true)
|    |-- element: struct (containsNull = true)

Now I try to save this Dataset as Parquet:

df.write().mode(SaveMode.Append).parquet(jClass.getName() + "_parquet");

and I get the error:

Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: optional group element {}

Kindly advise how to specify an ArrayList of Strings or Doubles in the bean passed to the encoder so that the generated schema can be saved in Parquet.
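The empty group in that schema suggests the encoder could not see the element type of the list and fell back to an empty struct for the element. Below is a minimal Scala sketch of the idea, not a confirmed fix: the PixelBean class, its values and the output path are made up, and it assumes Encoders.bean accepts a Scala @BeanProperty class the same way it accepts a Java bean. The point is simply that the list property is declared with an explicit element type (java.util.List[java.lang.Double]) rather than a raw ArrayList.

{code:scala}
import java.util.{ArrayList => JArrayList, Collections}

import scala.beans.BeanProperty

import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}

// Hypothetical bean: the list element type is declared explicitly so the
// bean encoder can infer array<double> instead of an empty struct element.
class PixelBean {
  @BeanProperty var pixelSpacing: java.util.List[java.lang.Double] = _
}

object EmptyGroupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("empty-group-sketch").getOrCreate()

    val bean = new PixelBean
    val spacing = new JArrayList[java.lang.Double]()
    spacing.add(1.0)
    spacing.add(2.5)
    bean.setPixelSpacing(spacing)

    val ds = spark.createDataset(Collections.singletonList(bean))(Encoders.bean(classOf[PixelBean]))
    ds.printSchema()  // expected: pixelSpacing: array with element: double, no empty struct
    ds.write.mode(SaveMode.Append).parquet("/tmp/PixelBean_parquet")

    spark.stop()
  }
}
{code}

If the schema still shows element: struct with no fields, the getter's generic type information is not reaching the encoder, which would point back at the bean declaration rather than at Parquet itself.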



> Apache spark giving InvalidSchemaException: Cannot write a schema with an 
> empty group: optional group element {
> ---
>
> Key: SPARK-22241
> URL: https://issues.apache.org/jira/browse/SPARK-22241
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: Ritika Maheshwari
>Priority: Minor
>
> I have a bean which has a field of type ArrayList of Doubles. Then I do the following:
> Encoder<T> beanEncoder = Encoders.bean(jClass);
> Dataset<T> df = spark.createDataset(Collections.singletonList((T) extractedObj), beanEncoder);
> The generated schema says:
> |-- pixelSpacing: array (nullable = true)
> |    |-- element: struct (containsNull = true)
> Now I try to save this Dataset as Parquet:
> df.write().mode(SaveMode.Append).parquet(jClass.getName() + "_parquet");
> and I get the error: Caused by: org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: optional group element {}
> Kindly advise how to specify an ArrayList of Strings or Doubles in the bean passed to the encoder so that the generated schema can be saved in Parquet.






[jira] [Commented] (SPARK-45414) spark-xml misplaces string tag content

2023-11-07 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17783832#comment-17783832
 ] 

Ritika Maheshwari commented on SPARK-45414:
---

Change _SerialNumberFlag to SerialNumberFlag and _Action to Action. Then the query runs without error.
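A minimal spark-shell style sketch of that rename, under stated assumptions: the spark-xml package is on the classpath, only the two renamed columns plus Description are kept to keep it short, and the rowTag and output path are made up. One thing to note is that spark-xml's default attributePrefix is "_", so _SerialNumberFlag and _Action are written as XML attributes of the row element, while the renamed versions become ordinary child elements, which may be why the rename sidesteps the conversion error.

{code:scala}
import org.apache.spark.sql.SparkSession

// Sketch only: a reduced version of fake_file_df with just the columns that
// are renamed (plus Description), written with the spark-xml data source.
object RenameUnderscoreColumns {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("xml-rename-sketch").getOrCreate()

    val reduced = spark.sql(
      """SELECT
        |  CAST('' AS STRING)              AS _SerialNumberFlag,
        |  CAST('MyDescription' AS STRING) AS Description,
        |  CAST('Add' AS STRING)           AS _Action""".stripMargin)

    val renamed = reduced
      .withColumnRenamed("_SerialNumberFlag", "SerialNumberFlag")
      .withColumnRenamed("_Action", "Action")

    renamed.write
      .format("com.databricks.spark.xml")  // requires the spark-xml package on the classpath
      .option("rowTag", "Item")
      .save("/tmp/items_xml")

    spark.stop()
  }
}
{code}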

***

(XML output of the corrected repro was included here, but the element markup was stripped by the mail archiver; only the text values survive: 123, 123, MyDescription, Value1, GroupA, CodeA, 1, Add.)

> spark-xml misplaces string tag content
> --
>
> Key: SPARK-45414
> URL: https://issues.apache.org/jira/browse/SPARK-45414
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.0
>Reporter: Giuseppe Ceravolo
>Priority: Critical
> Attachments: IllegalArgumentException.txt
>
>
> h1. Intro
> Hi all! Please expect some degree of incompleteness in this issue as this is 
> the very first one I post, and feel free to edit it as you like - I welcome 
> your feedback.
> My goal is to provide you with as many details and indications as I can on 
> this issue that I am currently facing with a Client of mine on its Production 
> environment (we use Azure Databricks DBR 11.3 LTS).
> I was told by Sean Owen [[srowen (Sean Owen) 
> (github.com)|https://github.com/srowen]], who maintains the spark-xml maven 
> repository on GitHub [[https://github.com/srowen/spark-xml]] to post an issue 
> here because "This code has been ported to Apache Spark now anyway so won't 
> be updated here" (refer to his comment [here|#issuecomment-1744792958]).
> h1. Issue
> When I write a DataFrame into xml format via the spark-xml library either (1) 
> I get an error if empty string columns are in between non-string nested ones 
> or (2) if I put all string columns at the end then I get a wrong xml where 
> the content of string tags are misplaced into the following ones.
> h1. Code to reproduce the issue
> Please find below the end-to-end code snippet that results into the error
> h2. CASE (1): ERROR
> When empty strings are in between non-string nested ones, the write fails 
> with the following error.
> _Caused by: java.lang.IllegalArgumentException: Failed to convert value 
> MyDescription (class of class java.lang.String) in type 
> ArrayType(StructType(StructField(_ID,StringType,true),StructField(_Level,StringType,true)),true)
>  to XML._
> Please find attached the full trace of the error.
> {code:python}
> fake_file_df = spark \
>     .sql(
>         """SELECT
>             CAST(STRUCT('ItemId' AS `_Type`, '123' AS `_VALUE`) AS 
> STRUCT<_Type: STRING, _VALUE: STRING>) AS ItemID,
>             CAST(STRUCT('UPC' AS `_Type`, '123' AS `_VALUE`) AS STRUCT<_Type: 
> STRING, _VALUE: STRING>) AS UPC,
>             CAST('' AS STRING) AS _SerialNumberFlag,
>             CAST('MyDescription' AS STRING) AS Description,
>             CAST(ARRAY(STRUCT(NULL AS `_ID`, NULL AS `_Level`)) AS 
> ARRAY>) AS MerchandiseHierarchy,
>             CAST(ARRAY(STRUCT(NULL AS `_ValueTypeCode`, NULL AS `_VALUE`)) AS 
> ARRAY>) AS ItemPrice,
>             CAST('' AS STRING) AS Color,
>             CAST('' AS STRING) AS IntendedIndustry,
>             CAST(STRUCT(NULL AS `Name`) AS STRUCT) AS 
> Manufacturer,
>             CAST(STRUCT(NULL AS `Season`) AS STRUCT) AS 
> Marketing,
>             CAST(STRUCT(NULL AS `_Name`) AS STRUCT<_Name: STRING>) AS 
> BrandOwner,
>             CAST(ARRAY(STRUCT('Attribute1' AS `_Name`, 'Value1' AS `_VALUE`)) 
> AS ARRAY>) AS 
> ItemAttribute_culinary,
>             CAST(ARRAY(STRUCT(NULL AS `_Name`, ARRAY(ARRAY(STRUCT(NULL AS 
> `AttributeCode`, NULL AS `AttributeValue`))) AS `_VALUE`)) AS 
> ARRAY AttributeValue: STRING>) AS ItemAttribute_noculinary,
>             CAST(STRUCT(STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS 
> `Depth`, STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Height`, 
> STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Width`, STRUCT(NULL AS 
> `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Diameter`) AS STRUCT STRUCT<_UnitOfMeasure: STRING, _VALUE: STRING>, Height: 
> STRUCT<_UnitOfMeasure: STRING, _VALUE: STRING>, Width: STRUCT<_UnitOfMeasure: 
> STRING, _VALUE: STRING>, Diameter: STRUCT<_UnitOfMeasure: STRING, _VALUE: 
> STRING>>) AS ItemMeasurements,
>             CAST(STRUCT('GroupA' AS `TaxGroupID`, 'CodeA' AS `TaxExemptCode`, 
> '1' AS `TaxAmount`) AS STRUCT TaxAmount: STRING>) AS TaxInformation,
>             CAST('' AS STRING) AS ItemImageUrl,
>             

[jira] [Commented] (SPARK-45414) spark-xml misplaces string tag content

2023-11-07 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17783841#comment-17783841
 ] 

Ritika Maheshwari commented on SPARK-45414:
---

In that case, put both of these at the end, like this:
CAST(ARRAY(ARRAY(STRUCT(NULL AS `_action`, NULL AS `_franchiseeId`, NULL AS `_franchiseeName`))) AS ARRAY<ARRAY<STRUCT<_action: STRING, _franchiseeId: STRING, _franchiseeName: STRING>>>) AS ItemFranchisees,
CAST('Add' AS STRING) AS _Action,
CAST('' AS STRING) AS _SerialNumberFlag
*

(XML output of this variant was included here as well, but the element markup was stripped by the mail archiver; only the text values survive: 123, 123, MyDescription, Value1, GroupA, CodeA, 1.)

> spark-xml misplaces string tag content
> --
>
> Key: SPARK-45414
> URL: https://issues.apache.org/jira/browse/SPARK-45414
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.0
>Reporter: Giuseppe Ceravolo
>Priority: Critical
> Attachments: IllegalArgumentException.txt
>
>
> h1. Intro
> Hi all! Please expect some degree of incompleteness in this issue as this is 
> the very first one I post, and feel free to edit it as you like - I welcome 
> your feedback.
> My goal is to provide you with as many details and indications as I can on 
> this issue that I am currently facing with a Client of mine on its Production 
> environment (we use Azure Databricks DBR 11.3 LTS).
> I was told by Sean Owen [[srowen (Sean Owen) 
> (github.com)|https://github.com/srowen]], who maintains the spark-xml maven 
> repository on GitHub [[https://github.com/srowen/spark-xml]] to post an issue 
> here because "This code has been ported to Apache Spark now anyway so won't 
> be updated here" (refer to his comment [here|#issuecomment-1744792958]).
> h1. Issue
> When I write a DataFrame into xml format via the spark-xml library either (1) 
> I get an error if empty string columns are in between non-string nested ones 
> or (2) if I put all string columns at the end then I get a wrong xml where 
> the content of string tags are misplaced into the following ones.
> h1. Code to reproduce the issue
> Please find below the end-to-end code snippet that results into the error
> h2. CASE (1): ERROR
> When empty strings are in between non-string nested ones, the write fails 
> with the following error.
> _Caused by: java.lang.IllegalArgumentException: Failed to convert value 
> MyDescription (class of class java.lang.String) in type 
> ArrayType(StructType(StructField(_ID,StringType,true),StructField(_Level,StringType,true)),true)
>  to XML._
> Please find attached the full trace of the error.
> {code:python}
> fake_file_df = spark \
>     .sql(
>         """SELECT
>             CAST(STRUCT('ItemId' AS `_Type`, '123' AS `_VALUE`) AS 
> STRUCT<_Type: STRING, _VALUE: STRING>) AS ItemID,
>             CAST(STRUCT('UPC' AS `_Type`, '123' AS `_VALUE`) AS STRUCT<_Type: 
> STRING, _VALUE: STRING>) AS UPC,
>             CAST('' AS STRING) AS _SerialNumberFlag,
>             CAST('MyDescription' AS STRING) AS Description,
>             CAST(ARRAY(STRUCT(NULL AS `_ID`, NULL AS `_Level`)) AS 
> ARRAY>) AS MerchandiseHierarchy,
>             CAST(ARRAY(STRUCT(NULL AS `_ValueTypeCode`, NULL AS `_VALUE`)) AS 
> ARRAY>) AS ItemPrice,
>             CAST('' AS STRING) AS Color,
>             CAST('' AS STRING) AS IntendedIndustry,
>             CAST(STRUCT(NULL AS `Name`) AS STRUCT) AS 
> Manufacturer,
>             CAST(STRUCT(NULL AS `Season`) AS STRUCT) AS 
> Marketing,
>             CAST(STRUCT(NULL AS `_Name`) AS STRUCT<_Name: STRING>) AS 
> BrandOwner,
>             CAST(ARRAY(STRUCT('Attribute1' AS `_Name`, 'Value1' AS `_VALUE`)) 
> AS ARRAY>) AS 
> ItemAttribute_culinary,
>             CAST(ARRAY(STRUCT(NULL AS `_Name`, ARRAY(ARRAY(STRUCT(NULL AS 
> `AttributeCode`, NULL AS `AttributeValue`))) AS `_VALUE`)) AS 
> ARRAY AttributeValue: STRING>) AS ItemAttribute_noculinary,
>             CAST(STRUCT(STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS 
> `Depth`, STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Height`, 
> STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Width`, STRUCT(NULL AS 
> `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Diameter`) AS STRUCT STRUCT<_UnitOfMeasure: STRING, _VALUE: STRING>, Height: 
> STRUCT<_UnitOfMeasure: STRING, _VALUE: STRING>, Width: STRUCT<_UnitOfMeasure: 
> STRING, _VALUE: STRING>, Diameter: STRUCT<_UnitOfMeasure: STRING, _VALUE: 
> STRING>>) AS ItemMeasurements,
>             CAST(STRUCT('GroupA' AS `TaxGroupID`, 'CodeA' AS 

[jira] [Commented] (SPARK-41236) The renamed field name cannot be recognized after group filtering

2022-11-30 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641576#comment-17641576
 ] 

Ritika Maheshwari commented on SPARK-41236:
---

Hello Zhong ,

Try to rename the field as a different name than the original column name.
select collect_set(age) as ageCol
from db_table.table1
group by name
having size(ageCol) > 1 
 

 

> The renamed field name cannot be recognized after group filtering
> -
>
> Key: SPARK-41236
> URL: https://issues.apache.org/jira/browse/SPARK-41236
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: jingxiong zhong
>Priority: Major
>
> {code:java}
> select collect_set(age) as age
> from db_table.table1
> group by name
> having size(age) > 1 
> {code}
> a simple sql, it work well in spark2.4, but doesn't work in spark3.2.0
> Is it a bug or a new standard?
> h3. *like this:*
> {code:sql}
> create db1.table1(age int, name string);
> insert into db1.table1 values(1, 'a');
> insert into db1.table1 values(2, 'b');
> insert into db1.table1 values(3, 'c');
> --then run sql like this 
> select collect_set(age) as age from db1.table1 group by name having size(age) 
> > 1 ;
> {code}
> h3. Stack Information
> org.apache.spark.sql.AnalysisException: cannot resolve 'age' given input 
> columns: [age]; line 4 pos 12;
> 'Filter (size('age, true) > 1)
> +- Aggregate [name#2], [collect_set(age#1, 0, 0) AS age#0]
>+- SubqueryAlias spark_catalog.db1.table1
>   +- HiveTableRelation [`db1`.`table1`, 
> org.apache.hadoop.hive.ql.io.orc.OrcSerde, Data Cols: [age#1, name#2], 
> Partition Cols: []]
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:535)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:535)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
>   at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
>   at 
> org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:467)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1154)
>   at 
> org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1153)
>   at 
> org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:555)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:193)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:193)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:204)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:214)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:323)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:214)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:181)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:161)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:175)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:94)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:263)
>   at 
> 

[jira] [Comment Edited] (SPARK-41236) The renamed field name cannot be recognized after group filtering

2022-11-30 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17641576#comment-17641576
 ] 

Ritika Maheshwari edited comment on SPARK-41236 at 11/30/22 10:51 PM:
--

Hello Zhong ,

Try to rename the field as a different name than the original column name.
select collect_set(age) as ageCol
from db_table.table1
group by name
having size(ageCol) > 1 
 

Your result will be zero rows, though, because you have only one age for each of your names "a", "b" and "c", so size(ageCol) > 1 filters every group out.

But if you have your table as:

age  name
1    "a"
2    "a"
3    "a"
4    "b"
5    "c"
6    "c"

then you will get a result:

[1,2,3]
[5,6]
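A self-contained spark-shell style sketch of this workaround (the data mirrors the table above; the view name and the local-mode session are made up for the example):

{code:scala}
import org.apache.spark.sql.SparkSession

// Reproduces the workaround: alias the aggregate to a name (ageCol) that
// differs from the source column, so HAVING size(ageCol) resolves against
// the aggregate output instead of the original int column.
object CollectSetHavingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("collect-set-having").getOrCreate()
    import spark.implicits._

    Seq((1, "a"), (2, "a"), (3, "a"), (4, "b"), (5, "c"), (6, "c"))
      .toDF("age", "name")
      .createOrReplaceTempView("table1")

    spark.sql(
      """SELECT collect_set(age) AS ageCol
        |FROM table1
        |GROUP BY name
        |HAVING size(ageCol) > 1""".stripMargin)
      .show(truncate = false)
    // Expected (element order within each set may vary): [1, 2, 3] and [5, 6];
    // the single-row group "b" is filtered out.

    spark.stop()
  }
}
{code}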

 

 


was (Author: ritikam):
Hello Zhong ,

Try to rename the field as a different name than the original column name.
select collect_set(age) as ageCol
from db_table.table1
group by name
having size(ageCol) > 1 
 

 

> The renamed field name cannot be recognized after group filtering
> -
>
> Key: SPARK-41236
> URL: https://issues.apache.org/jira/browse/SPARK-41236
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: jingxiong zhong
>Priority: Major
>
> {code:java}
> select collect_set(age) as age
> from db_table.table1
> group by name
> having size(age) > 1 
> {code}
> a simple sql, it work well in spark2.4, but doesn't work in spark3.2.0
> Is it a bug or a new standard?
> h3. *like this:*
> {code:sql}
> create db1.table1(age int, name string);
> insert into db1.table1 values(1, 'a');
> insert into db1.table1 values(2, 'b');
> insert into db1.table1 values(3, 'c');
> --then run sql like this 
> select collect_set(age) as age from db1.table1 group by name having size(age) 
> > 1 ;
> {code}
> h3. Stack Information
> org.apache.spark.sql.AnalysisException: cannot resolve 'age' given input 
> columns: [age]; line 4 pos 12;
> 'Filter (size('age, true) > 1)
> +- Aggregate [name#2], [collect_set(age#1, 0, 0) AS age#0]
>+- SubqueryAlias spark_catalog.db1.table1
>   +- HiveTableRelation [`db1`.`table1`, 
> org.apache.hadoop.hive.ql.io.orc.OrcSerde, Data Cols: [age#1, name#2], 
> Partition Cols: []]
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:535)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:535)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
>   at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
>   at 
> org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:467)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1154)
>   at 
> org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1153)
>   at 
> org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:555)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:193)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:193)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:204)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:214)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:323)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:214)
>   at 
> 

[jira] (SPARK-41236) The renamed field name cannot be recognized after group filtering

2022-12-01 Thread Ritika Maheshwari (Jira)


[ https://issues.apache.org/jira/browse/SPARK-41236 ]


Ritika Maheshwari deleted comment on SPARK-41236:
---

was (Author: ritikam):
Hello Zhong ,

Try to rename the field as a different name than the original column name.
select collect_set(age) as ageCol
from db_table.table1
group by name
having size(ageCol) > 1 
 

Your result will be zero rows, though, because you have only one age for each of your names "a", "b" and "c", so size(ageCol) > 1 filters every group out.

But if you have your table as:

age  name
1    "a"
2    "a"
3    "a"
4    "b"
5    "c"
6    "c"

then you will get a result:

[1,2,3]
[5,6]

 

 

> The renamed field name cannot be recognized after group filtering
> -
>
> Key: SPARK-41236
> URL: https://issues.apache.org/jira/browse/SPARK-41236
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: jingxiong zhong
>Priority: Major
>
> {code:java}
> select collect_set(age) as age
> from db_table.table1
> group by name
> having size(age) > 1 
> {code}
> a simple sql, it work well in spark2.4, but doesn't work in spark3.2.0
> Is it a bug or a new standard?
> h3. *like this:*
> {code:sql}
> create db1.table1(age int, name string);
> insert into db1.table1 values(1, 'a');
> insert into db1.table1 values(2, 'b');
> insert into db1.table1 values(3, 'c');
> --then run sql like this 
> select collect_set(age) as age from db1.table1 group by name having size(age) 
> > 1 ;
> {code}
> h3. Stack Information
> org.apache.spark.sql.AnalysisException: cannot resolve 'age' given input 
> columns: [age]; line 4 pos 12;
> 'Filter (size('age, true) > 1)
> +- Aggregate [name#2], [collect_set(age#1, 0, 0) AS age#0]
>+- SubqueryAlias spark_catalog.db1.table1
>   +- HiveTableRelation [`db1`.`table1`, 
> org.apache.hadoop.hive.ql.io.orc.OrcSerde, Data Cols: [age#1, name#2], 
> Partition Cols: []]
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:535)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:535)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
>   at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
>   at 
> org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:467)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1154)
>   at 
> org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1153)
>   at 
> org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:555)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:193)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:193)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:204)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:214)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:323)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:214)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:181)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:161)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:175)
>   at 

[jira] [Commented] (SPARK-41236) The renamed field name cannot be recognized after group filtering

2022-12-01 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17642332#comment-17642332
 ] 

Ritika Maheshwari commented on SPARK-41236:
---

Running the following query against a downloaded build of the Spark 3.3.0 code, the error message is improved:

spark-sql> select collect_set(age) as age

         > from test2.ageGroups

         > GROUP BY name

         > having size(age) >1;

Error in query: cannot resolve 'size(age)' due to data type mismatch: argument 
1 requires (array or map) type, however, 'spark_catalog.test2.agegroups.age' is 
of int type.; line 4 pos 7;

'Filter (size('age, true) > 1)

+- Aggregate [name#29], [collect_set(age#30, 0, 0) AS age#27]

   +- SubqueryAlias spark_catalog.test2.agegroups

      +- HiveTableRelation [`test2`.`agegroups`, 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [eid#28, 
name#29, age#30], Partition Cols: []]

But this is confusing: if it recognizes age as int, then the following query should not have failed. It fails complaining that age is an array, because it is getting bound to the renamed column.

spark-sql> select collect_set(age) as age

         > from test2.ageGroups

         > GROUP BY name

         > Having age >1;

Error in query: cannot resolve '(age > 1)' due to data type mismatch: differing 
types in '(age > 1)' (array and int).; line 4 pos 7;

'Filter (age#62 > 1)

+- Aggregate [name#64], [collect_set(age#65, 0, 0) AS age#62]

   +- SubqueryAlias spark_catalog.test2.agegroups

      +- HiveTableRelation [`test2`.`agegroups`, 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [eid#63, 
name#64, age#65], Partition Cols: []]

 

 

> The renamed field name cannot be recognized after group filtering
> -
>
> Key: SPARK-41236
> URL: https://issues.apache.org/jira/browse/SPARK-41236
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: jingxiong zhong
>Priority: Major
>
> {code:java}
> select collect_set(age) as age
> from db_table.table1
> group by name
> having size(age) > 1 
> {code}
> a simple sql, it work well in spark2.4, but doesn't work in spark3.2.0
> Is it a bug or a new standard?
> h3. *like this:*
> {code:sql}
> create db1.table1(age int, name string);
> insert into db1.table1 values(1, 'a');
> insert into db1.table1 values(2, 'b');
> insert into db1.table1 values(3, 'c');
> --then run sql like this 
> select collect_set(age) as age from db1.table1 group by name having size(age) 
> > 1 ;
> {code}
> h3. Stack Information
> org.apache.spark.sql.AnalysisException: cannot resolve 'age' given input 
> columns: [age]; line 4 pos 12;
> 'Filter (size('age, true) > 1)
> +- Aggregate [name#2], [collect_set(age#1, 0, 0) AS age#0]
>+- SubqueryAlias spark_catalog.db1.table1
>   +- HiveTableRelation [`db1`.`table1`, 
> org.apache.hadoop.hive.ql.io.orc.OrcSerde, Data Cols: [age#1, name#2], 
> Partition Cols: []]
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:54)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:179)
>   at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:175)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:535)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:535)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1128)
>   at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1127)
>   at 
> org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:467)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1154)
>   at 
> org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1153)
>   at 
> org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:555)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:532)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:181)
>   

[jira] [Comment Edited] (SPARK-41391) The output column name of `groupBy.agg(count_distinct)` is incorrect

2023-02-21 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691760#comment-17691760
 ] 

Ritika Maheshwari edited comment on SPARK-41391 at 2/21/23 7:35 PM:


A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {
    case expr: NamedExpression => expr
    case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
      UnresolvedAlias(a, Some(Column.generateAlias))
    case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>  // added case
      UnresolvedAlias(expr, None)
    case expr: Expression =>
      Alias(expr, toPrettySQL(expr))()
  }

In *SQLImplicits.scala* I changed the following method:

implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    if (sc.parts.length == 1 && sc.parts.contains("*")) {
      new ColumnName("*")
    } else {
      new ColumnName(sc.s(args: _*))
    }
  }
}

This seems to work, and it creates a tree structure similar to what you get when you use Spark SQL for aggregate queries. The major difference between the Scala API and Spark SQL was that Spark SQL creates an UnresolvedAlias for count(distinct *) expressions, whereas the Scala API was creating an Alias whose name was count(unresolvedstar()).

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

{*}df{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

{*}res1{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

{*}res2{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

{*}res3{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): 
bigint]

 

scala> 

 


was (Author: ritikam):
A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {
    case expr: NamedExpression => expr
    case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
      UnresolvedAlias(a, Some(Column.generateAlias))
    case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>  // added case
      UnresolvedAlias(expr, None)
    case expr: Expression =>
      Alias(expr, toPrettySQL(expr))()
  }

In *SQLImplicits.scala* I changed the following method:

implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    if (sc.parts.length == 1 && sc.parts.contains("*")) {
      new ColumnName("*")
    } else {
      new ColumnName(sc.s(args: _*))
    }
  }
}

This seems to work, and it creates a tree structure similar to what you get when you use Spark SQL for aggregate queries. The major difference between the Scala API and Spark SQL was that Spark SQL creates an UnresolvedAlias for count(distinct *) expressions, whereas the Scala API was creating an Alias whose name was count(unresolvedstar()).

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

{*}df{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

{*}res1{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

{*}res2{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

{*}res3{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): 
bigint]

 

scala> 

 

> The output column name of `groupBy.agg(count_distinct)` is incorrect
> 
>
> Key: SPARK-41391
> URL: https://issues.apache.org/jira/browse/SPARK-41391
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>
> scala> val df = spark.range(1, 10).withColumn("value", lit(1))
> df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
> scala> df.createOrReplaceTempView("table")
> scala> df.groupBy("id").agg(count_distinct($"value"))
> res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
> res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): 
> bigint]
> scala> df.groupBy("id").agg(count_distinct($"*"))
> res3: org.apache.spark.sql.DataFrame = 

[jira] [Comment Edited] (SPARK-41391) The output column name of `groupBy.agg(count_distinct)` is incorrect

2023-02-21 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691760#comment-17691760
 ] 

Ritika Maheshwari edited comment on SPARK-41391 at 2/21/23 7:56 PM:


A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {
    case expr: NamedExpression => expr
    case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
      UnresolvedAlias(a, Some(Column.generateAlias))
    case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>  // added case
      UnresolvedAlias(expr, None)
    case expr: Expression =>
      Alias(expr, toPrettySQL(expr))()
  }

In *SQLImplicits.scala* I changed the following method:

implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    if (sc.parts.length == 1 && sc.parts.contains("*")) {
      new ColumnName(name = "*")
    } else {
      new ColumnName(sc.s(args: _*))
    }
  }
}

This seems to work, and it creates a tree structure similar to what you get when you use Spark SQL for aggregate queries. The major difference between the Scala API and Spark SQL was that Spark SQL creates an UnresolvedAlias for count(distinct *) expressions, whereas the Scala API was creating an Alias whose name was count(unresolvedstar()).

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

{*}df{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

{*}res1{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

{*}res2{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

{*}res3{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): 
bigint]

 

scala> 

 


was (Author: ritikam):
A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {
    case expr: NamedExpression => expr
    case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
      UnresolvedAlias(a, Some(Column.generateAlias))
    case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>  // added case
      UnresolvedAlias(expr, None)
    case expr: Expression =>
      Alias(expr, toPrettySQL(expr))()
  }

In *SQLImplicits.scala* I changed the following method:

implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    if (sc.parts.length == 1 && sc.parts.contains("*")) {
      new ColumnName("*")
    } else {
      new ColumnName(sc.s(args: _*))
    }
  }
}

This seems to work, and it creates a tree structure similar to what you get when you use Spark SQL for aggregate queries. The major difference between the Scala API and Spark SQL was that Spark SQL creates an UnresolvedAlias for count(distinct *) expressions, whereas the Scala API was creating an Alias whose name was count(unresolvedstar()).

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

{*}df{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

{*}res1{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

{*}res2{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

{*}res3{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): 
bigint]

 

scala> 

 

> The output column name of `groupBy.agg(count_distinct)` is incorrect
> 
>
> Key: SPARK-41391
> URL: https://issues.apache.org/jira/browse/SPARK-41391
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>
> scala> val df = spark.range(1, 10).withColumn("value", lit(1))
> df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
> scala> df.createOrReplaceTempView("table")
> scala> df.groupBy("id").agg(count_distinct($"value"))
> res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
> res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): 
> bigint]
> scala> 

[jira] [Comment Edited] (SPARK-41391) The output column name of `groupBy.agg(count_distinct)` is incorrect

2023-02-21 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691760#comment-17691760
 ] 

Ritika Maheshwari edited comment on SPARK-41391 at 2/21/23 7:33 PM:


A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {
    case expr: NamedExpression => expr
    case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
      UnresolvedAlias(a, Some(Column.generateAlias))
    case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>  // added case
      UnresolvedAlias(expr, None)
    case expr: Expression =>
      Alias(expr, toPrettySQL(expr))()
  }

In *SQLImplicits.scala* I changed the following method:

implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    if (sc.parts.length == 1 && sc.parts.contains("*")) {
      new ColumnName("*")
    } else {
      new ColumnName(sc.s(args: _*))
    }
  }
}

This seems to work, and it creates a tree structure similar to what you get when you use Spark SQL for aggregate queries. The major difference between the Scala API and Spark SQL was that Spark SQL creates an UnresolvedAlias for count(distinct *) expressions, whereas the Scala API was creating an Alias whose name was count(unresolvedstar()).

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

{*}df{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

{*}res1{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

{*}res2{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

{*}res3{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): 
bigint]

 

scala> 

 


was (Author: ritikam):
A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {
    case expr: NamedExpression => expr
    case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
      UnresolvedAlias(a, Some(Column.generateAlias))
    case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>  // added case
      UnresolvedAlias(expr, None)
    case expr: Expression =>
      Alias(expr, toPrettySQL(expr))()
  }

In *SQLImplicits.scala* I changed the following method:

implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    if (sc.parts.length == 1 && sc.parts.contains("*")) {
      new ColumnName("*")
    } else {
      new ColumnName(sc.s(args: _*))
    }
  }
}

This seems to work, and it creates a tree structure similar to what you get when you use Spark SQL for aggregate queries. The major difference between the Scala API and Spark SQL was that Spark SQL creates an UnresolvedAlias for count(distinct *) expressions, whereas the Scala API was creating an Alias whose name was count(unresolvedstar()).

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

*df*: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

*res1*: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

*res2*: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

*res3*: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): bigint]

 

scala> 

 

> The output column name of `groupBy.agg(count_distinct)` is incorrect
> 
>
> Key: SPARK-41391
> URL: https://issues.apache.org/jira/browse/SPARK-41391
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>
> scala> val df = spark.range(1, 10).withColumn("value", lit(1))
> df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
> scala> df.createOrReplaceTempView("table")
> scala> df.groupBy("id").agg(count_distinct($"value"))
> res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
> res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): 
> bigint]
> scala> df.groupBy("id").agg(count_distinct($"*"))
> res3: org.apache.spark.sql.DataFrame = [id: bigint, count(unresolvedstar()): 
> bigint]
> scala> spark.sql(" 

[jira] [Comment Edited] (SPARK-41391) The output column name of `groupBy.agg(count_distinct)` is incorrect

2023-02-21 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691760#comment-17691760
 ] 

Ritika Maheshwari edited comment on SPARK-41391 at 2/21/23 7:34 PM:


A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {
    case expr: NamedExpression => expr
    case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
      UnresolvedAlias(a, Some(Column.generateAlias))
    case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>  // added case
      UnresolvedAlias(expr, None)
    case expr: Expression =>
      Alias(expr, toPrettySQL(expr))()
  }

In *SQLImplicits.scala* I changed the following method:

implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    if (sc.parts.length == 1 && sc.parts.contains("*")) {
      new ColumnName("*")
    } else {
      new ColumnName(sc.s(args: _*))
    }
  }
}

This seems to work, and it creates a tree structure similar to what you get when you use Spark SQL for aggregate queries. The major difference between the Scala API and Spark SQL was that Spark SQL creates an UnresolvedAlias for count(distinct *) expressions, whereas the Scala API was creating an Alias whose name was count(unresolvedstar()).

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

{*}df{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

{*}res1{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

{*}res2{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

{*}res3{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): 
bigint]

 

scala> 

 


was (Author: ritikam):
A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {
    case expr: NamedExpression => expr
    case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
      UnresolvedAlias(a, Some(Column.generateAlias))
    case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>  // added case
      UnresolvedAlias(expr, None)
    case expr: Expression =>
      Alias(expr, toPrettySQL(expr))()
  }

In *SQLImplicits.scala* I changed the following method:

implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    if (sc.parts.length == 1 && sc.parts.contains("*")) {
      new ColumnName("*")
    } else {
      new ColumnName(sc.s(args: _*))
    }
  }
}

This seems to work, and it creates a tree structure similar to what you get when you use Spark SQL for aggregate queries. The major difference between the Scala API and Spark SQL was that Spark SQL creates an UnresolvedAlias for count(distinct *) expressions, whereas the Scala API was creating an Alias whose name was count(unresolvedstar()).

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

{*}df{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

{*}res1{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

{*}res2{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

{*}res3{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): 
bigint]

 

scala> 

 

> The output column name of `groupBy.agg(count_distinct)` is incorrect
> 
>
> Key: SPARK-41391
> URL: https://issues.apache.org/jira/browse/SPARK-41391
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>
> scala> val df = spark.range(1, 10).withColumn("value", lit(1))
> df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
> scala> df.createOrReplaceTempView("table")
> scala> df.groupBy("id").agg(count_distinct($"value"))
> res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
> res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): 
> bigint]
> scala> df.groupBy("id").agg(count_distinct($"*"))
> res3: org.apache.spark.sql.DataFrame = [id: 

[jira] (SPARK-41391) The output column name of `groupBy.agg(count_distinct)` is incorrect

2023-02-21 Thread Ritika Maheshwari (Jira)


[ https://issues.apache.org/jira/browse/SPARK-41391 ]


Ritika Maheshwari deleted comment on SPARK-41391:
---

was (Author: ritikam):
Modifying the agg method in RelationalGroupedDataset.scala to

@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = {
  toDF((expr +: exprs).map {
    case typed: TypedColumn[_, _] =>
      typed.withInputType(df.exprEnc, df.logicalPlan.output).expr
    case f: Column =>
      if (f.expr != null &&
          f.expr.isInstanceOf[UnresolvedFunction] &&
          f.expr.asInstanceOf[UnresolvedFunction].arguments(0).isInstanceOf[UnresolvedStar]) {
        f.expr.asInstanceOf[UnresolvedFunction].childrenResolved
        f.expr.asInstanceOf[UnresolvedFunction].copy(arguments = df.numericColumns)
      } else {
        f.expr
      }
    case c => c.expr
  })
}

Seems to work though I am not able to get the distinct in the count function.

scala> df.groupBy("id").agg(count_distinct($"*"))

*res7*: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(id, value): 
bigint]

Somehow the alias formed for this function is missing the distinct keyword too:

'COUNT(distinct id#0L, value#2) AS COUNT(id, value)#47

Any ideas why distinct is not showing up? Also, is this a possible solution for the unresolved star? Here we resolve the unresolved star in RelationalGroupedDataset rather than in the Analyzer.

> The output column name of `groupBy.agg(count_distinct)` is incorrect
> 
>
> Key: SPARK-41391
> URL: https://issues.apache.org/jira/browse/SPARK-41391
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>
> scala> val df = spark.range(1, 10).withColumn("value", lit(1))
> df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
> scala> df.createOrReplaceTempView("table")
> scala> df.groupBy("id").agg(count_distinct($"value"))
> res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
> res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): 
> bigint]
> scala> df.groupBy("id").agg(count_distinct($"*"))
> res3: org.apache.spark.sql.DataFrame = [id: bigint, count(unresolvedstar()): 
> bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")
> res4: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT id, 
> value): bigint]






[jira] [Commented] (SPARK-41391) The output column name of `groupBy.agg(count_distinct)` is incorrect

2023-02-21 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691760#comment-17691760
 ] 

Ritika Maheshwari commented on SPARK-41391:
---

A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {
    case expr: NamedExpression => expr
    case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
      UnresolvedAlias(a, Some(Column.generateAlias))
    case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>  // added case
      UnresolvedAlias(expr, None)
    case expr: Expression =>
      Alias(expr, toPrettySQL(expr))()
  }

In *SQLImplicits.scala* I changed the following method:

implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    if (sc.parts.length == 1 && sc.parts.contains("*")) {
      new ColumnName("*")
    } else {
      new ColumnName(sc.s(args: _*))
    }
  }
}

This seems to work, and it creates a tree structure similar to what you get when you use Spark SQL for aggregate queries. The major difference between the Scala API and Spark SQL was that Spark SQL creates an UnresolvedAlias for count(distinct *) expressions, whereas the Scala API was creating an Alias whose name was count(unresolvedstar()).

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

*df*: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

*res1*: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

*res2*: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

*res3*: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): bigint]

 

scala> 

 

> The output column name of `groupBy.agg(count_distinct)` is incorrect
> 
>
> Key: SPARK-41391
> URL: https://issues.apache.org/jira/browse/SPARK-41391
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>
> scala> val df = spark.range(1, 10).withColumn("value", lit(1))
> df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
> scala> df.createOrReplaceTempView("table")
> scala> df.groupBy("id").agg(count_distinct($"value"))
> res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
> res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): 
> bigint]
> scala> df.groupBy("id").agg(count_distinct($"*"))
> res3: org.apache.spark.sql.DataFrame = [id: bigint, count(unresolvedstar()): 
> bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")
> res4: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT id, 
> value): bigint]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-41391) The output column name of `groupBy.agg(count_distinct)` is incorrect

2023-02-21 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691760#comment-17691760
 ] 

Ritika Maheshwari edited comment on SPARK-41391 at 2/21/23 8:06 PM:


A better solution

 

In *RelationalGroupedDataset* I changed the following method to add another case:

private[this] def alias(expr: Expression): NamedExpression = expr match {
  case expr: NamedExpression => expr
  case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
    UnresolvedAlias(a, Some(Column.generateAlias))
  *case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>*
    *UnresolvedAlias(expr, None)*
  case expr: Expression =>
    Alias(expr, toPrettySQL(expr))()
}

In *SQLImplicits.scala* I made a change to the following method:

implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    *if (sc.parts.length == 1 && sc.parts.contains("*")) {*
      *new ColumnName(name = "*")*
    *}* else {
      new ColumnName(sc.s(args: _*))
    }
  }
}

This seems to work and creates a tree structure similar to what you get when you 
use Spark SQL for aggregate queries. The major difference between the Scala API and 
Spark SQL was that Spark SQL creates an UnresolvedAlias for count(distinct *) 
expressions, whereas the Scala API creates an Alias whose name is 
count(unresolvedstar()).

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

{*}df{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

{*}res1{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

{*}res2{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

{*}res3{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): 
bigint]

 

scala> 

 


was (Author: ritikam):
A better solution

 

In *RelationalGroupedDataset* changed the following method to add another case

private[this] def alias(expr: Expression): NamedExpression = expr match {
  case expr: NamedExpression => expr
  case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
    UnresolvedAlias(a, Some(Column.generateAlias))
  *case ag: UnresolvedFunction if (containsStar(Seq(ag))) =>*
    *UnresolvedAlias(expr, None)*
  case expr: Expression =>
    Alias(expr, toPrettySQL(expr))()
}

In *SQLImplicit.scala* MADE CHANGE TO FOLLOWING METHOD

implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    *if (sc.parts.length == 1 && sc.parts.contains("")) {*
      *new ColumnName(name = "*")*
    *}* else { new ColumnName(sc.s(args: _*)) }

This seems to work and this create a tree structure similar to what you get 
when you use spark sql for aggrgate queries. Major difference between scala and 
spark sql was that spark sql was creating an unresolvedalias for count 
(distinct * ) expressions where as scala was creating an alias where the alias 
was count(unresolvedstar()) 

scala> val df = spark.range(1, 10).withColumn("value", lit(1))

{*}df{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, value: int]

 

scala> df.createOrReplaceTempView("table")

 

scala>  df.groupBy("id").agg(count_distinct($"*"))

{*}res1{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")

{*}res2{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, count(DISTINCT id, 
value): bigint]

 

scala> df.groupBy("id").agg(count_distinct($"value"))

{*}res3{*}: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(value): 
bigint]

 

scala> 

 

> The output column name of `groupBy.agg(count_distinct)` is incorrect
> 
>
> Key: SPARK-41391
> URL: https://issues.apache.org/jira/browse/SPARK-41391
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>
> scala> val df = spark.range(1, 10).withColumn("value", lit(1))
> df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
> scala> df.createOrReplaceTempView("table")
> scala> df.groupBy("id").agg(count_distinct($"value"))
> res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
> res2: org.apache.spark.sql.DataFrame = [id: 

[jira] [Commented] (SPARK-30212) COUNT(DISTINCT) window function should be supported

2023-02-27 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17694144#comment-17694144
 ] 

Ritika Maheshwari commented on SPARK-30212:
---

This is the suggested way to handle this issue (from Stack Overflow):

"The typical approach to solving this problem is with a self-join. You group by 
whatever columns you need, compute the distinct count or any other aggregate 
function that cannot be used as a window function, and then join back to your 
original data."

So perhaps you can rewrite your query as

spark.sql("""
  SELECT T.*, (SELECT COUNT(DISTINCT T2.DEMO_DATE) FROM DEMO_COUNT_DISTINCT T2) UNIQ_DATES
  FROM DEMO_COUNT_DISTINCT T
""").show

> COUNT(DISTINCT) window function should be supported
> ---
>
> Key: SPARK-30212
> URL: https://issues.apache.org/jira/browse/SPARK-30212
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
> Environment: Spark 2.4.4
> Scala 2.11.12
> Hive 2.3.6
>Reporter: Kernel Force
>Priority: Major
>
> Suppose we have a typical table in Hive like below:
> {code:sql}
> CREATE TABLE DEMO_COUNT_DISTINCT (
> demo_date string,
> demo_id string
> );
> {code}
> {noformat}
> ++--+
> | demo_count_distinct.demo_date | demo_count_distinct.demo_id |
> ++--+
> | 20180301 | 101 |
> | 20180301 | 102 |
> | 20180301 | 103 |
> | 20180401 | 201 |
> | 20180401 | 202 |
> ++--+
> {noformat}
> Now I want to count distinct number of DEMO_DATE but also reserve every 
> columns' data in each row.
> So I use COUNT(DISTINCT) window function (which is also common in other 
> mainstream databases like Oracle) in Hive beeline and it work:
> {code:sql}
> SELECT T.*, COUNT(DISTINCT T.DEMO_DATE) OVER(PARTITION BY NULL) UNIQ_DATES
>  FROM DEMO_COUNT_DISTINCT T;
> {code}
> {noformat}
> +--++-+
> | t.demo_date | t.demo_id | uniq_dates |
> +--++-+
> | 20180401 | 202 | 2 |
> | 20180401 | 201 | 2 |
> | 20180301 | 103 | 2 |
> | 20180301 | 102 | 2 |
> | 20180301 | 101 | 2 |
> +--++-+
> {noformat}
> But when I came to SparkSQL, it threw exception even if I run the same SQL.
> {code:sql}
> spark.sql("""
> SELECT T.*, COUNT(DISTINCT T.DEMO_DATE) OVER(PARTITION BY NULL) UNIQ_DATES
>  FROM DEMO_COUNT_DISTINCT T
> """).show
> {code}
> {noformat}
> org.apache.spark.sql.AnalysisException: Distinct window functions are not 
> supported: count(distinct DEMO_DATE#1) windowspecdefinition(null, 
> specifiedwindowframe(RowFrame, unboundedpreceding$(), 
> unboundedfollowing$()));;
> Project [demo_date#1, demo_id#2, UNIQ_DATES#0L]
> +- Project [demo_date#1, demo_id#2, UNIQ_DATES#0L, UNIQ_DATES#0L]
>  +- Window [count(distinct DEMO_DATE#1) windowspecdefinition(null, 
> specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) 
> AS UNIQ_DATES#0L], [null]
>  +- Project [demo_date#1, demo_id#2]
>  +- SubqueryAlias `T`
>  +- SubqueryAlias `default`.`demo_count_distinct`
>  +- HiveTableRelation `default`.`demo_count_distinct`, 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [demo_date#1, demo_id#2]
> {noformat}
> Then I try to use countDistinct function but also got exceptions.
> {code:sql}
> spark.sql("""
> SELECT T.*, countDistinct(T.DEMO_DATE) OVER(PARTITION BY NULL) UNIQ_DATES
>  FROM DEMO_COUNT_DISTINCT T
> """).show
> {code}
> {noformat}
> org.apache.spark.sql.AnalysisException: Undefined function: 'countDistinct'. 
> This function is neither a registered temporary function nor a permanent 
> function registered in the database 'default'.; line 2 pos 12
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15$$anonfun$applyOrElse$49.apply(Analyzer.scala:1279)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15$$anonfun$applyOrElse$49.apply(Analyzer.scala:1279)
>  at 
> org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
>  ..
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36604) timestamp type column analyze result is wrong

2023-03-02 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695869#comment-17695869
 ] 

Ritika Maheshwari commented on SPARK-36604:
---

This seems to be working correctly in Spark 3.3.0:

spark-sql> insert into a values(cast('2021-08-15 15:30:01' as timestamp)
         > );
23/03/02 11:04:11 WARN ObjectStore: Failed to get database global_temp, 
returning NoSuchObjectException
Time taken: 3.278 seconds
spark-sql> select * from a;
2021-08-15 15:30:01
Time taken: 0.782 seconds, Fetched 1 row(s)
spark-sql> analyze table a compute statistics for columns a;
Time taken: 1.882 seconds
spark-sql> desc formatted a a;
col_name        a
data_type       timestamp
comment NULL
min     2021-08-15 15:30:01.00 -0700
max     2021-08-15 15:30:01.00 -0700
num_nulls       0
distinct_count  1
avg_col_len     8
max_col_len     8
histogram       NULL
Time taken: 0.095 seconds, Fetched 10 row(s)
spark-sql> desc a;
a                       timestamp                                   
Time taken: 0.059 seconds, Fetched 1 row(s)
spark-sql>

> timestamp type column analyze result is wrong
> -
>
> Key: SPARK-36604
> URL: https://issues.apache.org/jira/browse/SPARK-36604
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.1, 3.1.2
> Environment: Spark 3.1.1
>Reporter: YuanGuanhu
>Priority: Major
>
> when we create table with timestamp column type, the min and max data of the 
> analyze result for the timestamp column is wrong
> eg:
> {code}
> > select * from a;
> {code}
> {code}
> 2021-08-15 15:30:01
> Time taken: 2.789 seconds, Fetched 1 row(s)
> spark-sql> desc formatted a a;
> col_name a
> data_type timestamp
> comment NULL
> min 2021-08-15 07:30:01.00
> max 2021-08-15 07:30:01.00
> num_nulls 0
> distinct_count 1
> avg_col_len 8
> max_col_len 8
> histogram NULL
> Time taken: 0.278 seconds, Fetched 10 row(s)
> spark-sql> desc a;
> a timestamp NULL
> Time taken: 1.432 seconds, Fetched 1 row(s)
> {code}
>  
> reproduce step:
> {code}
> create table a(a timestamp);
> insert into a select '2021-08-15 15:30:01';
> analyze table a compute statistics for columns a;
> desc formatted a a;
> select * from a;
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40159) Aggregate should be group only after collapse project to aggregate

2023-03-01 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17695327#comment-17695327
 ] 

Ritika Maheshwari commented on SPARK-40159:
---

This issue seems to have been resolved by SPARK-38489

> Aggregate should be group only after collapse project to aggregate
> --
>
> Key: SPARK-40159
> URL: https://issues.apache.org/jira/browse/SPARK-40159
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Wan Kun
>Priority: Major
>
> CollapseProject rule will merge project expressions into AggregateExpressions 
> in aggregate, which will make the *aggregate.groupOnly* to false.
> {code}
> val df = testData.distinct().select('key + 1, ('key + 1).cast("long"))
> df.queryExecution.optimizedPlan.collect {
>   case a: Aggregate => a
> }.foreach(agg => assert(agg.groupOnly === true)) 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41391) The output column name of `groupBy.agg(count_distinct)` is incorrect

2023-02-21 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691928#comment-17691928
 ] 

Ritika Maheshwari commented on SPARK-41391:
---

I did send a PR

> The output column name of `groupBy.agg(count_distinct)` is incorrect
> 
>
> Key: SPARK-41391
> URL: https://issues.apache.org/jira/browse/SPARK-41391
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>
> scala> val df = spark.range(1, 10).withColumn("value", lit(1))
> df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
> scala> df.createOrReplaceTempView("table")
> scala> df.groupBy("id").agg(count_distinct($"value"))
> res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
> res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): 
> bigint]
> scala> df.groupBy("id").agg(count_distinct($"*"))
> res3: org.apache.spark.sql.DataFrame = [id: bigint, count(unresolvedstar()): 
> bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")
> res4: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT id, 
> value): bigint]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-37581) sql hang at planning stage

2023-02-05 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684361#comment-17684361
 ] 

Ritika Maheshwari commented on SPARK-37581:
---

Just executed this on 3.3.0 and it takes 4.14 seconds. I am also including the 
plan, so it does not seem like an issue in 3.3.0.

 

> left join

         > (select dt,count(distinct kb_code) as h_kbs

         > from test.test_b

         > where dt = '20211126'

         > group by dt) t8

         > on calendar.dt = t8.dt

         > 

         > left join

         > (select dt,count(distinct kb_code) as i_kbs

         > from test.test_b

         > where dt = '20211126'

         > group by dt) t9

         > on calendar.dt = t9.dt

         > 

         > left join

         > (select dt,count(distinct kb_code) as j_kbs

         > from test.test_b

         > where dt = '20211126'

         > group by dt) t10

         > on calendar.dt = t10.dt

         > 

         > left join

         > (select dt,count(distinct kb_code) as k_kbs

         > from test.test_b

         > where dt = '20211126'

         > group by dt) t11

         > on calendar.dt = t11.dt;

Time taken: 0.609 seconds

23/02/05 16:10:47 WARN HiveMetaStore: Location: 
file:/Users/ritka/Documents/spark-3.3.0/spark-warehouse/test.db/test_c 
specified for non-external table:test_c

*Time taken: 4.14 seconds*

spark-sql> select * from test.test_c

         > ;

1 1 2 1 1 1 1 1 1 1 1 1 1 1

Time taken: 0.296 seconds, Fetched 1 row(s)

spark-sql> 

 

*The plan for the query is:*

 

CommandResult Execute OptimizedCreateHiveTableAsSelectCommand [Database: test, 
TableName: test_c, InsertIntoHadoopFsRelationCommand]

   +- OptimizedCreateHiveTableAsSelectCommand [Database: test, TableName: 
test_c, InsertIntoHadoopFsRelationCommand]

      +- Project [day#11, week#12, weekday#13, a_kbs#0L, b_kbs#1L, c_kbs#2L, 
d_kbs#3L, e_kbs#4L, f_kbs#5L, g_kbs#6L, h_kbs#7L, i_kbs#8L, j_kbs#9L, k_kbs#10L]

         +- Join LeftOuter, (dt#14 = dt#366)

            :- Join LeftOuter, (dt#14 = dt#334)

            :  :- Join LeftOuter, (dt#14 = dt#302)

            :  :  :- Join LeftOuter, (dt#14 = dt#270)

            :  :  :  :- Join LeftOuter, (dt#14 = dt#238)

            :  :  :  :  :- Join LeftOuter, (dt#14 = dt#206)

            :  :  :  :  :  :- Join LeftOuter, (dt#14 = dt#174)

            :  :  :  :  :  :  :- Join LeftOuter, (dt#14 = dt#142)

            :  :  :  :  :  :  :  :- Join LeftOuter, (dt#14 = dt#110)

            :  :  :  :  :  :  :  :  :- Join LeftOuter, (dt#14 = dt#78)

            :  :  :  :  :  :  :  :  :  :- Join LeftOuter, (dt#14 = dt#46)

            :  :  :  :  :  :  :  :  :  :  :- SubqueryAlias calendar

            :  :  :  :  :  :  :  :  :  :  :  +- Project [day#11, week#12, 
weekday#13, dt#14]

            :  :  :  :  :  :  :  :  :  :  :     +- Filter (dt#14 = 20211126)

            :  :  :  :  :  :  :  :  :  :  :        +- SubqueryAlias 
spark_catalog.test.test_a

            :  :  :  :  :  :  :  :  :  :  :           +- Relation 
test.test_a[day#11,week#12,weekday#13,dt#14] orc

            :  :  :  :  :  :  :  :  :  :  +- SubqueryAlias t1

            :  :  :  :  :  :  :  :  :  :     +- Aggregate [dt#46], [dt#46, 
count(distinct kb_code#40) AS a_kbs#0L]

            :  :  :  :  :  :  :  :  :  :        +- Filter (dt#46 = 20211126)

            :  :  :  :  :  :  :  :  :  :           +- SubqueryAlias 
spark_catalog.test.test_b

            :  :  :  :  :  :  :  :  :  :              +- Relation 
test.test_b[session_id#15,device_id#16,brand#17,model#18,wx_version#19,os#20,net_work_type#21,app_id#22,app_name#23,col_z#24,page_url#25,page_title#26,olabel#27,otitle#28,source#29,send_dt#30,recv_dt#31,request_time#32,write_time#33,client_ip#34,col_a#35,dt_hour#36,product#37,channelfrom#38,...
 8 more fields] orc

            :  :  :  :  :  :  :  :  :  +- SubqueryAlias t2

            :  :  :  :  :  :  :  :  :     +- Aggregate [dt#78], [dt#78, 
count(distinct kb_code#72) AS b_kbs#1L]

            :  :  :  :  :  :  :  :  :        +- Filter (dt#78 = 20211126)

            :  :  :  :  :  :  :  :  :           +- SubqueryAlias 
spark_catalog.test.test_b

            :  :  :  :  :  :  :  :  :              +- Relation 
test.test_b[session_id#47,device_id#48,brand#49,model#50,wx_version#51,os#52,net_work_type#53,app_id#54,app_name#55,col_z#56,page_url#57,page_title#58,olabel#59,otitle#60,source#61,send_dt#62,recv_dt#63,request_time#64,write_time#65,client_ip#66,col_a#67,dt_hour#68,product#69,channelfrom#70,...
 8 more fields] orc

            :  :  :  :  :  :  :  :  +- SubqueryAlias t3

            :  :  :  :  :  :  :  :     +- Aggregate [dt#110], [dt#110, 
count(distinct kb_code#104) AS c_kbs#2L]

            :  :  :  :  :  :  :  :        +- Filter (dt#110 = 20211126)

            :  :  :  :  :  :  :  :           +- SubqueryAlias 
spark_catalog.test.test_b


[jira] [Commented] (SPARK-42346) distinct(count colname) with UNION ALL causes query analyzer bug

2023-02-06 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684992#comment-17684992
 ] 

Ritika Maheshwari commented on SPARK-42346:
---

I have Spark 3.3.0 and I do not have the SPARK-39887 fix. I am not able to reproduce 
this issue. Am I missing something?

 

scala> val df = Seq(("a","b")).toDF("surname","first_name")

*df*: *org.apache.spark.sql.DataFrame* = [surname: string, first_name: string]

 

scala> df.createOrReplaceTempView("input_table")

 

scala> spark.sql("select(Select Count(Distinct first_name) from input_table) As 
distinct_value_count from input_table Union all select (select count(Distinct 
surname) from input_table) as distinct_value_count from input_table").show()

+--------------------+
|distinct_value_count|
+--------------------+
|                   1|
|                   1|
+--------------------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Union
   :- Project [cast(Subquery subquery#46, [id=#114] as string) AS 
distinct_value_count#62]
   :  :  +- Subquery subquery#46, [id=#114]
   :  :     +- AdaptiveSparkPlan isFinalPlan=false
   :  :        +- HashAggregate(keys=[], functions=[count(first_name#12)], 
output=[count(DISTINCT first_name)#53L])
   :  :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#112]
   :  :              +- HashAggregate(keys=[], 
functions=[partial_count(first_name#12)], output=[count#67L])
   :  :                 +- LocalTableScan [first_name#12]
   :  +- LocalTableScan [_1#6, _2#7]
   +- Project [cast(Subquery subquery#48, [id=#125] as string) AS 
distinct_value_count#64]
      :  +- Subquery subquery#48, [id=#125]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- HashAggregate(keys=[], functions=[count(surname#11)], 
output=[count(DISTINCT surname)#55L])
      :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#123]
      :              +- HashAggregate(keys=[], 
functions=[partial_count(surname#11)], output=[count#68L])
      :                 +- LocalTableScan [surname#11]
      +- LocalTableScan [_1#50, _2#51|#50, _2#51]


 

This is what I have in my SparkOptimizer.scala


override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ 
super.defaultBatches :+
Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) 
:+
Batch("PartitionPruning", Once,
PartitionPruning) :+
Batch("InjectRuntimeFilter", FixedPoint(1),
InjectRuntimeFilter,
RewritePredicateSubquery) :+
Batch("MergeScalarSubqueries", Once,
MergeScalarSubqueries) :+
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
PushDownPredicates) :+
Batch

> distinct(count colname) with UNION ALL causes query analyzer bug
> 
>
> Key: SPARK-42346
> URL: https://issues.apache.org/jira/browse/SPARK-42346
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.4.0, 3.5.0
>Reporter: Robin
>Assignee: Peter Toth
>Priority: Major
> Fix For: 3.3.2, 3.4.0, 3.5.0
>
>
> If you combine a UNION ALL with a count(distinct colname) you get a query 
> analyzer bug.
>  
> This behaviour is introduced in 3.3.0.  The bug was not present in 3.2.1.
>  
> Here is a reprex in PySpark:
> {{df_pd = pd.DataFrame([}}
> {{    \{'surname': 'a', 'first_name': 'b'}}}
> {{])}}
> {{df_spark = spark.createDataFrame(df_pd)}}
> {{df_spark.createOrReplaceTempView("input_table")}}
> {{sql = """}}
> {{SELECT }}
> {{    (SELECT Count(DISTINCT first_name) FROM   input_table) }}
> {{        AS distinct_value_count}}
> {{FROM   input_table}}
> {{UNION ALL}}
> {{SELECT }}
> {{    (SELECT Count(DISTINCT surname) FROM   input_table) }}
> {{        AS distinct_value_count}}
> {{FROM   input_table """}}
> {{spark.sql(sql).toPandas()}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-42346) distinct(count colname) with UNION ALL causes query analyzer bug

2023-02-08 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17686081#comment-17686081
 ] 

Ritika Maheshwari commented on SPARK-42346:
---

Yes that caused the error to appear. Thanks

> distinct(count colname) with UNION ALL causes query analyzer bug
> 
>
> Key: SPARK-42346
> URL: https://issues.apache.org/jira/browse/SPARK-42346
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.4.0, 3.5.0
>Reporter: Robin
>Assignee: Peter Toth
>Priority: Major
> Fix For: 3.3.2, 3.4.0, 3.5.0
>
>
> If you combine a UNION ALL with a count(distinct colname) you get a query 
> analyzer bug.
>  
> This behaviour is introduced in 3.3.0.  The bug was not present in 3.2.1.
>  
> Here is a reprex in PySpark:
> {{df_pd = pd.DataFrame([}}
> {{    \{'surname': 'a', 'first_name': 'b'}}}
> {{])}}
> {{df_spark = spark.createDataFrame(df_pd)}}
> {{df_spark.createOrReplaceTempView("input_table")}}
> {{sql = """}}
> {{SELECT }}
> {{    (SELECT Count(DISTINCT first_name) FROM   input_table) }}
> {{        AS distinct_value_count}}
> {{FROM   input_table}}
> {{UNION ALL}}
> {{SELECT }}
> {{    (SELECT Count(DISTINCT surname) FROM   input_table) }}
> {{        AS distinct_value_count}}
> {{FROM   input_table """}}
> {{spark.sql(sql).toPandas()}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-42346) distinct(count colname) with UNION ALL causes query analyzer bug

2023-02-07 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17685466#comment-17685466
 ] 

Ritika Maheshwari commented on SPARK-42346:
---

Hello, I added three rows to input_table. Still no error. I do have DPP enabled.

*

Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 12.0.2)

Type in expressions to have them evaluated.

Type :help for more information.

 

scala> val df = Seq(("a","b"),("c","d"),("e","f")).toDF("surname","first_name")

*df*: *org.apache.spark.sql.DataFrame* = [surname: string, first_name: string]

 

scala> df.createOrReplaceTempView("input_table")

 

scala> spark.sql("select(Select Count(Distinct first_name) from input_table) As 
distinct_value_count from input_table Union all select (select count(Distinct 
surname) from input_table) as distinct_value_count from input_table").show()

+--------------------+
|distinct_value_count|
+--------------------+
|                   3|
|                   3|
|                   3|
|                   3|
|                   3|
|                   3|
+--------------------+

 

**

AdaptiveSparkPlan isFinalPlan=false
+- Union
   :- Project [cast(Subquery subquery#145, [id=#571] as string) AS 
distinct_value_count#161]
   :  :  +- Subquery subquery#145, [id=#571]
   :  :     +- AdaptiveSparkPlan isFinalPlan=false
   :  :        +- HashAggregate(keys=[], functions=[count(distinct 
first_name#8)], output=[count(DISTINCT first_name)#152L])
   :  :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#569]
   :  :              +- HashAggregate(keys=[], 
functions=[partial_count(distinct first_name#8)], output=[count#167L])
   :  :                 +- HashAggregate(keys=[first_name#8], functions=[], 
output=[first_name#8])
   :  :                    +- Exchange hashpartitioning(first_name#8, 200), 
ENSURE_REQUIREMENTS, [id=#565]
   :  :                       +- HashAggregate(keys=[first_name#8], 
functions=[], output=[first_name#8])
   :  :                          +- LocalTableScan [first_name#8]
   :  +- LocalTableScan [_1#2, _2#3]
   +- Project [cast(Subquery subquery#147, [id=#590] as string) AS 
distinct_value_count#163]
      :  +- Subquery subquery#147, [id=#590]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- HashAggregate(keys=[], functions=[count(distinct surname#7)], 
output=[count(DISTINCT surname)#154L])
      :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#588]
      :              +- HashAggregate(keys=[], 
functions=[partial_count(distinct surname#7)], output=[count#170L])
      :                 +- HashAggregate(keys=[surname#7], functions=[], 
output=[surname#7])
      :                    +- Exchange hashpartitioning(surname#7, 200), 
ENSURE_REQUIREMENTS, [id=#584]
      :                       +- HashAggregate(keys=[surname#7], functions=[], 
output=[surname#7])
      :                          +- LocalTableScan [surname#7]
      +- LocalTableScan [_1#149, _2#150]

> distinct(count colname) with UNION ALL causes query analyzer bug
> 
>
> Key: SPARK-42346
> URL: https://issues.apache.org/jira/browse/SPARK-42346
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.4.0, 3.5.0
>Reporter: Robin
>Assignee: Peter Toth
>Priority: Major
> Fix For: 3.3.2, 3.4.0, 3.5.0
>
>
> If you combine a UNION ALL with a count(distinct colname) you get a query 
> analyzer bug.
>  
> This behaviour is introduced in 3.3.0.  The bug was not present in 3.2.1.
>  
> Here is a reprex in PySpark:
> {{df_pd = pd.DataFrame([}}
> {{    \{'surname': 'a', 'first_name': 'b'}}}
> {{])}}
> {{df_spark = spark.createDataFrame(df_pd)}}
> {{df_spark.createOrReplaceTempView("input_table")}}
> {{sql = """}}
> {{SELECT }}
> {{    (SELECT Count(DISTINCT first_name) FROM   input_table) }}
> {{        AS distinct_value_count}}
> {{FROM   input_table}}
> {{UNION ALL}}
> {{SELECT }}
> {{    (SELECT Count(DISTINCT surname) FROM   input_table) }}
> {{        AS distinct_value_count}}
> {{FROM   input_table """}}
> {{spark.sql(sql).toPandas()}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41391) The output column name of `groupBy.agg(count_distinct)` is incorrect

2023-01-31 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17682843#comment-17682843
 ] 

Ritika Maheshwari commented on SPARK-41391:
---

Modifying the agg method in RelationalGroupedDataset.scala to

@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = {
  toDF((expr +: exprs).map {
    case typed: TypedColumn[_, _] =>
      typed.withInputType(df.exprEnc, df.logicalPlan.output).expr
    case f: Column =>
      if (f.expr != null &&
          f.expr.isInstanceOf[UnresolvedFunction] &&
          f.expr.asInstanceOf[UnresolvedFunction].arguments(0).isInstanceOf[UnresolvedStar]) {
        f.expr.asInstanceOf[UnresolvedFunction].childrenResolved
        f.expr.asInstanceOf[UnresolvedFunction].copy(arguments = df.numericColumns)
      } else {
        f.expr
      }
    case c => c.expr
  })
}

This seems to work, though I am not able to get the DISTINCT into the count function.

scala> df.groupBy("id").agg(count_distinct($"*"))

*res7*: *org.apache.spark.sql.DataFrame* = [id: bigint, COUNT(id, value): 
bigint]

Somehow the alias formed for this function is missing the DISTINCT keyword too:

'COUNT(distinct id#0L, value#2) AS COUNT(id, value)#47

Any ideas why DISTINCT is not showing up? Also, is this a possible solution for 
the UnresolvedStar? Here we resolve the UnresolvedStar in RelationalGroupedDataset 
rather than in the Analyzer.

> The output column name of `groupBy.agg(count_distinct)` is incorrect
> 
>
> Key: SPARK-41391
> URL: https://issues.apache.org/jira/browse/SPARK-41391
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Ruifeng Zheng
>Priority: Major
>
> scala> val df = spark.range(1, 10).withColumn("value", lit(1))
> df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
> scala> df.createOrReplaceTempView("table")
> scala> df.groupBy("id").agg(count_distinct($"value"))
> res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
> res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): 
> bigint]
> scala> df.groupBy("id").agg(count_distinct($"*"))
> res3: org.apache.spark.sql.DataFrame = [id: bigint, count(unresolvedstar()): 
> bigint]
> scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")
> res4: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT id, 
> value): bigint]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-07 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730287#comment-17730287
 ] 

Ritika Maheshwari commented on SPARK-43514:
---

I had deleted the comment because it was incorrect. I could recreate the issue 
in 3.3.0 as well. Once we exclude the ConvertToLocalRelation rule it works both 
in 3.3.0 and 3.4.0.

By excluding this rule, the Filter clause does not go through the ConvertToLocalRelation 
optimization, and it was the Filter clause that was throwing the NullPointerException.
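
For reference, a minimal sketch of excluding the rule at runtime, assuming an existing SparkSession named spark (for example in spark-shell):

{code:scala}
// Exclude ConvertToLocalRelation so the Filter above is not folded into a
// LocalRelation during optimization. spark.sql.optimizer.excludedRules is a
// runtime SQL conf, so no restart of the session is needed.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
{code}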

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Plan.png, Screen Shot 
> 2023-05-22 at 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, 
> Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct,values:array>) => 
> array,values:array>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example which is close to our application code. 
> Define some helper functions:
> 

[jira] [Commented] (SPARK-40296) Error Class for DISTINCT function not found

2023-07-18 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17744346#comment-17744346
 ] 

Ritika Maheshwari commented on SPARK-40296:
---

Isn't dropDuplicates taking care of applying distinct to multiple columns?
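
For example, a minimal sketch in spark-shell (the columns a and b and the sample rows are only hypothetical; spark.implicits._ is in scope): dropDuplicates keeps one row per distinct combination of the listed columns.

{code:scala}
val df = Seq((1, "x"), (1, "x"), (1, "y")).toDF("a", "b")

// One row per distinct (a, b) pair -> (1, x) and (1, y)
df.dropDuplicates("a", "b").show()
{code}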

> Error Class for DISTINCT function not found
> ---
>
> Key: SPARK-40296
> URL: https://issues.apache.org/jira/browse/SPARK-40296
> Project: Spark
>  Issue Type: Task
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Rui Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-07 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730377#comment-17730377
 ] 

Ritika Maheshwari commented on SPARK-43514:
---

Please provide the code to recreate the issue. 

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Plan.png, Screen Shot 
> 2023-05-22 at 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, 
> Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct,values:array>) => 
> array,values:array>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example which is close to our application code. 
> Define some helper functions:
> {code:scala}
> import org.apache.spark.sql.functions._
> def process1(df: DataFrame): Unit = {
> val companies = df.select($"id", $"name")
> val directors = df
> .select(explode($"directors"))
> .select($"col.name", $"col.id")
> 

[jira] [Commented] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-08 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730623#comment-17730623
 ] 

Ritika Maheshwari commented on SPARK-43514:
---

Please refer to the Spark Properties section in the documentation to see how to set 
the spark.sql.optimizer.excludedRules property on SparkConf:
https://spark.apache.org/docs/latest/configuration.html#spark-properties
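
A minimal sketch, assuming the rule being excluded is ConvertToLocalRelation (as discussed earlier in this thread) and that it should apply to the whole application via SparkConf:

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Exclude ConvertToLocalRelation for the whole application at startup.
val conf = new SparkConf()
  .set("spark.sql.optimizer.excludedRules",
    "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")

val spark = SparkSession.builder().config(conf).getOrCreate()
{code}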

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Plan.png, Screen Shot 
> 2023-05-22 at 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, 
> Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct,values:array>) => 
> array,values:array>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example which is close to our application code. 
> Define some helper functions:
> {code:scala}
> import org.apache.spark.sql.functions._
> def process1(df: DataFrame): Unit = {
> val companies = df.select($"id", $"name")
> 

[jira] [Updated] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-07 Thread Ritika Maheshwari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Maheshwari updated SPARK-43514:
--
Attachment: Plan.png

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Plan.png, Screen Shot 
> 2023-05-22 at 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, 
> Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct,values:array>) => 
> array,values:array>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example which is close to our application code. 
> Define some helper functions:
> {code:scala}
> import org.apache.spark.sql.functions._
> def process1(df: DataFrame): Unit = {
> val companies = df.select($"id", $"name")
> val directors = df
> .select(explode($"directors"))
> .select($"col.name", $"col.id")
> .dropDuplicates("id")
> val toBeMatched1 = companies
> 

[jira] [Commented] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-07 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730239#comment-17730239
 ] 

Ritika Maheshwari commented on SPARK-43514:
---

On further investigating the issue, it seems that the plan generated for 3.3.0 
and 3.4.0 is the same. Please see the attachment.
 !Plan.png! 

The error occurs in 3.4.0 when executing 

{code:java}
Filter ((isnotnull(name#9) AND (length(name#9) > 2)) AND 
((size(UDF(UDF(UDF(UDF(name#9, true) > 0) AND 
isnotnull(UDF(UDF(UDF(UDF(name#9)))
{code}

A possible cause could be the difference between the batch defined for 3.3.0 and 
the one defined for 3.4.0.

The Batch in 3.3.0 

{code:java}
Batch(LocalRelation 
*early*,FixedPoint(100,false,spark.sql.optimizer.maxIterations),WrappedArray(org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$@5549bf0,
 org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation$@4bd35f3f, 
org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability$@11cd86bd))
{code}

Batch in 3.4.0

{code:java}
Batch(LocalRelation,FixedPoint(100,false,spark.sql.optimizer.maxIterations),WrappedArray(org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$@5549bf0,
 org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation$@4bd35f3f, 
org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability$@11cd86bd))
{code}

Any idea why the Batch definition is different for the two versions?


> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Plan.png, Screen Shot 
> 2023-05-22 at 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, 
> Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", 

[jira] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-07 Thread Ritika Maheshwari (Jira)


[ https://issues.apache.org/jira/browse/SPARK-43514 ]


Ritika Maheshwari deleted comment on SPARK-43514:
---

was (Author: ritikam):
On further investigation, it seems that the plan generated for 3.3.0 and 3.4.0 is 
the same. Please see the attachment. 
 !Plan.png! 

The error occurs in 3.4.0 when executing 

{code:java}
Filter ((isnotnull(name#9) AND (length(name#9) > 2)) AND 
((size(UDF(UDF(UDF(UDF(name#9, true) > 0) AND 
isnotnull(UDF(UDF(UDF(UDF(name#9)))
{code}

Possible cause could be the difference between the batch defined for 3.3.0 and 
3.4.0

The Batch in 3.3.0 

{code:java}
Batch(LocalRelation 
*early*,FixedPoint(100,false,spark.sql.optimizer.maxIterations),WrappedArray(org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$@5549bf0,
 org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation$@4bd35f3f, 
org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability$@11cd86bd))
{code}

Batch in 3.4.0

{code:java}
Batch(LocalRelation,FixedPoint(100,false,spark.sql.optimizer.maxIterations),WrappedArray(org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$@5549bf0,
 org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation$@4bd35f3f, 
org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability$@11cd86bd))
{code}

Any idea why the Batch definition is different for two versions?


> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Plan.png, Screen Shot 
> 2023-05-22 at 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, 
> Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, 

[jira] [Commented] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-07 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17730261#comment-17730261
 ] 

Ritika Maheshwari commented on SPARK-43514:
---

Hello, please add this to your spark-defaults.conf; this should solve the 
problem.


{code:java}
spark.sql.optimizer.excludedRules org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
{code}
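
The same exclusion can also be set per session instead of globally in spark-defaults.conf, since {{spark.sql.optimizer.excludedRules}} is a runtime SQL conf. A minimal sketch (assuming {{spark}} is the active SparkSession):

{code:scala}
// Session-level equivalent of the spark-defaults.conf entry above.
// Note this only works around the failure by keeping the LocalRelation
// from being collapsed; it does not fix the underlying issue.
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
{code}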


> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Plan.png, Screen Shot 
> 2023-05-22 at 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, 
> Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => 
> array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example which is close to our application code. 
> Define some helper functions:
> {code:scala}
> import org.apache.spark.sql.functions._
> def process1(df: DataFrame): Unit = {
> val companies 

[jira] [Updated] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-05-22 Thread Ritika Maheshwari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Maheshwari updated SPARK-43514:
--
Attachment: Screen Shot 2023-05-22 at 5.39.55 PM.png

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: Screen Shot 2023-05-22 at 5.39.55 PM.png
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two data frames on a string column using LSH algorithm
>  * for similarity computation.
>  *
>  * If input data frames have columns with identical names,
>  * the resulting dataframe will have columns from them both
>  * with prefixes `datasetA` and `datasetB` respectively.
>  *
>  * For example, if both dataframes have a column with name `myColumn`,
>  * then the result will have columns `datasetAMyColumn` and 
> `datasetBMyColumn`.
>  */
> def similarityJoin(
> df: DataFrame,
> anotherDf: DataFrame,
> joinExpr: String,
> threshold: Double = 0.8,
> ): DataFrame = {
> df.show(false)
> anotherDf.show(false)
> val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
> .setPattern("")
> .setMinTokenLength(1)
> .setInputCol(joinExpr)
> .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
> )
> )
> val model = pipeline.fit(df)
> val storedHashed = model.transform(df)
> val landedHashed = model.transform(anotherDf)
> val commonColumns = df.columns.toSet & anotherDf.columns.toSet
> /**
>  * Converts column name from a data frame to the column of resulting 
> dataset.
>  */
> def convertColumn(datasetName: String)(columnName: String): Column = {
> val newName =
> if (commonColumns.contains(columnName)) 
> s"$datasetName${columnName.capitalize}"
> else columnName
> col(s"$datasetName.$columnName") as newName
> }
> val columnsToSelect = df.columns.map(convertColumn("datasetA")) ++
>   anotherDf.columns.map(convertColumn("datasetB"))
> val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, 
> "confidence")
> .select(columnsToSelect.toSeq: _*)
> result.show(false)
> result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => 
> array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}

[jira] [Commented] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-05-22 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17725144#comment-17725144
 ] 

Ritika Maheshwari commented on SPARK-43514:
---

Unable to reproduce the error on 3.4.0

See the attachment. !Screen Shot 2023-05-22 at 5.39.55 PM.png!

 

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: Screen Shot 2023-05-22 at 5.39.55 PM.png
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two data frames on a string column using LSH algorithm
>  * for similarity computation.
>  *
>  * If input data frames have columns with identical names,
>  * the resulting dataframe will have columns from them both
>  * with prefixes `datasetA` and `datasetB` respectively.
>  *
>  * For example, if both dataframes have a column with name `myColumn`,
>  * then the result will have columns `datasetAMyColumn` and 
> `datasetBMyColumn`.
>  */
> def similarityJoin(
> df: DataFrame,
> anotherDf: DataFrame,
> joinExpr: String,
> threshold: Double = 0.8,
> ): DataFrame = {
> df.show(false)
> anotherDf.show(false)
> val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
> .setPattern("")
> .setMinTokenLength(1)
> .setInputCol(joinExpr)
> .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
> )
> )
> val model = pipeline.fit(df)
> val storedHashed = model.transform(df)
> val landedHashed = model.transform(anotherDf)
> val commonColumns = df.columns.toSet & anotherDf.columns.toSet
> /**
>  * Converts column name from a data frame to the column of resulting 
> dataset.
>  */
> def convertColumn(datasetName: String)(columnName: String): Column = {
> val newName =
> if (commonColumns.contains(columnName)) 
> s"$datasetName${columnName.capitalize}"
> else columnName
> col(s"$datasetName.$columnName") as newName
> }
> val columnsToSelect = df.columns.map(convertColumn("datasetA")) ++
>   anotherDf.columns.map(convertColumn("datasetB"))
> val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, 
> "confidence")
> .select(columnsToSelect.toSeq: _*)
> result.show(false)
> result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => 
> array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at 

[jira] [Commented] (SPARK-27908) Improve parser error message for SELECT TOP statement

2023-05-21 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-27908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724726#comment-17724726
 ] 

Ritika Maheshwari commented on SPARK-27908:
---

In Spark 3.3.0 the error message is improved:
{code:java}
scala> spark.sql("select TOP 1 from manager")
org.apache.spark.sql.catalyst.parser.ParseException:
Syntax error at or near '1'(line 1, pos 11)


== SQL ==
select TOP 1 from manager
---^^^ {code}
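
Since {{TOP}} is not part of Spark SQL at all, the supported way to write the same query is {{LIMIT}}. A small sketch for comparison (the {{manager}} table from the example above is assumed to exist):

{code:scala}
// TOP n is T-SQL/SQL Server syntax; Spark SQL expresses the same limit with LIMIT n.
spark.sql("SELECT * FROM manager LIMIT 1").show()
{code}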

> Improve parser error message for SELECT TOP statement
> -
>
> Key: SPARK-27908
> URL: https://issues.apache.org/jira/browse/SPARK-27908
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Yesheng Ma
>Priority: Major
>
> The {{SELECT TOP}} statement is actually not supported in Spark SQL. However, 
> when a user queries such a statement, the error message is confusing. For 
> example, the error message for
> {code:sql}
> SELECT TOP 1 FROM test;
> {code}
> is
> {code:java}
> Error in query:
> mismatched input '1' expecting {<EOF>, '(', ',', '.', '[', 'ADD', 'AFTER', 
> 'ALL', 'ALTER', 'ANALYZE', 'AND', 'ANTI', 'ANY', 'ARCHIVE', 'ARRAY', 'AS', 
> 'ASC', 'AT', 'AUTHORIZATION', 'BETWEEN', 'BOTH', 'BUCKET', 'BUCKETS', 'BY', 
> 'CACHE', 'CASCADE', 'CASE', 'CAST', 'CHANGE', 'CHECK', 'CLEAR', 'CLUSTER', 
> 'CLUSTERED', 'CODEGEN', 'COLLATE', 'COLLECTION', 'COLUMN', 'COLUMNS', 
> 'COMMENT', 'COMMIT', 'COMPACT', 'COMPACTIONS', 'COMPUTE', 'CONCATENATE', 
> 'CONSTRAINT', 'COST', 'CREATE', 'CROSS', 'CUBE', 'CURRENT', 'CURRENT_DATE', 
> 'CURRENT_TIME', 'CURRENT_TIMESTAMP', 'CURRENT_USER', 'DATA', 'DATABASE', 
> DATABASES, 'DAY', 'DAYS', 'DBPROPERTIES', 'DEFINED', 'DELETE', 'DELIMITED', 
> 'DESC', 'DESCRIBE', 'DFS', 'DIRECTORIES', 'DIRECTORY', 'DISTINCT', 
> 'DISTRIBUTE', 'DROP', 'ELSE', 'END', 'ESCAPED', 'EXCEPT', 'EXCHANGE', 
> 'EXISTS', 'EXPLAIN', 'EXPORT', 'EXTENDED', 'EXTERNAL', 'EXTRACT', 'FALSE', 
> 'FETCH', 'FIELDS', 'FILEFORMAT', 'FIRST', 'FOLLOWING', 'FOR', 'FOREIGN', 
> 'FORMAT', 'FORMATTED', 'FROM', 'FULL', 'FUNCTION', 'FUNCTIONS', 'GLOBAL', 
> 'GRANT', 'GROUP', 'GROUPING', 'HAVING', 'HOUR', 'HOURS', 'IF', 'IGNORE', 
> 'IMPORT', 'IN', 'INDEX', 'INDEXES', 'INNER', 'INPATH', 'INPUTFORMAT', 
> 'INSERT', 'INTERSECT', 'INTERVAL', 'INTO', 'IS', 'ITEMS', 'JOIN', 'KEYS', 
> 'LAST', 'LATERAL', 'LAZY', 'LEADING', 'LEFT', 'LIKE', 'LIMIT', 'LINES', 
> 'LIST', 'LOAD', 'LOCAL', 'LOCATION', 'LOCK', 'LOCKS', 'LOGICAL', 'MACRO', 
> 'MAP', 'MICROSECOND', 'MICROSECONDS', 'MILLISECOND', 'MILLISECONDS', 
> 'MINUTE', 'MINUTES', 'MONTH', 'MONTHS', 'MSCK', 'NATURAL', 'NO', NOT, 'NULL', 
> 'NULLS', 'OF', 'ON', 'ONLY', 'OPTION', 'OPTIONS', 'OR', 'ORDER', 'OUT', 
> 'OUTER', 'OUTPUTFORMAT', 'OVER', 'OVERLAPS', 'OVERWRITE', 'PARTITION', 
> 'PARTITIONED', 'PARTITIONS', 'PERCENT', 'PIVOT', 'POSITION', 'PRECEDING', 
> 'PRIMARY', 'PRINCIPALS', 'PURGE', 'QUERY', 'RANGE', 'RECORDREADER', 
> 'RECORDWRITER', 'RECOVER', 'REDUCE', 'REFERENCES', 'REFRESH', 'RENAME', 
> 'REPAIR', 'REPLACE', 'RESET', 'RESTRICT', 'REVOKE', 'RIGHT', RLIKE, 'ROLE', 
> 'ROLES', 'ROLLBACK', 'ROLLUP', 'ROW', 'ROWS', 'SCHEMA', 'SECOND', 'SECONDS', 
> 'SELECT', 'SEMI', 'SEPARATED', 'SERDE', 'SERDEPROPERTIES', 'SESSION_USER', 
> 'SET', 'MINUS', 'SETS', 'SHOW', 'SKEWED', 'SOME', 'SORT', 'SORTED', 'START', 
> 'STATISTICS', 'STORED', 'STRATIFY', 'STRUCT', 'TABLE', 'TABLES', 
> 'TABLESAMPLE', 'TBLPROPERTIES', TEMPORARY, 'TERMINATED', 'THEN', 'TO', 
> 'TOUCH', 'TRAILING', 'TRANSACTION', 'TRANSACTIONS', 'TRANSFORM', 'TRUE', 
> 'TRUNCATE', 'UNARCHIVE', 'UNBOUNDED', 'UNCACHE', 'UNION', 'UNIQUE', 'UNLOCK', 
> 'UNSET', 'USE', 'USER', 'USING', 'VALUES', 'VIEW', 'WEEK', 'WEEKS', 'WHEN', 
> 'WHERE', 'WINDOW', 'WITH', 'YEAR', 'YEARS', EQ, '<=>', '<>', '!=', '<', LTE, 
> '>', GTE, '+', '-', '*', '/', '%', 'DIV', '&', '|', '||', '^', IDENTIFIER, 
> BACKQUOTED_IDENTIFIER}(line 1, pos 11)
> == SQL ==
> SELECT TOP 1 FROM test
> ---^^^
> {code}
> which is verbose and misleading.
>  
> One possible way to fix is to explicitly capture these statements in a 
> grammar rule and print user-friendly error message such as
> {code:java}
> SELECT TOP statements are not supported.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-01 Thread Ritika Maheshwari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Maheshwari updated SPARK-43514:
--
Attachment: Screen Shot 2023-05-31 at 11.04.06 PM.png

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Screen Shot 2023-05-22 at 
> 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.04.06 PM.png, Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => 
> array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example which is close to our application code. 
> Define some helper functions:
> {code:scala}
> import org.apache.spark.sql.functions._
> def process1(df: DataFrame): Unit = {
> val companies = df.select($"id", $"name")
> val directors = df
> .select(explode($"directors"))
> .select($"col.name", $"col.id")
> .dropDuplicates("id")
> val toBeMatched1 = companies
>   

[jira] [Updated] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-01 Thread Ritika Maheshwari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Maheshwari updated SPARK-43514:
--
Attachment: (was: Screen Shot 2023-05-31 at 11.04.06 PM-1.png)

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Screen Shot 2023-05-22 at 
> 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.04.06 PM.png, Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => 
> array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example which is close to our application code. 
> Define some helper functions:
> {code:scala}
> import org.apache.spark.sql.functions._
> def process1(df: DataFrame): Unit = {
> val companies = df.select($"id", $"name")
> val directors = df
> .select(explode($"directors"))
> .select($"col.name", $"col.id")
> .dropDuplicates("id")
> val toBeMatched1 = 

[jira] [Updated] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-01 Thread Ritika Maheshwari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Maheshwari updated SPARK-43514:
--
Attachment: Screen Shot 2023-05-31 at 11.14.24 PM.png

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Screen Shot 2023-05-22 at 
> 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => 
> array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example which is close to our application code. 
> Define some helper functions:
> {code:scala}
> import org.apache.spark.sql.functions._
> def process1(df: DataFrame): Unit = {
> val companies = df.select($"id", $"name")
> val directors = df
> .select(explode($"directors"))
> .select($"col.name", $"col.id")
> .dropDuplicates("id")
> val toBeMatched1 = companies
>   

[jira] [Commented] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-01 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728230#comment-17728230
 ] 

Ritika Maheshwari commented on SPARK-43514:
---

My SPARK_HOME was still pointing to 3.3.0; once that was fixed, I do see the 
reported error.

 !Screen Shot 2023-05-31 at 11.14.24 PM.png! 
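
For anyone hitting the same confusion, a quick way to confirm which version a shell is actually running (a trivial sketch, handy when SPARK_HOME and the launcher on PATH disagree):

{code:scala}
// Reports the version of the session you are actually talking to,
// independent of what SPARK_HOME points at.
println(spark.version)
{code}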

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Screen Shot 2023-05-22 at 
> 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => 
> array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example which is close to our application code. 
> Define some helper functions:
> {code:scala}
> import org.apache.spark.sql.functions._
> def process1(df: DataFrame): Unit = {
> val companies = df.select($"id", $"name")
> val directors = df
> .select(explode($"directors"))
>   

[jira] [Updated] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-01 Thread Ritika Maheshwari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Maheshwari updated SPARK-43514:
--
Attachment: (was: Screen Shot 2023-05-31 at 11.04.06 PM.png)

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Screen Shot 2023-05-22 at 
> 5.39.55 PM.png, Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => 
> array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example which is close to our application code. 
> Define some helper functions:
> {code:scala}
> import org.apache.spark.sql.functions._
> def process1(df: DataFrame): Unit = {
> val companies = df.select($"id", $"name")
> val directors = df
> .select(explode($"directors"))
> .select($"col.name", $"col.id")
> .dropDuplicates("id")
> val toBeMatched1 = companies
> .filter(length($"name") > 

[jira] [Updated] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-01 Thread Ritika Maheshwari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Maheshwari updated SPARK-43514:
--
Attachment: Screen Shot 2023-05-31 at 11.04.06 PM-1.png

> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Screen Shot 2023-05-22 at 
> 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.04.06 PM-1.png, Screen Shot 
> 2023-05-31 at 11.04.06 PM.png, Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct,values:array>) => 
> array,values:array>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example which is close to our application code. 
> Define some helper functions:
> {code:scala}
> import org.apache.spark.sql.functions._
> def process1(df: DataFrame): Unit = {
> val companies = df.select($"id", $"name")
> val directors = df
> .select(explode($"directors"))
> .select($"col.name", $"col.id")
> 

[jira] [Commented] (SPARK-43514) Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML features caused by certain SQL functions

2023-06-25 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17736945#comment-17736945
 ] 

Ritika Maheshwari commented on SPARK-43514:
---

Somehow your code is invoking the hashFunction on MinHashLSHModel with a Vector 
that has no non-zero entries.

And the following code then throws an error:
{code:java}
  @Since("2.1.0")
  override protected[ml] def hashFunction(elems: Vector): Array[Vector] = {
    require(elems.nonZeroIterator.nonEmpty, "Must have at least 1 non zero entry.")
{code}

But the example you have provided is not invoking the hashFunction.
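
For reference, the all-zero vector comes straight from the feature pipeline whenever the join column is an empty string: no characters means no tokens, no 3-grams, and therefore a HashingTF vector with no non-zero entries. A minimal sketch reproducing just that precondition (spark-shell assumed; this is not the reporter's code):

{code:scala}
import spark.implicits._
import org.apache.spark.ml.feature.{HashingTF, NGram, RegexTokenizer}

// An empty name yields zero tokens, zero 3-grams and an all-zero TF vector,
// which is exactly the input that MinHashLSHModel.hashFunction rejects.
val df = Seq("").toDF("name")
val tokens = new RegexTokenizer()
  .setPattern("").setMinTokenLength(1)
  .setInputCol("name").setOutputCol("tokens")
  .transform(df)
val ngrams = new NGram().setN(3)
  .setInputCol("tokens").setOutputCol("ngrams")
  .transform(tokens)
val vectors = new HashingTF()
  .setInputCol("ngrams").setOutputCol("vectors")
  .transform(ngrams)

// Shows a sparse vector like (262144,[],[]): no non-zero entries.
vectors.select("vectors").show(false)
{code}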



> Unexpected NullPointerException or IllegalArgumentException inside UDFs of ML 
> features caused by certain SQL functions
> --
>
> Key: SPARK-43514
> URL: https://issues.apache.org/jira/browse/SPARK-43514
> Project: Spark
>  Issue Type: Bug
>  Components: ML, SQL
>Affects Versions: 3.3.2, 3.4.0
> Environment: Scala version: 2.12.17
> Test examples were executed inside Zeppelin 0.10.1 with Spark 3.4.0.
> Spark 3.3.2 deployed on cluster was used to check the issue on real data.
>Reporter: Svyatoslav Semenyuk
>Priority: Major
>  Labels: ml, sql
> Attachments: 2023-05-30 13-47-04.mp4, Plan.png, Screen Shot 
> 2023-05-22 at 5.39.55 PM.png, Screen Shot 2023-05-31 at 11.14.24 PM.png, 
> Test.scala
>
>
> We designed a function that joins two DFs on common column with some 
> similarity. All next code will be on Scala 2.12.
> I've added {{show}} calls for demonstration purposes.
> {code:scala}
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.feature.{HashingTF, MinHashLSH, NGram, 
> RegexTokenizer, MinHashLSHModel}
> import org.apache.spark.sql.{DataFrame, Column}
> /**
>  * Joins two dataframes on a string column using LSH algorithm for similarity 
> computation.
>  *
>  * The output dataframe has three columns:
>  *
>  *   - `datasetA`, data from the left dataframe.
>  *   - `datasetB`, data from the right dataframe.
>  *   - `distance`, abstract distance between values of `joinColumn` from 
> datasets;
>  * the lower `distance` value, the more similar `joinColumn` values are.
>  */
> def similarityJoin(
>   leftDf: DataFrame,
>   rightDf: DataFrame,
>   joinColumn: String,
>   threshold: Double = 0.8,
> ): DataFrame = {
>   leftDf.show(false)
>   rightDf.show(false)
>   val pipeline = new Pipeline().setStages(Array(
> new RegexTokenizer()
>   .setPattern("")
>   .setMinTokenLength(1)
>   .setInputCol(joinColumn)
>   .setOutputCol("tokens"),
> new NGram().setN(3).setInputCol("tokens").setOutputCol("ngrams"),
> new HashingTF().setInputCol("ngrams").setOutputCol("vectors"),
> new MinHashLSH().setInputCol("vectors").setOutputCol("lsh"),
>   ))
>   val model = pipeline.fit(leftDf)
>   val storedHashed = model.transform(leftDf)
>   val landedHashed = model.transform(rightDf)
>   def columnMapper(dsName: String)(c: String): Column = col(s"$dsName.$c") as 
> c
>   val result = model
> .stages
> .last
> .asInstanceOf[MinHashLSHModel]
> .approxSimilarityJoin(storedHashed, landedHashed, threshold, "distance")
> .withColumn("datasetA", 
> struct(leftDf.columns.map(columnMapper("datasetA")).toSeq: _*))
> .withColumn("datasetB", 
> struct(rightDf.columns.map(columnMapper("datasetB")).toSeq: _*))
>   result.show(false)
>   result
> }
> {code}
> Now consider such simple example:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example runs with no errors and outputs 3 empty DFs. Let's add 
> {{distinct}} method to one data frame:
> {code:scala}
> val inputDF1 = Seq("", null).toDF("name").distinct().filter(length($"name") > 
> 2) as "df1"
> val inputDF2 = Seq("", null).toDF("name").filter(length($"name") > 2) as "df2"
> similarityJoin(inputDF1, inputDF2, "name", 0.6)
> {code}
> This example outputs two empty DFs and then fails at {{result.show(false)}}. 
> Error:
> {code:none}
> org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user 
> defined function (LSHModel$$Lambda$3769/0x000101804840: 
> (struct,values:array>) => 
> array,values:array>>).
>   ... many elided
> Caused by: java.lang.IllegalArgumentException: requirement failed: Must have 
> at least 1 non zero entry.
>   at scala.Predef$.require(Predef.scala:281)
>   at 
> org.apache.spark.ml.feature.MinHashLSHModel.hashFunction(MinHashLSH.scala:61)
>   at org.apache.spark.ml.feature.LSHModel.$anonfun$transform$1(LSH.scala:99)
>   ... many more
> {code}
> 
> Now let's take a look on the example 

[jira] [Commented] (SPARK-45414) spark-xml misplaces string tag content

2024-02-06 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815121#comment-17815121
 ] 

Ritika Maheshwari commented on SPARK-45414:
---

@Sean Owen 

What is causing this issue is this piece of code in 
com.databricks.spark.xml.parsers.StaxXmlGenerator, inside writeElement:

{code:scala}
def writeElement {
  val (names, values) = elements.unzip
  val elementSchema = StructType(schema.filter(names.contains))
  val elementRow = Row.fromSeq(row.toSeq.filter(values.contains))
  ...
}
{code}
 

In this code, {{elements}} contains only the element fields, since attributes have already 
been handled earlier, but {{row}} contains the values of both attributes and elements. In this 
SQL the attribute "_SerialNumberFlag" has the value '', and other elements, such as Color, also 
have the value ''. Therefore the line {{val elementRow = Row.fromSeq(row.toSeq.filter(values.contains))}} 
also picks up the value of "_SerialNumberFlag", whereas {{elementSchema}} does not have a field 
for that attribute but instead has the field for MerchandiseHierarchy. Hence MerchandiseHierarchy 
gets the value '', causing the issue.
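
To make the misalignment concrete, here is a minimal, self-contained sketch (hypothetical field 
names and simplified string values, not the actual StaxXmlGenerator code) showing how the 
attribute's '' survives the value-based filter and shifts every subsequent pairing by one slot:

{code:scala}
// Full schema and row include the attribute _SerialNumberFlag; `elements`
// keeps only the element fields, mirroring the code quoted above.
val schema   = Seq("ItemID", "_SerialNumberFlag", "Description", "MerchandiseHierarchy", "Color")
val row      = Seq("123",    "",                  "MyDescription", "HierarchyValue", "")
val elements = schema.zip(row).filterNot { case (name, _) => name.startsWith("_") }

val (names, values) = elements.unzip
val elementSchema = schema.filter(names.contains)   // ItemID, Description, MerchandiseHierarchy, Color
val elementRow    = row.filter(values.contains)     // "123", "", "MyDescription", "HierarchyValue", ""
                                                    // the attribute's "" slips through because Color is also ""
println(elementSchema.zip(elementRow))
// List((ItemID,123), (Description,), (MerchandiseHierarchy,MyDescription), (Color,HierarchyValue))
{code}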

Here are the values we get for elementSchema and elementRow, which show the mismatch:

 
{code:none}
24/02/06 22:05:38 INFO StaxXmlGenerator$: In apply elementSchema  is 
StructField(ItemID,StructType(StructField(_Type,StringType,true),StructField(_VALUE,StringType,true)),false),StructField(UPC,StructType(StructField(_Type,StringType,true),StructField(_VALUE,StringType,true)),false),StructField(Description,StringType,false),StructField(MerchandiseHierarchy,ArrayType(StructType(StructField(_ID,StringType,true),StructField(_Level,StringType,true)),true),false),StructField(ItemPrice,ArrayType(StructType(StructField(_ValueTypeCode,StringType,true),StructField(_Value,StringType,true)),true),false),StructField(Color,StringType,false),StructField(IntendedIndustry,StringType,false),StructField(Manufacturer,StructType(StructField(Name,StringType,true)),false),StructField(Marketing,StructType(StructField(Season,StringType,true)),false),StructField(BrandOwner,StructType(StructField(_Name,StringType,true)),false),StructField(ItemAttribute_culinary,ArrayType(StructType(StructField(_Name,StringType,true),StructField(AttributeValue,StringType,true)),true),false),StructField(ItemAttribute_noculinary,ArrayType(StructType(StructField(_Name,StringType,true),StructField(_VALUE,ArrayType(ArrayType(StructType(StructField(AttributeCode,StringType,true),StructField(AttributeValue,StringType,true)),true),true),true)),true),false),StructField(ItemMeasurements,StructType(StructField(Depth,StructType(StructField(_UnitOfMeasure,StringType,true),StructField(_VALUE,StringType,true)),true),StructField(Height,StructType(StructField(_UnitOfMeasure,StringType,true),StructField(_VALUE,StringType,true)),true),StructField(Width,StructType(StructField(_UnitOfMeasure,StringType,true),StructField(_VALUE,StringType,true)),true),StructField(Diameter,StructType(StructField(_UnitOfMeasure,StringType,true),StructField(_VALUE,StringType,true)),true)),false),StructField(TaxInformation,StructType(StructField(TaxGroupID,StringType,true),StructField(TaxExemptCode,StringType,true),StructField(TaxAmount,StringType,true)),false),StructField(ItemImageUrl,StringType,false),StructField(ItemFranchisees,ArrayType(ArrayType(StructType(StructField(_action,StringType,true),StructField(_franchiseeId,StringType,true),StructField(_franchiseeName,StringType,true)),true),true),false)

24/02/06 22:05:38 INFO StaxXmlGenerator$:  In apply the elementRow is 
[ItemId,123],[UPC,123],MyDescription,,WrappedArray([null,null]),WrappedArray([null,null]),,,[null],[null],[null],WrappedArray([Attribute1,Value..
 


{code}

> spark-xml misplaces string tag content
> --
>
> Key: SPARK-45414
> URL: https://issues.apache.org/jira/browse/SPARK-45414
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.0
>Reporter: Giuseppe Ceravolo
>Priority: Critical
> Attachments: IllegalArgumentException.txt, Screen Shot 2024-02-01 at 
> 7.46.29 PM.png
>
>
> h1. Intro
> Hi all! Please expect some degree of incompleteness in this issue as this is 
> the very first one I post, and feel free to edit it as you like - I welcome 
> your feedback.
> My goal is to provide you with as many details and indications as I can on 
> this issue that I am currently facing with a Client of mine on its Production 
> environment (we use Azure Databricks DBR 11.3 LTS).
> I was told by Sean Owen [[srowen (Sean Owen) 
> (github.com)|https://github.com/srowen]], who maintains the spark-xml maven 
> repository on GitHub [[https://github.com/srowen/spark-xml]] to post an issue 
> here because "This code has been ported to Apache Spark now anyway so won't 
> be updated here" (refer to his comment [here|#issuecomment-1744792958]).
> h1. Issue
> When I write a 

[jira] [Commented] (SPARK-45414) spark-xml misplaces string tag content

2024-02-06 Thread Ritika Maheshwari (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815123#comment-17815123
 ] 

Ritika Maheshwari commented on SPARK-45414:
---

So the fix is to replace

{{val elementRow = Row.fromSeq(row.toSeq.filter(values.contains))}}

with

{{val elementRow = Row.fromSeq(values.toSeq)}}
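
As a quick sanity check with hypothetical field names, building the row directly from the 
unzipped element values keeps schema and data aligned, because both sides derive from the same 
{{elements}} sequence and no attribute value can leak in through a duplicate-value match:

{code:scala}
// Hypothetical stand-ins for the real StructType/Row, same field names as the sketch above.
val elements = Seq("ItemID" -> "123", "Description" -> "MyDescription",
                   "MerchandiseHierarchy" -> "HierarchyValue", "Color" -> "")
val (names, values) = elements.unzip
val elementSchema = names      // stands in for StructType(schema.filter(names.contains))
val elementRow    = values     // stands in for the proposed Row.fromSeq(values.toSeq)
println(elementSchema.zip(elementRow))
// List((ItemID,123), (Description,MyDescription), (MerchandiseHierarchy,HierarchyValue), (Color,))
{code}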

 

> spark-xml misplaces string tag content
> --
>
> Key: SPARK-45414
> URL: https://issues.apache.org/jira/browse/SPARK-45414
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.0
>Reporter: Giuseppe Ceravolo
>Priority: Critical
> Attachments: IllegalArgumentException.txt, Screen Shot 2024-02-01 at 
> 7.46.29 PM.png
>
>
> h1. Intro
> Hi all! Please expect some degree of incompleteness in this issue as this is 
> the very first one I post, and feel free to edit it as you like - I welcome 
> your feedback.
> My goal is to provide you with as many details and indications as I can on 
> this issue that I am currently facing with a Client of mine on its Production 
> environment (we use Azure Databricks DBR 11.3 LTS).
> I was told by Sean Owen [[srowen (Sean Owen) 
> (github.com)|https://github.com/srowen]], who maintains the spark-xml maven 
> repository on GitHub [[https://github.com/srowen/spark-xml]] to post an issue 
> here because "This code has been ported to Apache Spark now anyway so won't 
> be updated here" (refer to his comment [here|#issuecomment-1744792958]).
> h1. Issue
> When I write a DataFrame into xml format via the spark-xml library either (1) 
> I get an error if empty string columns are in between non-string nested ones 
> or (2) if I put all string columns at the end then I get a wrong xml where 
> the content of string tags are misplaced into the following ones.
> h1. Code to reproduce the issue
> Please find below the end-to-end code snippet that results into the error
> h2. CASE (1): ERROR
> When empty strings are in between non-string nested ones, the write fails 
> with the following error.
> _Caused by: java.lang.IllegalArgumentException: Failed to convert value 
> MyDescription (class of class java.lang.String) in type 
> ArrayType(StructType(StructField(_ID,StringType,true),StructField(_Level,StringType,true)),true)
>  to XML._
> Please find attached the full trace of the error.
> {code:python}
> fake_file_df = spark \
>     .sql(
>         """SELECT
>             CAST(STRUCT('ItemId' AS `_Type`, '123' AS `_VALUE`) AS 
> STRUCT<_Type: STRING, _VALUE: STRING>) AS ItemID,
>             CAST(STRUCT('UPC' AS `_Type`, '123' AS `_VALUE`) AS STRUCT<_Type: 
> STRING, _VALUE: STRING>) AS UPC,
>             CAST('' AS STRING) AS _SerialNumberFlag,
>             CAST('MyDescription' AS STRING) AS Description,
>             CAST(ARRAY(STRUCT(NULL AS `_ID`, NULL AS `_Level`)) AS 
> ARRAY>) AS MerchandiseHierarchy,
>             CAST(ARRAY(STRUCT(NULL AS `_ValueTypeCode`, NULL AS `_VALUE`)) AS 
> ARRAY>) AS ItemPrice,
>             CAST('' AS STRING) AS Color,
>             CAST('' AS STRING) AS IntendedIndustry,
>             CAST(STRUCT(NULL AS `Name`) AS STRUCT) AS 
> Manufacturer,
>             CAST(STRUCT(NULL AS `Season`) AS STRUCT) AS 
> Marketing,
>             CAST(STRUCT(NULL AS `_Name`) AS STRUCT<_Name: STRING>) AS 
> BrandOwner,
>             CAST(ARRAY(STRUCT('Attribute1' AS `_Name`, 'Value1' AS `_VALUE`)) 
> AS ARRAY>) AS 
> ItemAttribute_culinary,
>             CAST(ARRAY(STRUCT(NULL AS `_Name`, ARRAY(ARRAY(STRUCT(NULL AS 
> `AttributeCode`, NULL AS `AttributeValue`))) AS `_VALUE`)) AS 
> ARRAY AttributeValue: STRING>) AS ItemAttribute_noculinary,
>             CAST(STRUCT(STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS 
> `Depth`, STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Height`, 
> STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Width`, STRUCT(NULL AS 
> `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Diameter`) AS STRUCT STRUCT<_UnitOfMeasure: STRING, _VALUE: STRING>, Height: 
> STRUCT<_UnitOfMeasure: STRING, _VALUE: STRING>, Width: STRUCT<_UnitOfMeasure: 
> STRING, _VALUE: STRING>, Diameter: STRUCT<_UnitOfMeasure: STRING, _VALUE: 
> STRING>>) AS ItemMeasurements,
>             CAST(STRUCT('GroupA' AS `TaxGroupID`, 'CodeA' AS `TaxExemptCode`, 
> '1' AS `TaxAmount`) AS STRUCT TaxAmount: STRING>) AS TaxInformation,
>             CAST('' AS STRING) AS ItemImageUrl,
>             CAST(ARRAY(ARRAY(STRUCT(NULL AS `_action`, NULL AS 
> `_franchiseeId`, NULL AS `_franchiseeName`))) AS ARRAY STRING, _franchiseeId: STRING, _franchiseeName: STRING>>>) AS ItemFranchisees,
>             CAST('Add' AS STRING) AS _Action
>         ;"""
>     )
> # fake_file_df.display()
> fake_file_df \
>     .coalesce(1) \
>     .write \
>     .format('com.databricks.spark.xml') \
>     .option('declaration', 

[jira] [Updated] (SPARK-45414) spark-xml misplaces string tag content

2024-02-01 Thread Ritika Maheshwari (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Maheshwari updated SPARK-45414:
--
Attachment: Screen Shot 2024-02-01 at 7.46.29 PM.png

> spark-xml misplaces string tag content
> --
>
> Key: SPARK-45414
> URL: https://issues.apache.org/jira/browse/SPARK-45414
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.0
>Reporter: Giuseppe Ceravolo
>Priority: Critical
> Attachments: IllegalArgumentException.txt, Screen Shot 2024-02-01 at 
> 7.46.29 PM.png
>
>
> h1. Intro
> Hi all! Please expect some degree of incompleteness in this issue as this is 
> the very first one I post, and feel free to edit it as you like - I welcome 
> your feedback.
> My goal is to provide you with as many details and indications as I can on 
> this issue that I am currently facing with a Client of mine on its Production 
> environment (we use Azure Databricks DBR 11.3 LTS).
> I was told by Sean Owen [[srowen (Sean Owen) 
> (github.com)|https://github.com/srowen]], who maintains the spark-xml maven 
> repository on GitHub [[https://github.com/srowen/spark-xml]] to post an issue 
> here because "This code has been ported to Apache Spark now anyway so won't 
> be updated here" (refer to his comment [here|#issuecomment-1744792958]).
> h1. Issue
> When I write a DataFrame into xml format via the spark-xml library either (1) 
> I get an error if empty string columns are in between non-string nested ones 
> or (2) if I put all string columns at the end then I get a wrong xml where 
> the content of string tags are misplaced into the following ones.
> h1. Code to reproduce the issue
> Please find below the end-to-end code snippet that results into the error
> h2. CASE (1): ERROR
> When empty strings are in between non-string nested ones, the write fails 
> with the following error.
> _Caused by: java.lang.IllegalArgumentException: Failed to convert value 
> MyDescription (class of class java.lang.String) in type 
> ArrayType(StructType(StructField(_ID,StringType,true),StructField(_Level,StringType,true)),true)
>  to XML._
> Please find attached the full trace of the error.
> {code:python}
> fake_file_df = spark \
>     .sql(
>         """SELECT
>             CAST(STRUCT('ItemId' AS `_Type`, '123' AS `_VALUE`) AS 
> STRUCT<_Type: STRING, _VALUE: STRING>) AS ItemID,
>             CAST(STRUCT('UPC' AS `_Type`, '123' AS `_VALUE`) AS STRUCT<_Type: 
> STRING, _VALUE: STRING>) AS UPC,
>             CAST('' AS STRING) AS _SerialNumberFlag,
>             CAST('MyDescription' AS STRING) AS Description,
>             CAST(ARRAY(STRUCT(NULL AS `_ID`, NULL AS `_Level`)) AS 
> ARRAY>) AS MerchandiseHierarchy,
>             CAST(ARRAY(STRUCT(NULL AS `_ValueTypeCode`, NULL AS `_VALUE`)) AS 
> ARRAY>) AS ItemPrice,
>             CAST('' AS STRING) AS Color,
>             CAST('' AS STRING) AS IntendedIndustry,
>             CAST(STRUCT(NULL AS `Name`) AS STRUCT) AS 
> Manufacturer,
>             CAST(STRUCT(NULL AS `Season`) AS STRUCT) AS 
> Marketing,
>             CAST(STRUCT(NULL AS `_Name`) AS STRUCT<_Name: STRING>) AS 
> BrandOwner,
>             CAST(ARRAY(STRUCT('Attribute1' AS `_Name`, 'Value1' AS `_VALUE`)) 
> AS ARRAY>) AS 
> ItemAttribute_culinary,
>             CAST(ARRAY(STRUCT(NULL AS `_Name`, ARRAY(ARRAY(STRUCT(NULL AS 
> `AttributeCode`, NULL AS `AttributeValue`))) AS `_VALUE`)) AS 
> ARRAY AttributeValue: STRING>) AS ItemAttribute_noculinary,
>             CAST(STRUCT(STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS 
> `Depth`, STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Height`, 
> STRUCT(NULL AS `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Width`, STRUCT(NULL AS 
> `_UnitOfMeasure`, NULL AS `_VALUE`) AS `Diameter`) AS STRUCT STRUCT<_UnitOfMeasure: STRING, _VALUE: STRING>, Height: 
> STRUCT<_UnitOfMeasure: STRING, _VALUE: STRING>, Width: STRUCT<_UnitOfMeasure: 
> STRING, _VALUE: STRING>, Diameter: STRUCT<_UnitOfMeasure: STRING, _VALUE: 
> STRING>>) AS ItemMeasurements,
>             CAST(STRUCT('GroupA' AS `TaxGroupID`, 'CodeA' AS `TaxExemptCode`, 
> '1' AS `TaxAmount`) AS STRUCT TaxAmount: STRING>) AS TaxInformation,
>             CAST('' AS STRING) AS ItemImageUrl,
>             CAST(ARRAY(ARRAY(STRUCT(NULL AS `_action`, NULL AS 
> `_franchiseeId`, NULL AS `_franchiseeName`))) AS ARRAY STRING, _franchiseeId: STRING, _franchiseeName: STRING>>>) AS ItemFranchisees,
>             CAST('Add' AS STRING) AS _Action
>         ;"""
>     )
> # fake_file_df.display()
> fake_file_df \
>     .coalesce(1) \
>     .write \
>     .format('com.databricks.spark.xml') \
>     .option('declaration', 'version="1.0" encoding="UTF-8"') \
>     .option("nullValue", "") \
>     .option('rootTag', "root_tag") \
>     .option('rowTag', "row_tag") \
>