[jira] [Updated] (SPARK-44458) ThreadLocal not being copied to child thread when child thread is reused from pool

2023-07-17 Thread Kapil Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-44458:

Affects Version/s: 3.1.0
   (was: 3.4.1)

> ThreadLocal not being copied to child thread when child thread is reused from 
> pool 
> ---
>
> Key: SPARK-44458
> URL: https://issues.apache.org/jira/browse/SPARK-44458
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.1.0
>Reporter: Kapil Singh
>Priority: Major
>
> We are setting a Spark configuration with 
> `spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently 
> executing a Spark action in a separate thread. Intermittently, the 
> configuration set in the main thread fails to propagate to the child thread.
> Steps to repro: [Spark SQL configs can't get propagated properly to a new 
> thread in Spark 3.1 issue · 
> GitHub|https://gist.github.com/t-rufang/25341b3678e5d7c74e3a209457fce0e9]
>  
> This is a limitation due to the ThreadLocal not being copied to the child 
> thread when the child thread is reused from a pool; the copy happens only on 
> thread creation, or when an idle thread's keep-alive has expired in the 
> Executor.
> Refer:  
> [https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2]
>  
> [https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 
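A minimal Scala sketch of the limitation described above, using only a JDK
InheritableThreadLocal and a fixed thread pool (no Spark involved; all names are
illustrative). Spark's per-thread session state is carried the same way, so a pool
thread created before the main-thread update keeps a stale copy when it is reused:

{code:scala}
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object InheritableThreadLocalDemo {
  // Stand-in for the per-thread session state Spark tracks.
  val conf = new InheritableThreadLocal[String] {
    override def initialValue(): String = "caseSensitive=false"
  }

  def main(args: Array[String]): Unit = {
    // Single worker thread: created on the first submitted task, then reused.
    val pool = Executors.newFixedThreadPool(1)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    // Warm up the pool before the main thread changes the value.
    Await.result(Future(conf.get()), 10.seconds) // "caseSensitive=false"

    conf.set("caseSensitive=true") // update made in the main thread

    // InheritableThreadLocal is copied only when a thread is created, so the
    // reused pool thread still reports the old value.
    val seenByChild = Await.result(Future(conf.get()), 10.seconds)
    println(s"main: ${conf.get()}, child: $seenByChild")
    // main: caseSensitive=true, child: caseSensitive=false

    pool.shutdown()
  }
}
{code}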






[jira] [Updated] (SPARK-44458) ThreadLocal not being copied to child thread when child thread is reused from pool

2023-07-17 Thread Kapil Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-44458:

Attachment: (was: image-2023-07-17-16-57-09-669.png)

> ThreadLocal not being copied to child thread when child thread is reused from 
> pool 
> ---
>
> Key: SPARK-44458
> URL: https://issues.apache.org/jira/browse/SPARK-44458
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.1
>Reporter: Kapil Singh
>Priority: Major
>
> We are setting a Spark configuration with 
> `spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently 
> executing a Spark action in a separate thread. Intermittently, the 
> configuration set in the main thread fails to propagate to the child thread.
> Steps to repro: [Spark SQL configs can't get propagated properly to a new 
> thread in Spark 3.1 issue · 
> GitHub|https://gist.github.com/t-rufang/25341b3678e5d7c74e3a209457fce0e9]
>  
> This is a limitation due to the ThreadLocal not being copied to the child 
> thread when the child thread is reused from a pool; the copy happens only on 
> thread creation, or when an idle thread's keep-alive has expired in the 
> Executor.
> Refer:  
> [https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2]
>  
> [https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 






[jira] [Updated] (SPARK-44458) ThreadLocal not being copied to child thread when child thread is reused from pool

2023-07-17 Thread Kapil Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-44458:

Attachment: (was: image-2023-07-17-16-56-59-388.png)

> ThreadLocal not being copied to child thread when child thread is reused from 
> pool 
> ---
>
> Key: SPARK-44458
> URL: https://issues.apache.org/jira/browse/SPARK-44458
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.1
>Reporter: Kapil Singh
>Priority: Major
>
> We are setting a Spark configuration with 
> `spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently 
> executing a Spark action in a separate thread. Intermittently, the 
> configuration set in the main thread fails to propagate to the child thread.
> Steps to repro: [Spark SQL configs can't get propagated properly to a new 
> thread in Spark 3.1 issue · 
> GitHub|https://gist.github.com/t-rufang/25341b3678e5d7c74e3a209457fce0e9]
>  
> This is a limitation due to the ThreadLocal not being copied to the child 
> thread when the child thread is reused from a pool; the copy happens only on 
> thread creation, or when an idle thread's keep-alive has expired in the 
> Executor.
> Refer:  
> [https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2]
>  
> [https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 






[jira] [Updated] (SPARK-44458) ThreadLocal not being copied to child thread when child thread is reused from pool

2023-07-17 Thread Kapil Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-44458:

Description: 
We are setting a Spark configuration with 
`spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently executing 
a Spark action in a separate thread. Intermittently, the configuration set in 
the main thread fails to propagate to the child thread.

Steps to repro: [Spark SQL configs can't get propagated properly to a new 
thread in Spark 3.1 issue · 
GitHub|https://gist.github.com/t-rufang/25341b3678e5d7c74e3a209457fce0e9]

 

This is a limitation due to the ThreadLocal not being copied to the child 
thread when the child thread is reused from a pool; the copy happens only on 
thread creation, or when an idle thread's keep-alive has expired in the 
Executor.

Refer:  

[https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2] 

[https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 

  was:
We are setting a Spark configuration with 
`spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently executing 
a Spark action in a separate thread. Intermittently, the configuration set in 
the main thread fails to propagate to the child thread.

!image-2023-07-17-16-56-59-388.png!

 

!image-2023-07-17-16-57-09-669.png!

This is a limitation due to the ThreadLocal not being copied to the child 
thread when the child thread is reused from a pool; the copy happens only on 
thread creation, or when an idle thread's keep-alive has expired in the 
Executor.

Refer:  

[https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2] 

[https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 


> ThreadLocal not being copied to child thread when child thread is reused from 
> pool 
> ---
>
> Key: SPARK-44458
> URL: https://issues.apache.org/jira/browse/SPARK-44458
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.1
>Reporter: Kapil Singh
>Priority: Major
> Attachments: image-2023-07-17-16-56-59-388.png, 
> image-2023-07-17-16-57-09-669.png
>
>
> We are setting a Spark configuration with 
> `spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently 
> executing a Spark action in a separate thread. Intermittently, the 
> configuration set in the main thread fails to propagate to the child thread.
> Steps to repro: [Spark SQL configs can't get propagated properly to a new 
> thread in Spark 3.1 issue · 
> GitHub|https://gist.github.com/t-rufang/25341b3678e5d7c74e3a209457fce0e9]
>  
> This is a limitation due to the ThreadLocal not being copied to the child 
> thread when the child thread is reused from a pool; the copy happens only on 
> thread creation, or when an idle thread's keep-alive has expired in the 
> Executor.
> Refer:  
> [https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2]
>  
> [https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 






[jira] [Updated] (SPARK-44458) ThreadLocal not being copied to child thread when child thread is reused from pool

2023-07-17 Thread Kapil Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-44458:

Description: 
We are setting a Spark configuration with 
`spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently executing 
a Spark action in a separate thread. Intermittently, the configuration set in 
the main thread fails to propagate to the child thread.

!image-2023-07-17-16-56-59-388.png!

 

!image-2023-07-17-16-57-09-669.png!

This is a limitation due to the ThreadLocal not being copied to the child 
thread when the child thread is reused from a pool; the copy happens only on 
thread creation, or when an idle thread's keep-alive has expired in the 
Executor.

Refer:  

[https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2] 

[https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 

  was:
We are setting a Spark configuration with 
`spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently executing 
a Spark action in a separate thread. Intermittently, the configuration set in 
the main thread fails to propagate to the child thread.

!image-2023-07-17-16-54-26-372.png!

!image-2023-07-17-16-54-36-860.png!

 

This is a limitation due to the ThreadLocal not being copied to the child 
thread when the child thread is reused from a pool; the copy happens only on 
thread creation, or when an idle thread's keep-alive has expired in the 
Executor.

Refer:  

[https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2] 

[https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 


> ThreadLocal not being copied to child thread when child thread is reused from 
> pool 
> ---
>
> Key: SPARK-44458
> URL: https://issues.apache.org/jira/browse/SPARK-44458
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.1
>Reporter: Kapil Singh
>Priority: Major
> Attachments: image-2023-07-17-16-56-59-388.png, 
> image-2023-07-17-16-57-09-669.png
>
>
> We are setting a Spark configuration with 
> `spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently 
> executing a Spark action in a separate thread. Intermittently, the 
> configuration set in the main thread fails to propagate to the child thread.
> !image-2023-07-17-16-56-59-388.png!
>  
> !image-2023-07-17-16-57-09-669.png!
> This is a limitation due to the ThreadLocal not being copied to the child 
> thread when the child thread is reused from a pool; the copy happens only on 
> thread creation, or when an idle thread's keep-alive has expired in the 
> Executor.
> Refer:  
> [https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2]
>  
> [https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 






[jira] [Updated] (SPARK-44458) ThreadLocal not being copied to child thread when child thread is reused from pool

2023-07-17 Thread Kapil Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-44458:

Attachment: image-2023-07-17-16-56-59-388.png

> ThreadLocal not being copied to child thread when child thread is reused from 
> pool 
> ---
>
> Key: SPARK-44458
> URL: https://issues.apache.org/jira/browse/SPARK-44458
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.1
>Reporter: Kapil Singh
>Priority: Major
> Attachments: image-2023-07-17-16-56-59-388.png, 
> image-2023-07-17-16-57-09-669.png
>
>
> We are setting a Spark configuration with 
> `spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently 
> executing a Spark action in a separate thread. Intermittently, the 
> configuration set in the main thread fails to propagate to the child thread.
> !image-2023-07-17-16-54-26-372.png!
> !image-2023-07-17-16-54-36-860.png!
>  
> This is a limitation due to the ThreadLocal not being copied to the child 
> thread when the child thread is reused from a pool; the copy happens only on 
> thread creation, or when an idle thread's keep-alive has expired in the 
> Executor.
> Refer:  
> [https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2]
>  
> [https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 






[jira] [Updated] (SPARK-44458) ThreadLocal not being copied to child thread when child thread is reused from pool

2023-07-17 Thread Kapil Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-44458:

Attachment: image-2023-07-17-16-57-09-669.png

> ThreadLocal not being copied to child thread when child thread is reused from 
> pool 
> ---
>
> Key: SPARK-44458
> URL: https://issues.apache.org/jira/browse/SPARK-44458
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.1
>Reporter: Kapil Singh
>Priority: Major
> Attachments: image-2023-07-17-16-56-59-388.png, 
> image-2023-07-17-16-57-09-669.png
>
>
> We are setting a Spark configuration with 
> `spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently 
> executing a Spark action in a separate thread. Intermittently, the 
> configuration set in the main thread fails to propagate to the child thread.
> !image-2023-07-17-16-54-26-372.png!
> !image-2023-07-17-16-54-36-860.png!
>  
> This is a limitation due to the ThreadLocal not being copied to the child 
> thread when the child thread is reused from a pool; the copy happens only on 
> thread creation, or when an idle thread's keep-alive has expired in the 
> Executor.
> Refer:  
> [https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2]
>  
> [https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 






[jira] [Created] (SPARK-44458) ThreadLocal not being copied to child thread when child thread is reused from pool

2023-07-17 Thread Kapil Singh (Jira)
Kapil Singh created SPARK-44458:
---

 Summary: ThreadLocal not being copied to child thread when child 
thread is reused from pool 
 Key: SPARK-44458
 URL: https://issues.apache.org/jira/browse/SPARK-44458
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.4.1
Reporter: Kapil Singh


We are setting a Spark configuration with 
`spark.conf.set("spark.sql.caseSensitive", "true")` and subsequently executing 
a Spark action in a separate thread. Intermittently, the configuration set in 
the main thread fails to propagate to the child thread.

!image-2023-07-17-16-54-26-372.png!

!image-2023-07-17-16-54-36-860.png!

 

This is a limitation due to the ThreadLocal not being copied to the child 
thread when the child thread is reused from a pool; the copy happens only on 
thread creation, or when an idle thread's keep-alive has expired in the 
Executor.

Refer:  

[https://users.scala-lang.org/t/future-executioncontext-and-threadlocal/7675/2] 

[https://www.stevenskelton.ca/threadlocal-variables-scala-futures/] 
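A common mitigation discussed in the links above is to wrap the ExecutionContext so
the thread-local value is captured at task submission and restored on the pool thread.
A rough sketch (illustrative class and names, not an existing Spark API):

{code:scala}
import scala.concurrent.ExecutionContext

// Illustrative wrapper: captures a ThreadLocal value when a task is submitted
// and installs it on the pool thread around the task's execution.
class PropagatingExecutionContext[T](underlying: ExecutionContext,
                                     state: ThreadLocal[T])
    extends ExecutionContext {

  override def execute(runnable: Runnable): Unit = {
    val captured = state.get() // value from the submitting (main) thread
    underlying.execute(() => {
      val previous = state.get()
      state.set(captured)
      try runnable.run()
      finally state.set(previous) // don't leak into the next task on this thread
    })
  }

  override def reportFailure(cause: Throwable): Unit =
    underlying.reportFailure(cause)
}
{code}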






[jira] [Updated] (SPARK-44077) Session Configs were not getting honored in RDDs

2023-06-15 Thread Kapil Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-44077:

Description: When calling SQLConf.get on executors, the configs are read 
from the local properties on the TaskContext. The local properties are 
populated driver-side when scheduling the job, using the properties found in 
sparkContext.localProperties. For RDD actions, local properties were not 
getting populated.
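For reference, the local-properties channel can be observed with the public API. A
small sketch (the property key is made up) of the driver-side setLocalProperty and
executor-side TaskContext.getLocalProperty path that SQLConf.get relies on, and that
RDD actions were not populating with the session's SQL configs:

{code:scala}
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object LocalPropertiesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    // Set on the driver; captured into the job's local properties when the
    // job is scheduled.
    sc.setLocalProperty("myapp.tag", "run-42")

    val tags = sc.parallelize(1 to 4, numSlices = 2).map { _ =>
      // Read on the executor side through the TaskContext, the same channel
      // session SQL configs travel through for Dataset actions.
      Option(TaskContext.get().getLocalProperty("myapp.tag")).getOrElse("<unset>")
    }.collect()

    println(tags.mkString(", ")) // run-42, run-42, run-42, run-42
    spark.stop()
  }
}
{code}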

> Session Configs were not getting honored in RDDs
> 
>
> Key: SPARK-44077
> URL: https://issues.apache.org/jira/browse/SPARK-44077
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.4.0
>Reporter: Kapil Singh
>Priority: Major
>
> When calling SQLConf.get on executors, the configs are read from the local 
> properties on the TaskContext. The local properties are populated driver-side 
> when scheduling the job, using the properties found in 
> sparkContext.localProperties. For RDD actions, local properties were not 
> getting populated.






[jira] [Created] (SPARK-44077) Session Configs were not getting honored in RDDs

2023-06-15 Thread Kapil Singh (Jira)
Kapil Singh created SPARK-44077:
---

 Summary: Session Configs were not getting honored in RDDs
 Key: SPARK-44077
 URL: https://issues.apache.org/jira/browse/SPARK-44077
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.4.0
Reporter: Kapil Singh









[jira] [Created] (SPARK-42660) Infer filters for Join produced by IN and EXISTS clause (RewritePredicateSubquery rule)

2023-03-02 Thread Kapil Singh (Jira)
Kapil Singh created SPARK-42660:
---

 Summary: Infer filters for Join produced by IN and EXISTS clause 
(RewritePredicateSubquery rule)
 Key: SPARK-42660
 URL: https://issues.apache.org/jira/browse/SPARK-42660
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.4.1
Reporter: Kapil Singh









[jira] [Created] (SPARK-42598) Refactor TPCH schema to separate file similar to TPCDS for code reuse

2023-02-27 Thread Kapil Singh (Jira)
Kapil Singh created SPARK-42598:
---

 Summary: Refactor TPCH schema to separate file similar to TPCDS 
for code reuse
 Key: SPARK-42598
 URL: https://issues.apache.org/jira/browse/SPARK-42598
 Project: Spark
  Issue Type: Test
  Components: SQL
Affects Versions: 3.5.0
Reporter: Kapil Singh









[jira] [Created] (SPARK-40558) Add Reusable Exchange in Bloom creation side plan

2022-09-26 Thread Kapil Singh (Jira)
Kapil Singh created SPARK-40558:
---

 Summary: Add Reusable Exchange in Bloom creation side plan
 Key: SPARK-40558
 URL: https://issues.apache.org/jira/browse/SPARK-40558
 Project: Spark
  Issue Type: Task
  Components: SQL
Affects Versions: 3.4.0
Reporter: Kapil Singh









[jira] [Created] (SPARK-40231) Add 1TB TPCDS Plan stability tests

2022-08-26 Thread Kapil Singh (Jira)
Kapil Singh created SPARK-40231:
---

 Summary: Add 1TB TPCDS Plan stability tests
 Key: SPARK-40231
 URL: https://issues.apache.org/jira/browse/SPARK-40231
 Project: Spark
  Issue Type: Task
  Components: SQL
Affects Versions: 3.4.0
Reporter: Kapil Singh









[jira] [Updated] (SPARK-40124) Update TPCDS v1.4 q32 for Plan Stability tests

2022-08-17 Thread Kapil Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-40124:

Summary: Update TPCDS v1.4 q32 for Plan Stability tests  (was: Update TPCDS 
v1.4 query32)

> Update TPCDS v1.4 q32 for Plan Stability tests
> --
>
> Key: SPARK-40124
> URL: https://issues.apache.org/jira/browse/SPARK-40124
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Kapil Singh
>Priority: Major
>







[jira] [Created] (SPARK-40124) Update TPCDS v1.4 query32

2022-08-17 Thread Kapil Singh (Jira)
Kapil Singh created SPARK-40124:
---

 Summary: Update TPCDS v1.4 query32
 Key: SPARK-40124
 URL: https://issues.apache.org/jira/browse/SPARK-40124
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Kapil Singh









[jira] [Updated] (SPARK-39690) Reuse exchange across subqueries is broken with AQE if subquery side exchange materialized first

2022-07-06 Thread Kapil Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-39690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-39690:

Description: 
When trying to reuse the Exchange of a subquery in the main plan, if the 
Exchange inside the subquery materializes first, the main ASPE 
(AdaptiveSparkPlanExec) node won't have that stage info (in 
[stageToReplace|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L243])
 to replace in the current logical plan. This causes AQE to produce a new 
candidate physical plan without reusing the exchange present inside the 
subquery, and depending on how complex the inner plan is (number of exchanges), 
AQE could choose a plan without a ReusedExchange.

We have seen this with multiple queries in our private build. This can also 
happen with DPP.

  was:
When trying to reuse the Exchange of a subquery in the main plan, if the 
Exchange inside the subquery materializes first, the main ASPE 
(AdaptiveSparkPlanExec) node won't have that stage info (in 
[stageToReplace|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L243])
 to replace in the current logical plan. This causes AQE to produce a new 
candidate physical plan without reusing the exchange present inside the 
subquery, and depending on how complex the inner plan is (number of exchanges), 
AQE could choose a plan without a ReusedExchange.

We have seen this with multiple queries in our private build. This can also 
happen with DPP.


> Reuse exchange across subqueries is broken with AQE if subquery side exchange 
> materialized first
> 
>
> Key: SPARK-39690
> URL: https://issues.apache.org/jira/browse/SPARK-39690
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Kapil Singh
>Priority: Major
>
> When trying to reuse the Exchange of a subquery in the main plan, if the 
> Exchange inside the subquery materializes first, the main ASPE 
> (AdaptiveSparkPlanExec) node won't have that stage info (in 
> [stageToReplace|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L243])
>  to replace in the current logical plan. This causes AQE to produce a new 
> candidate physical plan without reusing the exchange present inside the 
> subquery, and depending on how complex the inner plan is (number of 
> exchanges), AQE could choose a plan without a ReusedExchange.
> We have seen this with multiple queries in our private build. This can also 
> happen with DPP.






[jira] [Created] (SPARK-39690) Reuse exchange across subqueries is broken with AQE if subquery side exchange materialized first

2022-07-05 Thread Kapil Singh (Jira)
Kapil Singh created SPARK-39690:
---

 Summary: Reuse exchange across subqueries is broken with AQE if 
subquery side exchange materialized first
 Key: SPARK-39690
 URL: https://issues.apache.org/jira/browse/SPARK-39690
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Kapil Singh


When trying to reuse the Exchange of a subquery in the main plan, if the 
Exchange inside the subquery materializes first, the main ASPE 
(AdaptiveSparkPlanExec) node won't have that stage info (in 
[stageToReplace|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L243])
 to replace in the current logical plan. This causes AQE to produce a new 
candidate physical plan without reusing the exchange present inside the 
subquery, and depending on how complex the inner plan is (number of exchanges), 
AQE could choose a plan without a ReusedExchange.

We have seen this with multiple queries in our private build. This can also 
happen with DPP.
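For illustration only, a hypothetical query shape where the same aggregation (and
hence the same shuffle Exchange) appears in both the main plan and a scalar subquery,
i.e. the situation in which the subquery-side exchange can materialize first; the
table and column names are made up:

{code:scala}
import org.apache.spark.sql.SparkSession

object ReuseExchangeShape {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    Seq(("a", 1), ("a", 2), ("b", 5)).toDF("k", "v").createOrReplaceTempView("t")

    // The grouped aggregation over `t` (and its shuffle Exchange) appears in
    // the main plan and again inside the scalar subquery, so one of the two
    // identical exchanges is normally planned as a ReusedExchange.
    val df = spark.sql("""
      SELECT k, s
      FROM (SELECT k, SUM(v) AS s FROM t GROUP BY k) agg
      WHERE s > (SELECT MAX(s2) / 2
                 FROM (SELECT k, SUM(v) AS s2 FROM t GROUP BY k) agg2)
    """)
    df.explain() // inspect whether the plan contains a ReusedExchange
    spark.stop()
  }
}
{code}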






[jira] [Commented] (SPARK-37995) TPCDS 1TB q72 fails when spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false

2022-01-31 Thread Kapil Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17485039#comment-17485039
 ] 

Kapil Singh commented on SPARK-37995:
-

Hey folks, any input on this issue? [~hyukjin.kwon] [~maryannxue] [~cloud_fan] 

> TPCDS 1TB q72 fails when 
> spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false
> 
>
> Key: SPARK-37995
> URL: https://issues.apache.org/jira/browse/SPARK-37995
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Kapil Singh
>Priority: Major
> Attachments: full-stacktrace.txt
>
>
> TPCDS 1TB q72 fails in Spark 3.2 when 
> spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false. We 
> have been running with this config in 3.1 as well and it worked fine in that 
> version; it used to add a DPP subquery to q72.
> Relevant stack trace
> {code:java}
> rror: java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to 
> org.apache.spark.sql.execution.SparkPlan  at 
> scala.collection.immutable.List.map(List.scala:293)  at 
> org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
>   at 
> org.apache.spark.sql.execution.SparkPlanInfo$.$anonfun$fromSparkPlan$3(SparkPlanInfo.scala:75)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> 
> 
> at 
> org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:708)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$2(AdaptiveSparkPlanExec.scala:239)
>   at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23) 
>  at scala.Option.foreach(Option.scala:407)  at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:239)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)  at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
>  {code}






[jira] [Comment Edited] (SPARK-37995) TPCDS 1TB q72 fails when spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false

2022-01-24 Thread Kapil Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17481561#comment-17481561
 ] 

Kapil Singh edited comment on SPARK-37995 at 1/25/22, 6:12 AM:
---

Seems it is related to this part of 
[PlanAdaptiveDynamicPruningFilters:79|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala#L79]

 
{code:java}
// Here we can't call the QueryExecution.prepareExecutedPlan() method to
// get the sparkPlan as Non-AQE use case, which will cause the physical
// plan optimization rules be inserted twice, once in AQE framework and
// another in prepareExecutedPlan() method.
val sparkPlan = QueryExecution.createSparkPlan(session, planner, aggregate) 
{code}
In the non-AQE path, *prepareExecutedPlan* also plans the subquery inside the 
current dynamic pruning expression, but *createSparkPlan* does not. This leaves 
the inner subquery in the logical phase, hence the cast exception later.

 


was (Author: kapilks_ms):
Seems it is related to this part of 
[PlanAdaptiveDynamicPruningFilters:79|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala#L79]

 
{code:java}
// Here we can't call the QueryExecution.prepareExecutedPlan() method to
// get the sparkPlan as Non-AQE use case, which will cause the physical
// plan optimization rules be inserted twice, once in AQE framework and
// another in prepareExecutedPlan() method.
val sparkPlan = QueryExecution.createSparkPlan(session, planner, aggregate) 
{code}
In the non-AQE path, *prepareExecutedPlan* also plans the subquery inside the 
current dynamic pruning expression, but *createSparkPlan* does not. This leaves 
the inner subquery in the logical phase, hence the cast exception later.

 

> TPCDS 1TB q72 fails when 
> spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false
> 
>
> Key: SPARK-37995
> URL: https://issues.apache.org/jira/browse/SPARK-37995
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Kapil Singh
>Priority: Major
> Attachments: full-stacktrace.txt
>
>
> TPCDS 1TB q72 fails in Spark 3.2 when 
> spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false. We 
> have been running with this config in 3.1 as well and it worked fine in that 
> version; it used to add a DPP subquery to q72.
> Relevant stack trace
> {code:java}
> rror: java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to 
> org.apache.spark.sql.execution.SparkPlan  at 
> scala.collection.immutable.List.map(List.scala:293)  at 
> org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
>   at 
> org.apache.spark.sql.execution.SparkPlanInfo$.$anonfun$fromSparkPlan$3(SparkPlanInfo.scala:75)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> 
> 
> at 
> org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:708)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$2(AdaptiveSparkPlanExec.scala:239)
>   at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23) 
>  at scala.Option.foreach(Option.scala:407)  at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:239)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)  at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
>  {code}






[jira] [Commented] (SPARK-37995) TPCDS 1TB q72 fails when spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false

2022-01-24 Thread Kapil Singh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17481561#comment-17481561
 ] 

Kapil Singh commented on SPARK-37995:
-

Seems it is related to this part of 
[PlanAdaptiveDynamicPruningFilters:79|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala#L79]

 
{code:java}
// Here we can't call the QueryExecution.prepareExecutedPlan() method to
// get the sparkPlan as Non-AQE use case, which will cause the physical
// plan optimization rules be inserted twice, once in AQE framework and
// another in prepareExecutedPlan() method.
val sparkPlan = QueryExecution.createSparkPlan(session, planner, aggregate) 
{code}
In the non-AQE path, *prepareExecutedPlan* also plans the subquery inside the 
current dynamic pruning expression, but *createSparkPlan* does not. This leaves 
the inner subquery in the logical phase, hence the cast exception later.

 

> TPCDS 1TB q72 fails when 
> spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false
> 
>
> Key: SPARK-37995
> URL: https://issues.apache.org/jira/browse/SPARK-37995
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Kapil Singh
>Priority: Major
> Attachments: full-stacktrace.txt
>
>
> TPCDS 1TB q72 fails in Spark 3.2 when 
> spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false. We 
> have been running with this config in 3.1 as well and it worked fine in that 
> version; it used to add a DPP subquery to q72.
> Relevant stack trace
> {code:java}
> rror: java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to 
> org.apache.spark.sql.execution.SparkPlan  at 
> scala.collection.immutable.List.map(List.scala:293)  at 
> org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
>   at 
> org.apache.spark.sql.execution.SparkPlanInfo$.$anonfun$fromSparkPlan$3(SparkPlanInfo.scala:75)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> 
> 
> at 
> org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:708)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$2(AdaptiveSparkPlanExec.scala:239)
>   at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23) 
>  at scala.Option.foreach(Option.scala:407)  at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:239)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)  at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
>  {code}






[jira] [Updated] (SPARK-37995) TPCDS 1TB q72 fails when spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false

2022-01-24 Thread Kapil Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-37995:

Attachment: full-stacktrace.txt

> TPCDS 1TB q72 fails when 
> spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false
> 
>
> Key: SPARK-37995
> URL: https://issues.apache.org/jira/browse/SPARK-37995
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.2.0
>Reporter: Kapil Singh
>Priority: Major
> Attachments: full-stacktrace.txt
>
>
> TPCDS 1TB q72 fails in Spark 3.2 when 
> spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false. We 
> have been running with this config in 3.1 as well and it worked fine in that 
> version; it used to add a DPP subquery to q72.
> Relevant stack trace
> {code:java}
> rror: java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to 
> org.apache.spark.sql.execution.SparkPlan  at 
> scala.collection.immutable.List.map(List.scala:293)  at 
> org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
>   at 
> org.apache.spark.sql.execution.SparkPlanInfo$.$anonfun$fromSparkPlan$3(SparkPlanInfo.scala:75)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> 
> 
> at 
> org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:708)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$2(AdaptiveSparkPlanExec.scala:239)
>   at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23) 
>  at scala.Option.foreach(Option.scala:407)  at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:239)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)  at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
>  {code}






[jira] [Created] (SPARK-37995) TPCDS 1TB q72 fails when spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false

2022-01-24 Thread Kapil Singh (Jira)
Kapil Singh created SPARK-37995:
---

 Summary: TPCDS 1TB q72 fails when 
spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false
 Key: SPARK-37995
 URL: https://issues.apache.org/jira/browse/SPARK-37995
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 3.2.0
Reporter: Kapil Singh


TPCDS 1TB q72 fails in Spark 3.2 when 
spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly is false. We 
have been running with this config in 3.1 as well and it worked fine in that 
version; it used to add a DPP subquery to q72.

Relevant stack trace
{code:java}
rror: java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to 
org.apache.spark.sql.execution.SparkPlan  at 
scala.collection.immutable.List.map(List.scala:293)  at 
org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
  at 
org.apache.spark.sql.execution.SparkPlanInfo$.$anonfun$fromSparkPlan$3(SparkPlanInfo.scala:75)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)


at 
org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:75)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:708)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$2(AdaptiveSparkPlanExec.scala:239)
  at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)  
at scala.Option.foreach(Option.scala:407)  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:239)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
  at 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
 {code}






[jira] [Commented] (SPARK-16894) take function for returning the first n elements of array column

2016-11-10 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15653494#comment-15653494
 ] 

Kapil Singh commented on SPARK-16894:
-

How is it different from array_contains? Why is array_contains a built-in 
function instead of a UDF?

> take function for returning the first n elements of array column
> 
>
> Key: SPARK-16894
> URL: https://issues.apache.org/jira/browse/SPARK-16894
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Kapil Singh
>
> take(inputArray, n)
> Returns array containing first n elements of inputArray






[jira] [Commented] (SPARK-16892) flatten function to get flat array (or map) column from array of array (or array of map) column

2016-11-07 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15646584#comment-15646584
 ] 

Kapil Singh commented on SPARK-16892:
-

It's not for flattening Rows. It's for flattening columns. The columns 
themselves can be of array of array or array of map types. How would you 
flatten them to obtain columns of array and map types respectively? Also this 
is for DataFrame expressions/functions.

> flatten function to get flat array (or map) column from array of array (or 
> array of map) column
> ---
>
> Key: SPARK-16892
> URL: https://issues.apache.org/jira/browse/SPARK-16892
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Kapil Singh
>
> flatten(input)
> Converts input of array of array type into flat array type by inserting 
> elements of all element arrays into single array. Example:
> input: [[1, 2, 3], [4, 5], [-1, -2, 0]]
> output: [1, 2, 3, 4, 5, -1, -2, 0]
> Converts input of array of map type into flat map type by inserting key-value 
> pairs of all element maps into single map. Example:
> input: [(1 -> "one", 2 -> "two"), (0 -> "zero"), (4 -> "four")]
> output: (1 -> "one", 2 -> "two", 0 -> "zero", 4 -> "four")
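A minimal sketch of the array-of-array case using a UDF (illustrative names; later
Spark versions, 2.4+, ship a built-in `flatten` function for exactly this case):

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object FlattenSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // UDF with the requested behavior for array<array<int>> columns.
    val flattenArrays = udf((xs: Seq[Seq[Int]]) => xs.flatten)

    val df = Seq((1, Seq(Seq(1, 2, 3), Seq(4, 5), Seq(-1, -2, 0))))
      .toDF("id", "nested")

    df.select(flattenArrays($"nested").as("flat")).show(false)
    // flat = [1, 2, 3, 4, 5, -1, -2, 0]
    spark.stop()
  }
}
{code}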






[jira] [Commented] (SPARK-16894) take function for returning the first n elements of array column

2016-11-07 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15646576#comment-15646576
 ] 

Kapil Singh commented on SPARK-16894:
-

This is not about selecting the first n elements/columns from a Row. It's for 
selecting the first n elements of an array-type column. So for every record/Row 
the input column has some m elements, but the result column has only the first 
n elements of the input column. This operation is similar to the Scala 
collection's take operation. The scope of the operation is cell values, not the 
Row.

> take function for returning the first n elements of array column
> 
>
> Key: SPARK-16894
> URL: https://issues.apache.org/jira/browse/SPARK-16894
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Kapil Singh
>
> take(inputArray, n)
> Returns array containing first n elements of inputArray






[jira] [Commented] (SPARK-16894) take function for returning the first n elements of array column

2016-11-07 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15646563#comment-15646563
 ] 

Kapil Singh commented on SPARK-16894:
-

The use case is similar to the Scala collection's take method. For example, one 
of the input columns is an array containing a product category hierarchy, e.g. 
[apparel, men, t-shirt, printed, ...], and I'm only interested in the first n 
(say 3) categories. I want a function/expression on DataFrame so that I can get 
an output column containing only the first 3 categories, e.g. [apparel, men, 
t-shirt].
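A small sketch of the requested behavior using a UDF (illustrative names); newer
Spark versions (2.4+) can express the same thing with the built-in slice(col, 1, n):

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, udf}

object TakeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // take(inputArray, n): keep the first n elements of an array column.
    val takeN = udf((xs: Seq[String], n: Int) => xs.take(n))

    val df = Seq((1, Seq("apparel", "men", "t-shirt", "printed", "cotton")))
      .toDF("id", "categories")

    df.select(takeN($"categories", lit(3)).as("top3")).show(false)
    // top3 = [apparel, men, t-shirt]
    spark.stop()
  }
}
{code}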

> take function for returning the first n elements of array column
> 
>
> Key: SPARK-16894
> URL: https://issues.apache.org/jira/browse/SPARK-16894
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Kapil Singh
>
> take(inputArray, n)
> Returns array containing first n elements of inputArray






[jira] [Commented] (SPARK-18091) Deep if expressions cause Generated SpecificUnsafeProjection code to exceed JVM code size limit

2016-10-25 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15604587#comment-15604587
 ] 

Kapil Singh commented on SPARK-18091:
-

I've started working on this

> Deep if expressions cause Generated SpecificUnsafeProjection code to exceed 
> JVM code size limit
> ---
>
> Key: SPARK-18091
> URL: https://issues.apache.org/jira/browse/SPARK-18091
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Kapil Singh
>Priority: Critical
>
> *Problem Description:*
> I have an application that involves a lot of if-else decisioning to generate 
> its output. I'm getting the following exception:
> Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method 
> "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificUnsafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)V"
>  of class 
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection"
>  grows beyond 64 KB
>   at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
>   at org.codehaus.janino.CodeContext.write(CodeContext.java:874)
>   at org.codehaus.janino.CodeContext.writeBranch(CodeContext.java:965)
>   at org.codehaus.janino.UnitCompiler.writeBranch(UnitCompiler.java:10261)
> *Steps to Reproduce:*
> I've come up with a unit test which I was able to run in 
> CodeGenerationSuite.scala:
> {code}
> test("split large if expressions into blocks due to JVM code size limit") {
> val row = 
> create_row("afafFAFFsqcategory2dadDADcategory8sasasadscategory24", 0)
> val inputStr = 'a.string.at(0)
> val inputIdx = 'a.int.at(1)
> val length = 10
> val valuesToCompareTo = for (i <- 1 to (length + 1)) yield ("category" + 
> i)
> val initCondition = EqualTo(RegExpExtract(inputStr, Literal("category1"), 
> inputIdx), valuesToCompareTo(0))
> var res: Expression = If(initCondition, Literal("category1"), 
> Literal("NULL"))
> var cummulativeCondition: Expression = Not(initCondition)
> for (index <- 1 to length) {
>   val valueExtractedFromInput = RegExpExtract(inputStr, 
> Literal("category" + (index + 1).toString), inputIdx)
>   val currComparee = If(cummulativeCondition, valueExtractedFromInput, 
> Literal("NULL"))
>   val currCondition = EqualTo(currComparee, valuesToCompareTo(index))
>   val combinedCond = And(cummulativeCondition, currCondition)
>   res = If(combinedCond, If(combinedCond, valueExtractedFromInput, 
> Literal("NULL")), res)
>   cummulativeCondition = And(Not(currCondition), cummulativeCondition)
> }
> val expressions = Seq(res)
> val plan = GenerateUnsafeProjection.generate(expressions, true)
> val actual = plan(row).toSeq(expressions.map(_.dataType))
> val expected = Seq(UTF8String.fromString("category2"))
> if (!checkResult(actual, expected)) {
>   fail(s"Incorrect Evaluation: expressions: $expressions, actual: 
> $actual, expected: $expected")
> }
>   }
> {code}
> *Root Cause:*
> The current splitting of Projection code doesn't (and can't) take care of 
> splitting the generated code for individual output column expressions, so it 
> can grow to exceed the JVM code size limit.
> *Note:* This issue seems related to SPARK-14887, but I'm not sure whether the 
> root cause is the same.
>  
> *Proposed Fix:*
> The If expression should place its predicate, true-value, and false-value 
> expressions' generated code in separate methods on the codegen context and 
> call those methods instead of putting the whole code directly in its generated 
> code.






[jira] [Updated] (SPARK-18091) Deep if expressions cause Generated SpecificUnsafeProjection code to exceed JVM code size limit

2016-10-25 Thread Kapil Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-18091:

Description: 
*Problem Description:*
I have an application that involves a lot of if-else decisioning to generate 
its output. I'm getting the following exception:
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method 
"(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificUnsafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)V"
 of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection"
 grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:874)
at org.codehaus.janino.CodeContext.writeBranch(CodeContext.java:965)
at org.codehaus.janino.UnitCompiler.writeBranch(UnitCompiler.java:10261)

*Steps to Reproduce:*
I've come up with a unit test which I was able to run in 
CodeGenerationSuite.scala:
{code}
test("split large if expressions into blocks due to JVM code size limit") {
val row = 
create_row("afafFAFFsqcategory2dadDADcategory8sasasadscategory24", 0)
val inputStr = 'a.string.at(0)
val inputIdx = 'a.int.at(1)

val length = 10
val valuesToCompareTo = for (i <- 1 to (length + 1)) yield ("category" + i)

val initCondition = EqualTo(RegExpExtract(inputStr, Literal("category1"), 
inputIdx), valuesToCompareTo(0))
var res: Expression = If(initCondition, Literal("category1"), 
Literal("NULL"))
var cummulativeCondition: Expression = Not(initCondition)
for (index <- 1 to length) {
  val valueExtractedFromInput = RegExpExtract(inputStr, Literal("category" 
+ (index + 1).toString), inputIdx)
  val currComparee = If(cummulativeCondition, valueExtractedFromInput, 
Literal("NULL"))
  val currCondition = EqualTo(currComparee, valuesToCompareTo(index))
  val combinedCond = And(cummulativeCondition, currCondition)
  res = If(combinedCond, If(combinedCond, valueExtractedFromInput, 
Literal("NULL")), res)
  cummulativeCondition = And(Not(currCondition), cummulativeCondition)
}

val expressions = Seq(res)
val plan = GenerateUnsafeProjection.generate(expressions, true)
val actual = plan(row).toSeq(expressions.map(_.dataType))
val expected = Seq(UTF8String.fromString("category2"))

if (!checkResult(actual, expected)) {
  fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, 
expected: $expected")
}
  }
{code}

*Root Cause:*
The current splitting of Projection code doesn't (and can't) take care of 
splitting the generated code for individual output column expressions, so it 
can grow to exceed the JVM code size limit.

*Note:* This issue seems related to SPARK-14887, but I'm not sure whether the 
root cause is the same.
 
*Proposed Fix:*
The If expression should place its predicate, true-value, and false-value 
expressions' generated code in separate methods on the codegen context and call 
those methods instead of putting the whole code directly in its generated code.

  was:
*Problem Description:*
I have an application that involves a lot of if-else decisioning to generate 
its output. I'm getting the following exception:
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method 
"(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificUnsafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)V"
 of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection"
 grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:874)
at org.codehaus.janino.CodeContext.writeBranch(CodeContext.java:965)
at org.codehaus.janino.UnitCompiler.writeBranch(UnitCompiler.java:10261)

*Steps to Reproduce:*
I've come up with a unit test which I was able to run in 
CodeGenerationSuite.scala:
{code}
test("split large if expressions into blocks due to JVM code size limit") {
val row = 
create_row("afafFAFFsqcategory2dadDADcategory8sasasadscategory24", 0)
val inputStr = 'a.string.at(0)
val inputIdx = 'a.int.at(1)

val length = 10
val valuesToCompareTo = for (i <- 1 to (length + 1)) yield ("category" + i)

val initCondition = EqualTo(RegExpExtract(inputStr, Literal("category1"), 
inputIdx), valuesToCompareTo(0))
var res: Expression = If(initCondition, Literal("category1"), 
Literal("NULL"))
var cummulativeCondition: Expression = Not(initCondition)
for (index <- 1 to length) {
  val valueExtractedFromInput = RegExpExtract(inputStr, Literal("category" 
+ (index + 1).toString), inputIdx)
  val currComparee = If(cummulativeCondition, valueExtractedFromInput, 
Literal("NULL"))
  val currCondition = EqualTo(currComparee, valuesToCompareTo(index))
  val combinedCond = 

[jira] [Created] (SPARK-18091) Deep if expressions cause Generated SpecificUnsafeProjection code to exceed JVM code size limit

2016-10-25 Thread Kapil Singh (JIRA)
Kapil Singh created SPARK-18091:
---

 Summary: Deep if expressions cause Generated 
SpecificUnsafeProjection code to exceed JVM code size limit
 Key: SPARK-18091
 URL: https://issues.apache.org/jira/browse/SPARK-18091
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.1
Reporter: Kapil Singh
Priority: Critical


*Problem Description:*
I have an application in which a lot of if-else decisioning is involved to 
generate output. I'm getting the following exception:
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method 
"(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificUnsafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)V"
 of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection"
 grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:874)
at org.codehaus.janino.CodeContext.writeBranch(CodeContext.java:965)
at org.codehaus.janino.UnitCompiler.writeBranch(UnitCompiler.java:10261)

*Steps to Reproduce:*
I've come up with a unit test which I was able to run in 
CodeGenerationSuite.scala:
{code}
test("split large if expressions into blocks due to JVM code size limit") {
val row = 
create_row("afafFAFFsqcategory2dadDADcategory8sasasadscategory24", 0)
val inputStr = 'a.string.at(0)
val inputIdx = 'a.int.at(1)

val length = 10
val valuesToCompareTo = for (i <- 1 to (length + 1)) yield ("category" + i)

val initCondition = EqualTo(RegExpExtract(inputStr, Literal("category1"), 
inputIdx), valuesToCompareTo(0))
var res: Expression = If(initCondition, Literal("category1"), 
Literal("NULL"))
var cummulativeCondition: Expression = Not(initCondition)
for (index <- 1 to length) {
  val valueExtractedFromInput = RegExpExtract(inputStr, Literal("category" 
+ (index + 1).toString), inputIdx)
  val currComparee = If(cummulativeCondition, valueExtractedFromInput, 
Literal("NULL"))
  val currCondition = EqualTo(currComparee, valuesToCompareTo(index))
  val combinedCond = And(cummulativeCondition, currCondition)
  res = If(combinedCond, If(combinedCond, valueExtractedFromInput, 
Literal("NULL")), res)
  cummulativeCondition = And(Not(currCondition), cummulativeCondition)
}

val expressions = Seq(res)
val plan = GenerateUnsafeProjection.generate(expressions, true)
val actual = plan(row).toSeq(expressions.map(_.dataType))
val expected = Seq(UTF8String.fromString("category2"))

if (!checkResult(actual, expected)) {
  fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, 
expected: $expected")
}
  }
{code}

*Root Cause:*
The current splitting of Projection code doesn't (and can't) split the
generated code of an individual output column expression, so that code can
grow to exceed the JVM method size limit.

*Proposed Fix:*
The If expression should place the generated code of its predicate, true-value
and false-value expressions in separate methods in the codegen context and
call those methods, instead of inlining the whole code directly in its own
generated code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18090) NegativeArraySize exception while reading parquet when inferred type and provided type for partition column are different

2016-10-25 Thread Kapil Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-18090:

Description: 
*Problem Description:*
Reading a small parquet file (single column, single record) with a provided
schema (StructType(Seq(StructField("field1",StringType,true),
StructField("hour",StringType,true), StructField("batch",StringType,true))))
and with spark.sql.sources.partitionColumnTypeInference.enabled not set (i.e.
defaulting to true), from a path like
"/hour=2016072313/batch=720b044894e14dcea63829bb4686c7e3", gives the
following exception:
java.lang.NegativeArraySizeException
at 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:45)
at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:196)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$8.apply(DataSourceStrategy.scala:239)
at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$8.apply(DataSourceStrategy.scala:238)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
which is completely wrong behavior.

*Steps to Reproduce:*
Run the following commands from the Spark shell (after updating paths):
{code}
val df = sc.parallelize(Seq(("one", "2016072313", 
"720b044894e14dcea63829bb4686c7e3"))).toDF("field1", "hour", "batch")
df.write.partitionBy("hour", 
"batch").parquet("/home//SmallParquetForTest")
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("field1",StringType,true), 
StructField("hour",StringType,true),StructField("batch",StringType,true)))
val dfRead = sqlContext.read.schema(schema).parquet("file:///home//SmallParquetForTest")
dfRead.show()
{code}

*Root Cause:*
I did some analysis by debugging this in Spark and found that the partition
projection uses the inferred schema and generates a row with "hour" as an
integer. Later on, the final projection uses the provided schema and reads
"hour" as a string from the row generated by the partition projection. While
reading "hour" as a string, its integer value 2016072313 is interpreted as the
size of the string to be read, which causes a byte buffer size overflow.

*Expected Behavior:*
Either there should be an error saying that the inferred type and the provided
type for a partition column do not match, or the provided type should be used
while generating the partition projection.
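A minimal sketch of why the type mismatch ends in NegativeArraySizeException (illustrative only: it assumes UnsafeRow's convention of packing a variable-length field as one 64-bit word, offset in the upper 32 bits and size in the lower 32 bits, and the doubling mirrors, in simplified form, what the 1.6-era BufferHolder.grow did):
{code:scala}
// The partition projection wrote "hour" as a plain int (2016072313). A reader
// trusting the provided schema decodes the same 8-byte slot as an
// (offset, size) pair for a string field.
val storedWord: Long = 2016072313L       // int value written for "hour"

val offset = (storedWord >>> 32).toInt   // 0 -- meaningless offset
val size   = storedWord.toInt            // 2016072313 -- treated as string length

// BufferHolder then tries to make room for ~2 GB and doubles the buffer size;
// the doubled length overflows Int, so new byte[...] sees a negative size.
val doubled = size * 2
println(s"offset=$offset size=$size doubled=$doubled")  // doubled is negative
{code}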


[jira] [Updated] (SPARK-18090) NegativeArraySize exception while reading parquet when inferred type and provided type for partition column are different

2016-10-25 Thread Kapil Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kapil Singh updated SPARK-18090:

Description: 
*Problem Description:*
Reading a small parquet file (single column, single record) with a provided
schema (StructType(Seq(StructField("field1",StringType,true),
StructField("hour",StringType,true), StructField("batch",StringType,true))))
and with spark.sql.sources.partitionColumnTypeInference.enabled not set (i.e.
defaulting to true), from a path like
"/hour=2016072313/batch=720b044894e14dcea63829bb4686c7e3", gives the
following exception:
java.lang.NegativeArraySizeException
at 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:45)
at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:196)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$8.apply(DataSourceStrategy.scala:239)
at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$8.apply(DataSourceStrategy.scala:238)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
which is completely wrong behavior.

*Steps to Reproduce:*
Run the following commands from the Spark shell (after updating paths):
{code:scala}
val df = sc.parallelize(Seq(("one", "2016072313", 
"720b044894e14dcea63829bb4686c7e3"))).toDF("field1", "hour", "batch")
df.write.partitionBy("hour", 
"batch").parquet("/home//SmallParquetForTest")
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("field1",StringType,true), 
StructField("hour",StringType,true),StructField("batch",StringType,true)))
val dfRead = sqlContext.read.schema(schema).parquet("file:///home//SmallParquetForTest")
dfRead.show()
{code}

*Root Cause:*
I did some analysis by debugging this in Spark and found that the partition
projection uses the inferred schema and generates a row with "hour" as an
integer. Later on, the final projection uses the provided schema and reads
"hour" as a string from the row generated by the partition projection. While
reading "hour" as a string, its integer value 2016072313 is interpreted as the
size of the string to be read, which causes a byte buffer size overflow.

*Expected Behavior:*
Either there should be an error saying that the inferred type and the provided
type for a partition column do not match, or the provided type should be used
while generating the partition projection.


[jira] [Created] (SPARK-18090) NegativeArraySize exception while reading parquet when inferred type and provided type for partition column are different

2016-10-25 Thread Kapil Singh (JIRA)
Kapil Singh created SPARK-18090:
---

 Summary: NegativeArraySize exception while reading parquet when 
inferred type and provided type for partition column are different
 Key: SPARK-18090
 URL: https://issues.apache.org/jira/browse/SPARK-18090
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.1
Reporter: Kapil Singh


*Problem Description:*
Reading a small parquet file (single column, single record) with a provided
schema (StructType(Seq(StructField("field1",StringType,true),
StructField("hour",StringType,true), StructField("batch",StringType,true))))
and with spark.sql.sources.partitionColumnTypeInference.enabled not set (i.e.
defaulting to true), from a path like
"/hour=2016072313/batch=720b044894e14dcea63829bb4686c7e3", gives the
following exception:
java.lang.NegativeArraySizeException
at 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:45)
at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:196)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$8.apply(DataSourceStrategy.scala:239)
at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$8.apply(DataSourceStrategy.scala:238)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
which is completely wrong behavior.

*Steps to Reproduce:*
Run the following commands from the Spark shell (after updating paths):
val df = sc.parallelize(Seq(("one", "2016072313", 
"720b044894e14dcea63829bb4686c7e3"))).toDF("field1", "hour", "batch")
df.write.partitionBy("hour", 
"batch").parquet("/home//SmallParquetForTest")
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("field1",StringType,true), 
StructField("hour",StringType,true),StructField("batch",StringType,true)))
val dfRead = sqlContext.read.schema(schema).parquet("file:///home//SmallParquetForTest")
dfRead.show()

*Root Cause:*
I did some analysis by debugging this in Spark and found that the partition
projection uses the inferred schema and generates a row with "hour" as an
integer. Later on, the final projection uses the provided schema and reads
"hour" as a string from the row generated by the partition projection. While
reading "hour" as a string, its integer value 2016072313 is interpreted as the
size of the string to be read, which causes a byte buffer size overflow.

*Expected Behavior:*
Either there should be an error saying that the inferred type and the provided
type for a partition column do not match, or the provided type should be used
while generating the partition projection.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16889) Add formatMessage Column expression for formatting strings in java.text.MessageFormat style in Scala API

2016-08-08 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411808#comment-15411808
 ] 

Kapil Singh commented on SPARK-16889:
-

What if the pattern is user-provided or, worse, is a column instead of a 
literal string? Then it's not feasible to use %s/%f-based notation.

> Add formatMessage Column expression for formatting strings in 
> java.text.MessageFormat style in Scala API 
> -
>
> Key: SPARK-16889
> URL: https://issues.apache.org/jira/browse/SPARK-16889
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Kapil Singh
>
> format_string formats the arguments in printf-style and has following major 
> cons compared to proposed function for formatting java.text.MessageFormat:
> 1. MessageFormat syntax is more readable since it is more explicit
> java.util.Formatter syntax: "Argument '%s' shall not be negative. The given 
> value was %f."
> java.text.MessageFormat syntax: "Argument '{0}' shall not be negative. The 
> given value was {1}."
> 2. Formatter forces user to declare the argument type (e.g. "%s" or "%f"), 
> while MessageFormat infers it from the object type. For example if the 
> argument could be a string or a number, then Formatter forces us to use the 
> "%s" type (passing a string to "%f" causes an exception). However a number 
> formatted with "%s" is formatted using Number.toString(), which produce an 
> unlocalized value. By contrast, MessageFormat produces localized values 
> dynamically for all recognized types.
> To address these drawbacks, a MessageFormat function should be added.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16894) take function for returning the first n elements of array column

2016-08-04 Thread Kapil Singh (JIRA)
Kapil Singh created SPARK-16894:
---

 Summary: take function for returning the first n elements of array 
column
 Key: SPARK-16894
 URL: https://issues.apache.org/jira/browse/SPARK-16894
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Reporter: Kapil Singh


take(inputArray, n)

Returns an array containing the first n elements of inputArray.
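Until a built-in exists, a minimal sketch of the same behaviour as a UDF (illustrative; the element type is fixed to String and the names are assumptions):
{code:scala}
import org.apache.spark.sql.functions.{lit, udf}

// take(inputArray, n): first n elements of an array column.
val takeN = udf((arr: Seq[String], n: Int) =>
  if (arr == null) null else arr.take(n))

// usage (hypothetical column name):
// df.select(takeN($"tags", lit(3)).as("firstThreeTags"))
{code}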



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16892) flatten function to get flat array (or map) column from array of array (or array of map) column

2016-08-04 Thread Kapil Singh (JIRA)
Kapil Singh created SPARK-16892:
---

 Summary: flatten function to get flat array (or map) column from 
array of array (or array of map) column
 Key: SPARK-16892
 URL: https://issues.apache.org/jira/browse/SPARK-16892
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Reporter: Kapil Singh


flatten(input)

Converts an input of array-of-array type into a flat array type by inserting 
the elements of all element arrays into a single array. Example:
input: [[1, 2, 3], [4, 5], [-1, -2, 0]]
output: [1, 2, 3, 4, 5, -1, -2, 0]

Converts an input of array-of-map type into a flat map type by inserting the 
key-value pairs of all element maps into a single map. Example:
input: [(1 -> "one", 2 -> "two"), (0 -> "zero"), (4 -> "four")]
output: (1 -> "one", 2 -> "two", 0 -> "zero", 4 -> "four")



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16891) arrayFilter function for filtering elements of array column based on a predicate

2016-08-04 Thread Kapil Singh (JIRA)
Kapil Singh created SPARK-16891:
---

 Summary: arrayFilter function for filtering elements of array 
column based on a predicate
 Key: SPARK-16891
 URL: https://issues.apache.org/jira/browse/SPARK-16891
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Reporter: Kapil Singh


arrayFilter(inputArray, predicateArray) //predicateArray is an array of booleans

Returns an array containing the elements of inputArray for which the elements 
at the corresponding indices in predicateArray evaluate to true.
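A minimal UDF sketch of the proposed behaviour (illustrative; the element type is fixed to String):
{code:scala}
import org.apache.spark.sql.functions.udf

// arrayFilter(inputArray, predicateArray): keep the elements whose
// corresponding predicate is true.
val arrayFilter = udf((input: Seq[String], predicate: Seq[Boolean]) =>
  if (input == null || predicate == null) null
  else input.zip(predicate).collect { case (value, true) => value })
{code}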



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16890) substring returns wrong result for positive position

2016-08-04 Thread Kapil Singh (JIRA)
Kapil Singh created SPARK-16890:
---

 Summary: substring returns wrong result for positive position
 Key: SPARK-16890
 URL: https://issues.apache.org/jira/browse/SPARK-16890
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.0, 1.6.2, 1.5.2
Reporter: Kapil Singh


If position > 0 then the substring function reduces it by 1. So if it is 
invoked as substring("First", 1, 4) then the result is "Fir" instead of "irs". 
This is because UTF8String's substring method makes this adjustment before 
invoking substring on the underlying String.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16889) Add formatMessage Column expression for formatting strings in java.text.MessageFormat style in Scala API

2016-08-04 Thread Kapil Singh (JIRA)
Kapil Singh created SPARK-16889:
---

 Summary: Add formatMessage Column expression for formatting 
strings in java.text.MessageFormat style in Scala API 
 Key: SPARK-16889
 URL: https://issues.apache.org/jira/browse/SPARK-16889
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Reporter: Kapil Singh


format_string formats its arguments printf-style and has the following major 
cons compared to the proposed java.text.MessageFormat-style formatting function:

1. MessageFormat syntax is more readable since it is more explicit
java.util.Formatter syntax: "Argument '%s' shall not be negative. The given 
value was %f."
java.text.MessageFormat syntax: "Argument '{0}' shall not be negative. The 
given value was {1}."

2. Formatter forces the user to declare the argument type (e.g. "%s" or "%f"), 
while MessageFormat infers it from the object type. For example, if the argument 
could be a string or a number, then Formatter forces us to use the "%s" type 
(passing a string to "%f" causes an exception). However, a number formatted with 
"%s" is formatted using Number.toString(), which produces an unlocalized value. 
By contrast, MessageFormat produces localized values dynamically for all 
recognized types.

To address these drawbacks, a MessageFormat function should be added.
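Pending a native expression, a minimal sketch of the proposed behaviour as a UDF (illustrative; the UDF name is hypothetical and the argument type is fixed to strings for simplicity):
{code:scala}
import java.text.MessageFormat
import org.apache.spark.sql.functions.udf

// formatMessage(pattern, args): MessageFormat-style formatting where the
// pattern itself may come from a column, which printf-style format_string
// does not handle well.
val formatMessage = udf((pattern: String, args: Seq[String]) =>
  if (pattern == null || args == null) null
  else MessageFormat.format(pattern, args.map(_.asInstanceOf[AnyRef]): _*))
{code}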



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16876) Add match Column expression for regular expression matching in Scala API

2016-08-03 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15407200#comment-15407200
 ] 

Kapil Singh commented on SPARK-16876:
-

Yes, these are similar, but SPARK-16203 is for PySpark while this ticket is for 
Scala.

> Add match Column expression for regular expression matching in Scala API 
> -
>
> Key: SPARK-16876
> URL: https://issues.apache.org/jira/browse/SPARK-16876
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Kapil Singh
>Priority: Minor
>
> RegExpExtract expression gets only the i_th_ regular expression match. A 
> match expression should be there to get all the matches as an array of 
> strings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16876) Add match Column expression for regular expression matching in Scala API

2016-08-03 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405961#comment-15405961
 ] 

Kapil Singh commented on SPARK-16876:
-

For example, consider a data field in which miscellaneous product metadata 
comes as a single string, 
"product_id=325173517;ref=abcorg;product_id=8712176531;ref=xyzorg", and all 
product ids are to be extracted. There is no direct way to achieve this with 
the existing expressions. 
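A minimal sketch of the proposed "match all" behaviour as a UDF, applied to the metadata string above (illustrative; the UDF name and the lookbehind pattern are assumptions):
{code:scala}
import org.apache.spark.sql.functions.{lit, udf}

// Return every match of a pattern as an array of strings.
val matchAll = udf((s: String, pattern: String) =>
  if (s == null) null else pattern.r.findAllIn(s).toList)

// matchAll(lit("product_id=325173517;ref=abcorg;product_id=8712176531;ref=xyzorg"),
//          lit("(?<=product_id=)\\d+"))
// -> ["325173517", "8712176531"]
{code}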

> Add match Column expression for regular expression matching in Scala API 
> -
>
> Key: SPARK-16876
> URL: https://issues.apache.org/jira/browse/SPARK-16876
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Kapil Singh
>Priority: Minor
>
> RegExpExtract expression gets only the i_th_ regular expression match. A 
> match expression should be there to get all the matches as an array of 
> strings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-16876) Add match Column expression for regular expression matching in Scala API

2016-08-03 Thread Kapil Singh (JIRA)
Kapil Singh created SPARK-16876:
---

 Summary: Add match Column expression for regular expression 
matching in Scala API 
 Key: SPARK-16876
 URL: https://issues.apache.org/jira/browse/SPARK-16876
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Reporter: Kapil Singh


The RegExpExtract expression gets only the i-th regular expression match. A 
match expression should exist to get all the matches as an array of strings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16876) Add match Column expression for regular expression matching in Scala API

2016-08-03 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15405808#comment-15405808
 ] 

Kapil Singh commented on SPARK-16876:
-

I'm working on this.

> Add match Column expression for regular expression matching in Scala API 
> -
>
> Key: SPARK-16876
> URL: https://issues.apache.org/jira/browse/SPARK-16876
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Reporter: Kapil Singh
>
> RegExpExtract expression gets only the i_th_ regular expression match. A 
> match expression should be there to get all the matches as an array of 
> strings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12194) Add Sink for reporting Spark Metrics to OpenTSDB

2015-12-07 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15046331#comment-15046331
 ] 

Kapil Singh commented on SPARK-12194:
-

I'm working on this.

> Add Sink for reporting Spark Metrics to OpenTSDB
> 
>
> Key: SPARK-12194
> URL: https://issues.apache.org/jira/browse/SPARK-12194
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 1.5.2
>Reporter: Kapil Singh
>
> Add OpenTSDB Sink to the currently supported metric sinks. Since OpenTSDB is 
> a popular open-source Time Series Database (based on HBase), this will make 
> it convenient for those who want metrics data for time series analysis 
> purposes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-12194) Add Sink for reporting Spark Metrics to OpenTSDB

2015-12-07 Thread Kapil Singh (JIRA)
Kapil Singh created SPARK-12194:
---

 Summary: Add Sink for reporting Spark Metrics to OpenTSDB
 Key: SPARK-12194
 URL: https://issues.apache.org/jira/browse/SPARK-12194
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 1.5.2
Reporter: Kapil Singh


Add OpenTSDB Sink to the currently supported metric sinks. Since OpenTSDB is a 
popular open-source Time Series Database (based on HBase), this will make it 
convenient for those who want metrics data for time series analysis purposes.
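A rough skeleton of what such a sink could look like (a sketch only: the Sink trait and the (Properties, MetricRegistry, SecurityManager) constructor follow the convention of Spark's built-in 1.x-era sinks, which are internal APIs and vary across versions; the "period" config key is an assumption, and the actual write to OpenTSDB is stubbed out as a printed telnet-style put line):
{code:scala}
package org.apache.spark.metrics.sink

import java.util.Properties
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.JavaConverters._
import com.codahale.metrics.MetricRegistry
import org.apache.spark.SecurityManager

private[spark] class OpenTsdbSink(property: Properties,
                                  registry: MetricRegistry,
                                  securityMgr: SecurityManager) extends Sink {
  // Reporting interval in seconds; "period" mirrors the key used by the
  // built-in sinks but is an assumption here.
  private val period =
    Option(property.getProperty("period")).map(_.toLong).getOrElse(10L)
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  override def start(): Unit =
    scheduler.scheduleAtFixedRate(new Runnable { def run(): Unit = report() },
      period, period, TimeUnit.SECONDS)

  override def stop(): Unit = scheduler.shutdown()

  override def report(): Unit = {
    val now = System.currentTimeMillis() / 1000
    registry.getGauges.asScala.foreach { case (name, gauge) =>
      // OpenTSDB telnet-style line; a real sink would push this to the TSD
      // socket or batch it to the /api/put HTTP endpoint instead of printing.
      println(s"put $name $now ${gauge.getValue}")
    }
  }
}
{code}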



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-11447) Null comparison requires type information but type extraction fails for complex types

2015-11-03 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14987096#comment-14987096
 ] 

Kapil Singh commented on SPARK-11447:
-

On second look, I seem to have identified the issue. Take a look at lines 
283-286 here:
https://github.com/apache/spark/blob/a01cbf5daac148f39cd97299780f542abc41d1e9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala

If one of the types in a BinaryComparison is StringType and the other is 
NullType, this forces DoubleType onto the StringType side while the analyzed 
plan is computed. Later, while enforcing this cast (lines 340-343 in 
https://github.com/apache/spark/blob/a01cbf5daac148f39cd97299780f542abc41d1e9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala),
 the conversion to double fails if the string is not actually a number and a 
default null value is assigned to the result. This manifests as the null 
comparison returning true for all string values. 
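Given that analysis, a small workaround sketch (df is rebuilt here the same way as in the issue description; isNull and a typed Literal.create are standard API, and a spark-shell style sqlContext/sc is assumed):
{code:scala}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

val inputRows = Seq(Row("abc"), Row(null), Row("xyz"))
val dfSchema = StructType(Seq(StructField("column", StringType, true)))
val df = sqlContext.createDataFrame(sc.makeRDD(inputRows), dfSchema)

// Option 1: sidestep the untyped null literal entirely.
val nullsOnly = df.filter(df("column").isNull)

// Option 2: give the null literal an explicit type so the coercion rule never
// forces a DoubleType cast onto the string column.
val typedNull = new Column(Literal.create(null, StringType))
val nullSafe  = df.filter(df("column") <=> typedNull)
{code}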

> Null comparison requires type information but type extraction fails for 
> complex types
> -
>
> Key: SPARK-11447
> URL: https://issues.apache.org/jira/browse/SPARK-11447
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.1
>Reporter: Kapil Singh
>
> While comparing a Column to a null literal, comparison works only if type of 
> null literal matches type of the Column it's being compared to. Example scala 
> code (can be run from spark shell):
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.catalyst.expressions._
> val inputRowsData = Seq(Seq("abc"),Seq(null),Seq("xyz"))
> val inputRows = for(seq <- inputRowsData) yield Row.fromSeq(seq)
> val dfSchema = StructType(Seq(StructField("column", StringType, true)))
> val df = sqlContext.createDataFrame(sc.makeRDD(inputRows), dfSchema)
> //DOESN'T WORK
> val filteredDF = df.filter(df("column") <=> (new Column(Literal(null
> //WORKS
> val filteredDF = df.filter(df("column") <=> (new Column(Literal.create(null, 
> SparkleFunctions.dataType(df("column"))
> Why should type information be required for a null comparison? If it's 
> required, it's not always possible to extract type information from complex  
> types (e.g. StructType). Following scala code (can be run from spark shell), 
> throws org.apache.spark.sql.catalyst.analysis.UnresolvedException:
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.catalyst.expressions._
> val inputRowsData = Seq(Seq(Row.fromSeq(Seq("abc", 
> "def"))),Seq(Row.fromSeq(Seq(null, "123"))),Seq(Row.fromSeq(Seq("ghi", 
> "jkl"
> val inputRows = for(seq <- inputRowsData) yield Row.fromSeq(seq)
> val dfSchema = StructType(Seq(StructField("column", 
> StructType(Seq(StructField("p1", StringType, true), StructField("p2", 
> StringType, true))), true)))
> val filteredDF = df.filter(df("column")("p1") <=> (new 
> Column(Literal.create(null, SparkleFunctions.dataType(df("column")("p1"))
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
> dataType on unresolved object, tree: column#0[p1]
>   at 
> org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue.dataType(unresolved.scala:243)
>   at 
> org.apache.spark.sql.ArithmeticFunctions$class.dataType(ArithmeticFunctions.scala:76)
>   at 
> org.apache.spark.sql.SparkleFunctions$.dataType(SparkleFunctions.scala:14)
>   at 
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:38)
>   at 
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)
>   at 
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
>   at 
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
>   at 
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:57)
>   at $iwC$$iwC$$iwC$$iwC$$iwC.(:59)
>   at $iwC$$iwC$$iwC$$iwC.(:61)
>   at $iwC$$iwC$$iwC.(:63)
>   at $iwC$$iwC.(:65)
>   at $iwC.(:67)
>   at (:69)
>   at .(:73)
>   at .()
>   at .(:7)
>   at .()
>   at $print()
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)

[jira] [Commented] (SPARK-11447) Null comparison requires type information but type extraction fails for complex types

2015-11-02 Thread Kapil Singh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-11447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14986552#comment-14986552
 ] 

Kapil Singh commented on SPARK-11447:
-

Hi Kevin,

My bad, should have been more explicit. The code runs but the result is 
incorrect:

val filteredDF = df.filter(df("column") <=> (new Column(Literal(null
filteredDF.show

gives:

+------+
|column|
+------+
|   abc|
|  null|
|   xyz|
+------+

which is not correct. The result should include only the row with the null value.

In the second version where I'm passing type information:

val filteredDF = df.filter(df("column") <=> (new Column(Literal.create(null, 
SparkleFunctions.dataType(df("column"))
filteredDF.show

result is:

+------+
|column|
+------+
|  null|
+------+

which is correct. Here SparkleFunctions.dataType is just a method I defined to 
be able to get type information from Column.expr outside the 
org.apache.spark.sql package:

package org.apache.spark.sql
object SparkleFunctions {
  def dataType(c: Column): DataType = c.expr.dataType
}

> Null comparison requires type information but type extraction fails for 
> complex types
> -
>
> Key: SPARK-11447
> URL: https://issues.apache.org/jira/browse/SPARK-11447
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.5.1
>Reporter: Kapil Singh
>
> While comparing a Column to a null literal, comparison works only if type of 
> null literal matches type of the Column it's being compared to. Example scala 
> code (can be run from spark shell):
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.catalyst.expressions._
> val inputRowsData = Seq(Seq("abc"),Seq(null),Seq("xyz"))
> val inputRows = for(seq <- inputRowsData) yield Row.fromSeq(seq)
> val dfSchema = StructType(Seq(StructField("column", StringType, true)))
> val df = sqlContext.createDataFrame(sc.makeRDD(inputRows), dfSchema)
> //DOESN'T WORK
> val filteredDF = df.filter(df("column") <=> (new Column(Literal(null
> //WORKS
> val filteredDF = df.filter(df("column") <=> (new Column(Literal.create(null, 
> SparkleFunctions.dataType(df("column"))
> Why should type information be required for a null comparison? If it's 
> required, it's not always possible to extract type information from complex  
> types (e.g. StructType). Following scala code (can be run from spark shell), 
> throws org.apache.spark.sql.catalyst.analysis.UnresolvedException:
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.catalyst.expressions._
> val inputRowsData = Seq(Seq(Row.fromSeq(Seq("abc", 
> "def"))),Seq(Row.fromSeq(Seq(null, "123"))),Seq(Row.fromSeq(Seq("ghi", 
> "jkl"
> val inputRows = for(seq <- inputRowsData) yield Row.fromSeq(seq)
> val dfSchema = StructType(Seq(StructField("column", 
> StructType(Seq(StructField("p1", StringType, true), StructField("p2", 
> StringType, true))), true)))
> val filteredDF = df.filter(df("column")("p1") <=> (new 
> Column(Literal.create(null, SparkleFunctions.dataType(df("column")("p1"))
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
> dataType on unresolved object, tree: column#0[p1]
>   at 
> org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue.dataType(unresolved.scala:243)
>   at 
> org.apache.spark.sql.ArithmeticFunctions$class.dataType(ArithmeticFunctions.scala:76)
>   at 
> org.apache.spark.sql.SparkleFunctions$.dataType(SparkleFunctions.scala:14)
>   at 
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:38)
>   at 
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)
>   at 
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
>   at 
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
>   at 
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
>   at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:57)
>   at $iwC$$iwC$$iwC$$iwC$$iwC.(:59)
>   at $iwC$$iwC$$iwC$$iwC.(:61)
>   at $iwC$$iwC$$iwC.(:63)
>   at $iwC$$iwC.(:65)
>   at $iwC.(:67)
>   at (:69)
>   at .(:73)
>   at .()
>   at .(:7)
>   at .()
>   at $print()
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)

[jira] [Created] (SPARK-11447) Null comparison requires type information but type extraction fails for complex types

2015-11-01 Thread Kapil Singh (JIRA)
Kapil Singh created SPARK-11447:
---

 Summary: Null comparison requires type information but type 
extraction fails for complex types
 Key: SPARK-11447
 URL: https://issues.apache.org/jira/browse/SPARK-11447
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.5.1
Reporter: Kapil Singh


While comparing a Column to a null literal, the comparison works only if the 
type of the null literal matches the type of the Column it's being compared 
to. Example Scala code (can be run from the Spark shell):


import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

val inputRowsData = Seq(Seq("abc"),Seq(null),Seq("xyz"))
val inputRows = for(seq <- inputRowsData) yield Row.fromSeq(seq)
val dfSchema = StructType(Seq(StructField("column", StringType, true)))
val df = sqlContext.createDataFrame(sc.makeRDD(inputRows), dfSchema)

//DOESN'T WORK
val filteredDF = df.filter(df("column") <=> (new Column(Literal(null

//WORKS
val filteredDF = df.filter(df("column") <=> (new Column(Literal.create(null, 
SparkleFunctions.dataType(df("column"))


Why should type information be required for a null comparison? Even if it is 
required, it's not always possible to extract type information from complex 
types (e.g. StructType). The following Scala code (can be run from the Spark 
shell) throws org.apache.spark.sql.catalyst.analysis.UnresolvedException:


import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

val inputRowsData = Seq(Seq(Row.fromSeq(Seq("abc", 
"def"))),Seq(Row.fromSeq(Seq(null, "123"))),Seq(Row.fromSeq(Seq("ghi", "jkl"
val inputRows = for(seq <- inputRowsData) yield Row.fromSeq(seq)
val dfSchema = StructType(Seq(StructField("column", 
StructType(Seq(StructField("p1", StringType, true), StructField("p2", 
StringType, true))), true)))

val filteredDF = df.filter(df("column")("p1") <=> (new 
Column(Literal.create(null, SparkleFunctions.dataType(df("column")("p1"))

org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
dataType on unresolved object, tree: column#0[p1]
at 
org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue.dataType(unresolved.scala:243)
at 
org.apache.spark.sql.ArithmeticFunctions$class.dataType(ArithmeticFunctions.scala:76)
at 
org.apache.spark.sql.SparkleFunctions$.dataType(SparkleFunctions.scala:14)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:38)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:55)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:57)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:59)
at $iwC$$iwC$$iwC$$iwC.(:61)
at $iwC$$iwC$$iwC.(:63)
at $iwC$$iwC.(:65)
at $iwC.(:67)
at (:69)
at .(:73)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at