[jira] [Updated] (SPARK-47320) Datasets involving self joins behave in an inconsistent and unintuitive manner

2024-03-09 Thread Asif (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Asif updated SPARK-47320:
-
Description: 
The behaviour of Datasets involving self joins is unintuitive and inconsistent with 
respect to when an AnalysisException is thrown due to ambiguity and when the query works.

There are situations where merely swapping the join order causes a query to throw an 
ambiguity-related exception, while the original order passes.  Some Datasets that are 
unambiguous from the user's perspective still result in an AnalysisException being 
thrown.

After testing and fixing a bug, I think the issue lies in an inconsistency in 
determining what constitutes ambiguous and what does not.

There are two ways to look at resolution with respect to ambiguity:

1) ExprId of attributes: this is an unintuitive approach, as Spark users do not deal 
with ExprIds.

2) Column extraction from a Dataset using the df(colName) API: this is the user-visible 
and user-understandable point of view, so ambiguity should be determined on this basis. 
Whatever is logically unambiguous from the user's perspective (assuming the query is 
logically correct) should also be what Spark uses to decide ambiguity.

For Example:
{quote}
val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((1, 2)).toDF("aa", "bb")
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === df1("a")).select(df1("a"))
{quote}
From perspective #1, the above code should throw an ambiguity exception, because the 
join condition and the projection of df3 use df1("a"), whose ExprId matches an 
attribute in both df1Joindf2 and df1.

But if we look at it from the perspective of the Dataset used to obtain the column, 
which reflects the user's intent, the expectation is that df1("a") should resolve to 
the Dataset df1 being joined, and not to df1Joindf2. Had the user intended "a" from 
df1Joindf2, they would have used df1Joindf2("a").

So in this case, current Spark throws an exception, as it resolves based on #1.
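
(As a side note, and not part of this ticket's proposal: with the current resolution 
rules the intent can be made explicit by aliasing both sides and qualifying the column 
references by alias. A minimal sketch, assuming the df1/df2/df1Joindf2 definitions 
above; the alias names are illustrative.)
{quote}
import org.apache.spark.sql.functions.col

// Hedged workaround sketch (illustrative, not from this ticket): alias both sides so
// each column reference is qualified and cannot match more than one input.
val lhs = df1Joindf2.alias("lhs")
val rhs = df1.alias("rhs")
val df3 = lhs.join(rhs, col("lhs.aa") === col("rhs.a")).select(col("rhs.a"))
{quote}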

But by the same logic the DataFrame below should also throw an ambiguity exception, 
yet it passes:
{quote}
val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((1, 2)).toDF("aa", "bb")
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))

df1Joindf2.join(df1, df1Joindf2("a") === df1("a"))
{quote}
The difference between the two cases is that the first query contains a select, while 
the second does not.

This implies that in the first case the df1("a") in the projection causes the 
ambiguity, while the same reference in the second case, used only in the join 
condition, is considered unambiguous.

IMHO, the criteria for identifying ambiguity should be based entirely and consistently 
on #2.

In DataFrameJoinSuite and DataFrameSelfJoinSuite, if we go by #2, some of the tests 
currently considered ambiguous (under the #1 criteria) become unambiguous under the #2 
criteria.

There is an existing test in DataFrameSelfJoinSuite:
{quote}
test("SPARK-28344: fail ambiguous self join - column ref in Project") {
  val df1 = spark.range(3)
  val df2 = df1.filter($"id" > 0)

  // Assertion 1: existing
  assertAmbiguousSelfJoin(df1.join(df2).select(df2("id")))

  // Assertion 2: added by me
  assertAmbiguousSelfJoin(df2.join(df1).select(df2("id")))
}
{quote}
Here Assertion 1 passes (that is, the ambiguity exception is thrown), but Assertion 2 
fails (no ambiguity exception is thrown). The only change is the join order.

Logically, both assertions are invalid, in the sense that neither should throw an 
exception, since from the user's perspective there is no ambiguity.

 

Also, much of this confusion arises because the join condition is resolved against the 
un-deduplicated plan. The join condition should be resolved only after the join plan 
has been deduplicated, which is what the PR for this bug does.
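
To make the deduplication point concrete, below is a deliberately simplified, 
self-contained toy model (plain Scala, not Spark's analyzer; the Attr class and the 
ExprId bookkeeping are invented for illustration). It only shows why resolving a 
column reference against the un-deduplicated combined output reports ambiguity, while 
resolving after the duplicated side has been re-aliased with fresh ids finds exactly 
one match:
{code:java}
// Toy model of "resolve the join condition only after deduplication".
// This is NOT Spark's analyzer; it only illustrates the ExprId ambiguity mechanics.
object DedupSketch {

  // A toy attribute: a column name plus the ExprId it was created with.
  final case class Attr(name: String, exprId: Long)

  // Resolving a reference against a plan's output:
  // ambiguous iff more than one output attribute carries the same ExprId.
  def resolve(ref: Attr, output: Seq[Attr]): Either[String, Attr] =
    output.filter(_.exprId == ref.exprId) match {
      case Seq(single) => Right(single)
      case Seq()       => Left(s"unresolved: ${ref.name}#${ref.exprId}")
      case _           => Left(s"ambiguous: ${ref.name}#${ref.exprId}")
    }

  def main(args: Array[String]): Unit = {
    var nextId = 0L
    def attr(name: String): Attr = { nextId += 1; Attr(name, nextId) }

    val a  = attr("a");  val b  = attr("b")   // df1's output
    val aa = attr("aa"); val bb = attr("bb")  // df2's output
    val df1Output        = Seq(a, b)
    val df1Joindf2Output = Seq(a, aa, b)      // df1Joindf2 re-exposes df1's a and b

    // Before deduplication the combined output carries df1's `a` twice,
    // so the reference df1("a") looks ambiguous even though the user's intent is clear.
    val undeduplicated = df1Joindf2Output ++ df1Output
    println(resolve(a, undeduplicated))       // Left(ambiguous: a#1)

    // Deduplication: give the re-joined df1 fresh ExprIds first, then resolve.
    // Now exactly one output attribute matches, so no false ambiguity is reported.
    val rejoinedDf1 = df1Output.map(x => attr(x.name))
    val deduplicated = df1Joindf2Output ++ rejoinedDf1
    println(resolve(a, deduplicated))         // Right(Attr(a,1))
  }
}
{code}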

 


[jira] [Updated] (SPARK-47240) SPIP: Structured Logging Framework for Apache Spark

2024-03-09 Thread Gengliang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-47240:
---
Description: 
This proposal aims to enhance Apache Spark's logging system by implementing 
structured logging. This transition will change the format of the default log 
files from plain text to JSON, making them more accessible and analyzable. The 
new logs will include crucial identifiers such as worker, executor, query, job, 
stage, and task IDs, thereby making the logs more informative and facilitating 
easier search and analysis.
h2. Current Logging Format

The current format of Spark logs is plain text, which can be challenging to 
parse and analyze efficiently. An example of the current log format is as 
follows:
{code:java}
23/11/29 17:53:44 ERROR BlockManagerMasterEndpoint: Fail to know the executor 
289 is alive or not.
org.apache.spark.SparkException: Exception thrown in awaitResult: 
    
Caused by: org.apache.spark.rpc.RpcEndpointNotFoundException: ..
{code}
h2. Proposed Structured Logging Format

The proposed change involves structuring the logs in JSON format, which 
organizes the log information into easily identifiable fields. Here is how the 
new structured log format would look:
{code:java}
{
   "ts":"23/11/29 17:53:44",
   "level":"ERROR",
   "msg":"Fail to know the executor 289 is alive or not",
   "context":{
       "executor_id":"289"
   },
   "exception":{
      "class":"org.apache.spark.SparkException",
      "msg":"Exception thrown in awaitResult",
      "stackTrace":"..."
   },
   "source":"BlockManagerMasterEndpoint"
} {code}
This format will enable users to upload and directly query 
driver/executor/master/worker log files using Spark SQL for more effective 
problem-solving and analysis, such as tracking executor losses or identifying 
faulty tasks:
{code:java}
spark.read.json("hdfs://hdfs_host/logs").createOrReplaceTempView("logs")
/* To get all the executor lost logs */
SELECT * FROM logs WHERE contains(message, 'Lost executor');
/* To get all the distributed logs about executor 289 */
SELECT * FROM logs WHERE executor_id = 289;
/* To get all the errors on host 100.116.29.4 */
SELECT * FROM logs WHERE host = "100.116.29.4" and log_level="ERROR";
{code}
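With the nested layout shown above, queries can also reach into the structured fields 
directly. The following is a sketch only: the field names ({{msg}}, {{level}}, 
{{context.executor_id}}, {{exception.class}}, {{source}}) are taken from the example 
JSON above, and the "hdfs://hdfs_host/logs" path is a placeholder:
{code:java}
val logs = spark.read.json("hdfs://hdfs_host/logs")
logs.createOrReplaceTempView("structured_logs")

/* Errors from BlockManagerMasterEndpoint, together with the exception class */
spark.sql("""
  SELECT ts, msg, exception.`class` AS exception_class
  FROM structured_logs
  WHERE level = 'ERROR' AND source = 'BlockManagerMasterEndpoint'
""").show(truncate = false)

/* All log lines whose context refers to executor 289 */
spark.sql("SELECT ts, msg FROM structured_logs WHERE context.executor_id = '289'").show()
{code}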
 

SPIP doc: 
[https://docs.google.com/document/d/1rATVGmFLNVLmtxSpWrEceYm7d-ocgu8ofhryVs4g3XU/edit?usp=sharing]

  was:
This proposal aims to enhance Apache Spark's logging system by implementing 
structured logging. This transition will change the format of the default log 
files from plain text to JSON, making them more accessible and analyzable. The 
new logs will include crucial identifiers such as worker, executor, query, job, 
stage, and task IDs, thereby making the logs more informative and facilitating 
easier search and analysis.
h2. Current Logging Format

The current format of Spark logs is plain text, which can be challenging to 
parse and analyze efficiently. An example of the current log format is as 
follows:
{code:java}
23/11/29 17:53:44 ERROR TaskSchedulerImpl: Lost executor 289 on 100.116.29.4: 
Executor heartbeat timed out after 150300 ms{code}
h2. Proposed Structured Logging Format

The proposed change involves structuring the logs in JSON format, which 
organizes the log information into easily identifiable fields. Here is how the 
new structured log format would look:
{code:java}
{"ts": "23/11/29 17:53:44","level": "ERROR", "message": "Lost executor 289 on 
100.116.29.4: Executor heartbeat timed out after 150300 ms", "logger": 
"TaskSchedulerImpl","executor_id":  289, "host":  "100.116.29.4"} 
{code}
This format will enable users to upload and directly query 
driver/executor/master/worker log files using Spark SQL for more effective 
problem-solving and analysis, such as tracking executor losses or identifying 
faulty tasks:
{code:java}
spark.read.json("hdfs://hdfs_host/logs").createOrReplaceTempView("logs")
/* To get all the executor lost logs */
SELECT * FROM logs WHERE contains(message, 'Lost executor');
/* To get all the distributed logs about executor 289 */
SELECT * FROM logs WHERE executor_id = 289;
/* To get all the errors on host 100.116.29.4 */
SELECT * FROM logs WHERE host = "100.116.29.4" and log_level="ERROR";
{code}
 

SPIP doc: 
[https://docs.google.com/document/d/1rATVGmFLNVLmtxSpWrEceYm7d-ocgu8ofhryVs4g3XU/edit?usp=sharing]


> SPIP: Structured Logging Framework for Apache Spark
> ---
>
> Key: SPARK-47240
> URL: https://issues.apache.org/jira/browse/SPARK-47240
> Project: Spark
>  Issue Type: New Feature
>  Components: Project Infra
>Affects Versions: 4.0.0
>Reporter: Gengliang Wang
>Priority: Major
>
> This proposal aims to enhance Apache Spark's logging system by implementing 
> structured logging. This transition will change the format of the default

[jira] [Updated] (SPARK-47240) SPIP: Structured Logging Framework for Apache Spark

2024-03-09 Thread Gengliang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-47240:
---
Summary: SPIP: Structured Logging Framework for Apache Spark  (was: SPIP: 
Structured Spark Logging)

> SPIP: Structured Logging Framework for Apache Spark
> ---
>
> Key: SPARK-47240
> URL: https://issues.apache.org/jira/browse/SPARK-47240
> Project: Spark
>  Issue Type: New Feature
>  Components: Project Infra
>Affects Versions: 4.0.0
>Reporter: Gengliang Wang
>Priority: Major
>
> This proposal aims to enhance Apache Spark's logging system by implementing 
> structured logging. This transition will change the format of the default log 
> files from plain text to JSON, making them more accessible and analyzable. 
> The new logs will include crucial identifiers such as worker, executor, 
> query, job, stage, and task IDs, thereby making the logs more informative and 
> facilitating easier search and analysis.
> h2. Current Logging Format
> The current format of Spark logs is plain text, which can be challenging to 
> parse and analyze efficiently. An example of the current log format is as 
> follows:
> {code:java}
> 23/11/29 17:53:44 ERROR TaskSchedulerImpl: Lost executor 289 on 100.116.29.4: 
> Executor heartbeat timed out after 150300 ms{code}
> h2. Proposed Structured Logging Format
> The proposed change involves structuring the logs in JSON format, which 
> organizes the log information into easily identifiable fields. Here is how 
> the new structured log format would look:
> {code:java}
> {"ts": "23/11/29 17:53:44","level": "ERROR", "message": "Lost executor 289 on 
> 100.116.29.4: Executor heartbeat timed out after 150300 ms", "logger": 
> "TaskSchedulerImpl","executor_id":  289, "host":  "100.116.29.4"} 
> {code}
> This format will enable users to upload and directly query 
> driver/executor/master/worker log files using Spark SQL for more effective 
> problem-solving and analysis, such as tracking executor losses or identifying 
> faulty tasks:
> {code:java}
> spark.read.json("hdfs://hdfs_host/logs").createOrReplaceTempView("logs")
> /* To get all the executor lost logs */
> SELECT * FROM logs WHERE contains(message, 'Lost executor');
> /* To get all the distributed logs about executor 289 */
> SELECT * FROM logs WHERE executor_id = 289;
> /* To get all the errors on host 100.116.29.4 */
> SELECT * FROM logs WHERE host = "100.116.29.4" and log_level="ERROR";
> {code}
>  
> SPIP doc: 
> [https://docs.google.com/document/d/1rATVGmFLNVLmtxSpWrEceYm7d-ocgu8ofhryVs4g3XU/edit?usp=sharing]






[jira] [Updated] (SPARK-47334) Make `withColumnRenamed` reuse the implementation of `withColumnsRenamed`

2024-03-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-47334:
---
Labels: pull-request-available  (was: )

> Make `withColumnRenamed` reuse the implementation of `withColumnsRenamed`
> -
>
> Key: SPARK-47334
> URL: https://issues.apache.org/jira/browse/SPARK-47334
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 4.0.0
>Reporter: Ruifeng Zheng
>Priority: Major
>  Labels: pull-request-available
>
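
For context, the change in the title amounts to having the single-column rename 
delegate to the multi-column API. A minimal sketch of the shape of that delegation 
(illustrative only, not the actual patch), assuming the existing 
`withColumnsRenamed(colsMap: Map[String, String])` signature:
{code:java}
import org.apache.spark.sql.DataFrame

// Illustrative helper mirroring the proposed delegation: a single rename is just
// a one-entry call to the multi-column API.
def renameOne(df: DataFrame, existingName: String, newName: String): DataFrame =
  df.withColumnsRenamed(Map(existingName -> newName))

// df.withColumnRenamed("a", "x") and renameOne(df, "a", "x") should be equivalent.
{code}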







[jira] [Created] (SPARK-47334) Make `withColumnRenamed` reuse the implementation of `withColumnsRenamed`

2024-03-09 Thread Ruifeng Zheng (Jira)
Ruifeng Zheng created SPARK-47334:
-

 Summary: Make `withColumnRenamed` reuse the implementation of 
`withColumnsRenamed`
 Key: SPARK-47334
 URL: https://issues.apache.org/jira/browse/SPARK-47334
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: Ruifeng Zheng









[jira] [Created] (SPARK-47333) Use checkInputDataTypes to check the parameter types of the function to_xml & remove _LEGACY_ERROR_TEMP_3234

2024-03-09 Thread BingKun Pan (Jira)
BingKun Pan created SPARK-47333:
---

 Summary: Use checkInputDataTypes to check the parameter types of 
the function to_xml & remove _LEGACY_ERROR_TEMP_3234
 Key: SPARK-47333
 URL: https://issues.apache.org/jira/browse/SPARK-47333
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 4.0.0
Reporter: BingKun Pan






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org