[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-20 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r204098057
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -270,7 +270,7 @@ case class FileSourceScanExec(
   private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-  override val metadata: Map[String, String] = {
+  override lazy val metadata: Map[String, String] = {
--- End diff --

Ouch. I'd never have thought about any code with `RDD`s and physical 
operators running on the executor side (!). Learnt it today.


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21815


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203980465
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable on executor side") {
--- End diff --

`SparkPlanSuite` SGTM


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-20 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203979619
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/* ... Apache License 2.0 header elided; identical to the one quoted in full above ... */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable on executor side") {
--- End diff --

I found `SparkPlanSuite` could be another place to add this to address your 
comment. Let me stick to `FileSourceScanExec`, but please let me know if you 
prefer that instead. I don't mind changing it.


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-20 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203979151
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/* ... Apache License 2.0 header elided; identical to the one quoted in full above ... */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable on executor side") {
--- End diff --

I think I can actually put this under `SparkPlanSuite`. Let me put it in.


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-20 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203976429
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/* ... Apache License 2.0 header elided; identical to the one quoted in full above ... */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable on executor side") {
--- End diff --

There are a few things bothering me about that, actually - it's kind of messy to 
create a `FileSourceScanExec` without a `SparkSession` (and also without the other 
utils from `SharedSQLContext`), and `QueryPlanSuite` is under `catalyst` 
whereas this plan itself is under `execution` in SQL core.

Also, I actually believe this PR targets making the plan canonicalizable after 
it's de/serialized: the plan itself is already serializable and deserializable, 
but it's not canonicalizable after that.

Let me try to clean it up based on your comment anyway.


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203947912
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/* ... Apache License 2.0 header elided; identical to the one quoted in full above ... */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable on executor side") {
--- End diff --

I'd like to put this test in `QueryPlanSuite`, with the name `SPARK-23731: query 
plans can be serialized and deserialized`.

In the test we don't need to trigger a job; just call 
`spark.env.serializer` to serialize and deserialize the `FileSourceScanExec`.
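A minimal sketch of that suggestion (hedged: `fileSourceScanExec` is the scan node 
grabbed from the query plan as in the test above, and `SparkEnv.get.serializer` is 
assumed as the handle to the same serializer):

```
import org.apache.spark.SparkEnv
import org.apache.spark.sql.execution.SparkPlan

// Round-trip the plan through the configured serializer without running a job.
val serializerInstance = SparkEnv.get.serializer.newInstance()
val bytes = serializerInstance.serialize[SparkPlan](fileSourceScanExec)
val roundTripped = serializerInstance.deserialize[SparkPlan](bytes)
roundTripped.canonicalized // should not throw once this PR is in
```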


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203682853
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/* ... Apache License 2.0 header elided; identical to the one quoted in full above ... */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable in executor side") {
+    withTempPath { path =>
+      spark.range(1).toDF().write.parquet(path.getAbsolutePath)
+      val df = spark.read.parquet(path.getAbsolutePath)
+      val fileSourceScanExec =
+        df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+      try {
+        spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
+      } catch {
+        case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e)
--- End diff --

Hm, this gives an explicit scope for which condition is the failure case, 
though. I believe this is a rather common pattern. If both are okay, let me just 
keep it this way.


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203680931
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -270,7 +270,7 @@ case class FileSourceScanExec(
   private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-  override val metadata: Map[String, String] = {
+  override lazy val metadata: Map[String, String] = {
--- End diff --

it can be on the executor side actually:

```
at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:275)
at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
at org.apache.spark.sql.execution.FileSourceScanExecSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(FileSourceScanExecSuite.scala:30)
at org.apache.spark.sql.execution.FileSourceScanExecSuite$$anonfun$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(FileSourceScanExecSuite.scala:30)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2083)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2083)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:367)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203680278
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/* ... Apache License 2.0 header elided; identical to the one quoted in full above ... */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable in executor side") {
+    withTempPath { path =>
+      spark.range(1).toDF().write.parquet(path.getAbsolutePath)
+      val df = spark.read.parquet(path.getAbsolutePath)
+      val fileSourceScanExec =
+        df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+      try {
+        spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
--- End diff --

Yes, of course I think it is... it took me a while to make a small and 
simple test for this one. I hope to leave that out of this PR's scope, though.


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203680097
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -69,7 +67,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
-    SparkSession.setActiveSession(sqlContext.sparkSession)
+    if (sqlContext != null) {
--- End diff --

I see, thanks. I wondered because it seems a more generic issue that would be 
easier to hit, but probably we never ran into it since all the trials involved 
`FileSourceScanExec`, which caused an earlier failure... thanks.


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203679895
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -166,10 +166,10 @@ case class FileSourceScanExec(
 override val tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with ColumnarBatchScan  {
 
-  override val supportsBatch: Boolean = relation.fileFormat.supportBatch(
+  override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
     relation.sparkSession, StructType.fromAttributes(output))
 
-  override val needsUnsafeRowConversion: Boolean = {
+  override lazy val needsUnsafeRowConversion: Boolean = {
     if (relation.fileFormat.isInstanceOf[ParquetSource]) {
       SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
--- End diff --

Let's leave this out of this PR's scope. That's more about making the plan 
workable, whereas this PR targets making the plan canonicalizable.


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203679141
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -69,7 +67,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
-    SparkSession.setActiveSession(sqlContext.sparkSession)
+    if (sqlContext != null) {
--- End diff --

Because it failed earlier, before reaching this point. Once we go with `lazy`, 
it's discovered later (see the exception message in the PR description).
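For reference, a sketch of the guarded `makeCopy` from the hunk above 
(reconstructed from the truncated diff; the merged body may differ slightly):

```
override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
  if (sqlContext != null) {
    // sqlContext is only set on the driver; on executors it is null,
    // so skip propagating the active session there.
    SparkSession.setActiveSession(sqlContext.sparkSession)
  }
  super.makeCopy(newArgs)
}
```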


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203671394
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -199,7 +199,7 @@ case class FileSourceScanExec(
 ret
   }
 
-  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
--- End diff --

it'd be computed anyway when we create a new `FileSourceScanExec` 
in the canonicalization process if it were not lazy, so I'd say this one is 
needed, as well as all the others.
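A tiny generic illustration of the point (plain Scala, not Spark code):

```
case class Eager(ref: String) { val len: Int = ref.length }         // computed at construction
case class Deferred(ref: String) { lazy val len: Int = ref.length } // computed on first access

Deferred(null)  // fine: `len` is never touched
// Eager(null)  // would throw a NullPointerException right here
```

Since canonicalization constructs a fresh `FileSourceScanExec`, every non-lazy 
`val` in its body runs immediately, whether or not a session is available.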


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203669396
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -69,7 +67,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   /** Overridden make copy also propagates sqlContext to copied plan. */
   override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = {
-    SparkSession.setActiveSession(sqlContext.sparkSession)
+    if (sqlContext != null) {
--- End diff --

just curious, why wasn't the `makeCopy` problem discovered in the previous 
PR/investigation?


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203668475
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -166,10 +166,10 @@ case class FileSourceScanExec(
 override val tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with ColumnarBatchScan  {
 
-  override val supportsBatch: Boolean = relation.fileFormat.supportBatch(
+  override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
--- End diff --

nit: can we maybe add a comment about the reason we are making them lazy?
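Something along these lines, perhaps (illustrative wording only, not necessarily 
what gets merged):

```
// Note that some vals referring to the file-based relation are lazy intentionally
// so that this plan can also be canonicalized on the executor side, where no
// active SparkSession is available. See SPARK-23731.
override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
  relation.sparkSession, StructType.fromAttributes(output))
```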


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203669375
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/* ... Apache License 2.0 header elided; identical to the one quoted in full above ... */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable in executor side") {
+    withTempPath { path =>
+      spark.range(1).toDF().write.parquet(path.getAbsolutePath)
+      val df = spark.read.parquet(path.getAbsolutePath)
+      val fileSourceScanExec =
+        df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+      try {
+        spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
--- End diff --

not sure whether it is feasible (maybe in a followup?), but it would be 
great if we could test the canonicalization of all the Exec nodes in order to 
prevent such issues in the future... what do you think?
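A rough sketch of what such a blanket check might look like (hypothetical 
followup, not part of this PR; `df` stands for any query exercising the 
operators under test):

```
val plan = df.queryExecution.executedPlan
spark.range(1).foreach { _ =>
  // Canonicalize every node inside a task: any eager, driver-only state
  // in an operator would surface as an exception here.
  plan.foreach(node => node.canonicalized)
}
```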


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203666346
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/* ... Apache License 2.0 header elided; identical to the one quoted in full above ... */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable in executor side") {
+    withTempPath { path =>
+      spark.range(1).toDF().write.parquet(path.getAbsolutePath)
--- End diff --

Redundant `toDF`
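i.e. `spark.range(1)` is already a `Dataset`, so the write can be issued directly:

```
spark.range(1).write.parquet(path.getAbsolutePath)
```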


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203665574
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -270,7 +270,7 @@ case class FileSourceScanExec(
   private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
-  override val metadata: Map[String, String] = {
+  override lazy val metadata: Map[String, String] = {
--- End diff --

That's driver-only too, isn't it? Why is this `lazy` required?


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203666893
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/* ... Apache License 2.0 header elided; identical to the one quoted in full above ... */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable in executor side") {
+    withTempPath { path =>
+      spark.range(1).toDF().write.parquet(path.getAbsolutePath)
+      val df = spark.read.parquet(path.getAbsolutePath)
+      val fileSourceScanExec =
+        df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
--- End diff --

This `isInstanceOf` is a bit non-Scala IMHO and I'd prefer `collectFirst { 
case op: FileSourceScanExec => op }` instead.
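A sketch of the suggested idiom, which also keeps the `Option` instead of the 
bare `.get`:

```
val fileSourceScanExec = df.queryExecution.sparkPlan
  .collectFirst { case scan: FileSourceScanExec => scan }
  .getOrElse(fail("No FileSourceScanExec found in the plan"))
```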


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203666125
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/* ... Apache License 2.0 header elided; identical to the one quoted in full above ... */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable in executor side") {
--- End diff --

nit: s/in/on


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203667943
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala
 ---
@@ -0,0 +1,36 @@
+/* ... Apache License 2.0 header elided; identical to the one quoted in full above ... */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FileSourceScanExecSuite extends SharedSQLContext {
+  test("FileSourceScanExec should be canonicalizable in executor side") {
+    withTempPath { path =>
+      spark.range(1).toDF().write.parquet(path.getAbsolutePath)
+      val df = spark.read.parquet(path.getAbsolutePath)
+      val fileSourceScanExec =
+        df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+      try {
+        spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
+      } catch {
+        case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e)
--- End diff --

It's a named test so I'd get rid of the `try-catch` block because:

1. It's going to fail the test anyway
2. The title of the test matches the `fail` message.
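With that and the earlier nits (`toDF`, `collectFirst`) folded in, the test body 
would shrink to something like this sketch (a thrown exception fails the named 
test on its own):

```
test("FileSourceScanExec should be canonicalizable on executor side") {
  withTempPath { path =>
    spark.range(1).write.parquet(path.getAbsolutePath)
    val df = spark.read.parquet(path.getAbsolutePath)
    val fileSourceScanExec = df.queryExecution.sparkPlan
      .collectFirst { case scan: FileSourceScanExec => scan }.get
    spark.range(1).foreach(_ => fileSourceScanExec.canonicalized)
  }
}
```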


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203664621
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -166,10 +166,10 @@ case class FileSourceScanExec(
 override val tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with ColumnarBatchScan  {
 
-  override val supportsBatch: Boolean = relation.fileFormat.supportBatch(
+  override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch(
     relation.sparkSession, StructType.fromAttributes(output))
 
-  override val needsUnsafeRowConversion: Boolean = {
+  override lazy val needsUnsafeRowConversion: Boolean = {
     if (relation.fileFormat.isInstanceOf[ParquetSource]) {
       SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
--- End diff --

Since you mentioned `SparkSession`, that line caught my attention: the 
active `SparkSession` is accessed using `SparkSession.getActiveSession.get`, not 
`relation.sparkSession` as in other places. I think that's 
worth considering changing while we're at it.
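A sketch of that change (assuming `relation.sparkSession` carries the same conf 
that the active session held on the driver):

```
override lazy val needsUnsafeRowConversion: Boolean = {
  if (relation.fileFormat.isInstanceOf[ParquetSource]) {
    relation.sparkSession.sessionState.conf.parquetVectorizedReaderEnabled
  } else {
    false
  }
}
```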


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread jaceklaskowski
Github user jaceklaskowski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21815#discussion_r203665187
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -199,7 +199,7 @@ case class FileSourceScanExec(
 ret
   }
 
-  override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
+  override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
--- End diff --

That happens on the driver so no need for the `lazy` here.


---




[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...

2018-07-19 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/21815

[SPARK-23731][SQL] Make FileSourceScanExec canonicalizable in executor side

## What changes were proposed in this pull request?

### What's the problem?

In some cases, a scalar subquery could throw an NPE, which is raised on the 
executor side.
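An illustrative query shape that can reach this path (a hedged reconstruction 
from the stack trace below, not the exact reproduction from the JIRA): two 
semantically equal scalar subqueries in a projection make executor-side 
subexpression elimination call `semanticEquals`, which canonicalizes the 
embedded `FileSourceScanExec`:

```
// Illustration only; see SPARK-23731 for the actual reproduction.
spark.range(1).write.parquet("/tmp/tbl")
spark.read.parquet("/tmp/tbl").createOrReplaceTempView("tbl")
spark.sql("SELECT (SELECT max(id) FROM tbl) + (SELECT max(id) FROM tbl) FROM tbl").collect()
```

The resulting executor-side failure: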

```
java.lang.NullPointerException
at org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:169)
at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:526)
at org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:159)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$3.apply(QueryPlan.scala:225)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:225)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:211)
at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:210)
at org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:258)
at org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
at scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:364)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:139)
at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:135)
at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.get(HashMap.scala:70)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:56)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:97)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:98)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:98)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$anonfun$subexpressionElimination$1.apply(CodeGenerator.scala:1102)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.subexpressionElimination(CodeGenerator.scala:1102)
at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.generateExpressions(CodeGenerator.scala:1154)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createCode(GenerateUnsafeProjection.scala:270)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:319)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.generate(GenerateUnsafeProjection.scala:308)
at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:181)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:71)
at org.apache.spark.sql.execution.ProjectExec$$anonfun$9.apply(basicPhysicalOperators.scala:70)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at
```