[jira] [Comment Edited] (SPARK-19809) NullPointerException on empty ORC file

2017-05-28 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027987#comment-16027987
 ] 

Hyukjin Kwon edited comment on SPARK-19809 at 5/29/17 3:21 AM:
---

Yea, I agree that whether it is malformed or not should depend on the format 
specification/implementation. As far as I know, Parquet itself treats 0-byte 
files as malformed because it has to read the footer, and it throws an 
exception.

The former case looks like it filters out the whole partitions in 
{{FileSourceScanExec}}. Parquet requires reading the footers and throws an 
exception. For example, I manually updated the code path so that the 
partitions are not skipped and the Parquet reader is actually called, as below:

{code}
java.lang.RuntimeException: file:/.../tmp.abc is not a Parquet file (too small)
at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:466)
at 
org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:568)
at 
org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:492)
at 
org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:166)
at 
org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:147)
{code}

If we don't specify the schema, it also throws an exception as below:

{code}
spark.read.parquet(".../tmp.abc").show()
{code}

{code}
java.io.IOException: Could not read footer for file: 
FileStatus{path=file:/.../tmp.abc; isDirectory=false; length=0; replication=0; 
blocksize=0; modification_time=0; access_time=0; owner=; group=; 
permission=rw-rw-rw-; isSymlink=false}
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:498)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
at 
scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
at 
scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
at 
scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
{code}

Assuming it is treated as a malformed file for now (per the ORC JIRA you pointed 
out above), it looks like a malformed file, and it sounds like we should be able 
to skip it on the client side, whether or not it is handled via 
{{spark.sql.files.ignoreCorruptFiles}}.
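
For reference, here is a minimal sketch of what skipping such files on the client side could look like with that flag; the directory path and schema are placeholders, and whether the 0-byte file is actually skipped depends on how the reader surfaces the error:

{code}
// Ask Spark to skip files it cannot read instead of failing the whole scan.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Placeholder schema so the scan does not need to read footers for inference.
import org.apache.spark.sql.types._
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("name", StringType)))

// ".../data" stands for a directory with valid Parquet files plus a 0-byte file.
spark.read.schema(schema).parquet(".../data").show()
{code}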

For example, I found related JIRAs - 
https://issues.apache.org/jira/browse/AVRO-1530 and 
https://issues.apache.org/jira/browse/HIVE-11977. _If I read these correctly_, 
Avro decided not to change the behaviour, but Hive handles it.

For this issue specifically, I also agree that it could be a subset of the 
issues you pointed out.


was (Author: hyukjin.kwon):
Yea, I agree that it should be dependent on the format 
specification/implementation, whether it is malformed or not. I think Parquet 
itself treats 0 bytes files as malformed file because it should read footer but 
it throws an exception up to my knowledge. 

The former case looks filtering out the whole partitions in 
{{DataSourceScanExec}}. Parquet requires to read the footers and it throws an 
exception, for example, I manually updated the code path to not skip the 
partitions so that the parquet reader is actually being called as below:

{code}
java.lang.RuntimeException: file:/.../tmp.abc is not a Parquet file (too small)
at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:466)
at 
org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:568)
at 
org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:492)
at 
org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:166)
at 
org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:147)
{code}

If we don't specify the schema, it also throws an exception as below:

{code}
spark.read.parquet(".../tmp.abc").show()
{code}

{code}
java.io.IOException: Could not read footer for file: 
FileStatus{path=file:/.../tmp.abc; isDirectory=false; length=0; replication=0; 
blocksize=0; modification_time=0; access_time=0; owner=; group=; 
permission=rw-rw-rw-; isSymlink=false}
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:498)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
at 
scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)

[jira] [Commented] (SPARK-19809) NullPointerException on empty ORC file

2017-05-28 Thread Dongjoon Hyun (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16028008#comment-16028008
 ] 

Dongjoon Hyun commented on SPARK-19809:
---

Great investigation! Thank you.

> NullPointerException on empty ORC file
> --
>
> Key: SPARK-19809
> URL: https://issues.apache.org/jira/browse/SPARK-19809
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 1.6.3, 2.0.2, 2.1.1
>Reporter: Michał Dawid
>
> When reading from a Hive ORC table, if there are some 0-byte files, we get a 
> NullPointerException:
> {code}java.lang.NullPointerException
>   at 
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$BISplitStrategy.getSplits(OrcInputFormat.java:560)
>   at 
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1010)
>   at 
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
>   at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
>   at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
>   at 
> org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
>   at 
> org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
>   at 
> org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>   at 
> org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
>   at 
> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
>   at 
> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
>   at 
> org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
>   at 
> org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
>   at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
>   at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
>   at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> 

[jira] [Commented] (SPARK-19809) NullPointerException on empty ORC file

2017-05-28 Thread Hyukjin Kwon (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027987#comment-16027987
 ] 

Hyukjin Kwon commented on SPARK-19809:
--

Yea, I agree that whether it is malformed or not should depend on the format 
specification/implementation. As far as I know, Parquet itself treats 0-byte 
files as malformed because it has to read the footer, and it throws an 
exception.

The former case looks like it filters out the whole partitions in 
{{DataSourceScanExec}}. Parquet requires reading the footers and throws an 
exception. For example, I manually updated the code path so that the 
partitions are not skipped and the Parquet reader is actually called, as below:

{code}
java.lang.RuntimeException: file:/.../tmp.abc is not a Parquet file (too small)
at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:466)
at 
org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:568)
at 
org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:492)
at 
org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:166)
at 
org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:147)
{code}

If we don't specify the schema, it also throws an exception as below:

{code}
spark.read.parquet(".../tmp.abc").show()
{code}

{code}
java.io.IOException: Could not read footer for file: 
FileStatus{path=file:/.../tmp.abc; isDirectory=false; length=0; replication=0; 
blocksize=0; modification_time=0; access_time=0; owner=; group=; 
permission=rw-rw-rw-; isSymlink=false}
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:498)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
at 
scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
at 
scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
at 
scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
{code}

Assuming it is treated as a malformed file for now (per the ORC JIRA you pointed 
out above), it looks like a malformed file, and it sounds like we should be able 
to skip it on the client side, whether or not it is handled via 
{{spark.sql.files.ignoreCorruptFiles}}.

For example, I found related JIRAs - 
https://issues.apache.org/jira/browse/AVRO-1530 and 
https://issues.apache.org/jira/browse/HIVE-11977. _If I read these correctly_, 
Avro decided not to change the behaviour, but Hive handles it.

For this issue specifically, I also agree that it could be a subset of the 
issues you pointed out.

> NullPointerException on empty ORC file
> --
>
> Key: SPARK-19809
> URL: https://issues.apache.org/jira/browse/SPARK-19809
> Project: Spark
>  Issue Type: Bug
>  Components: Input/Output
>Affects Versions: 1.6.3, 2.0.2, 2.1.1
>Reporter: Michał Dawid
>
> When reading from a Hive ORC table, if there are some 0-byte files, we get a 
> NullPointerException:
> {code}java.lang.NullPointerException
>   at 
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$BISplitStrategy.getSplits(OrcInputFormat.java:560)
>   at 
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1010)
>   at 
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
>   at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
>   at 

[jira] [Commented] (SPARK-20895) Support fast execution based on an optimized plan and parameter placeholders

2017-05-28 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027976#comment-16027976
 ] 

Takeshi Yamamuro commented on SPARK-20895:
--

okay, thanks

> Support fast execution based on an optimized plan and parameter placeholders
> 
>
> Key: SPARK-20895
> URL: https://issues.apache.org/jira/browse/SPARK-20895
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Takeshi Yamamuro
>Priority: Minor
>
> In database scenarios, users sometimes use parameterized queries for repeated 
> execution (e.g., by using prepared statements).
> So, I think this functionality is also useful for Spark users.
> What I suggest here seems to be like:
> My prototype here: 
> https://github.com/apache/spark/compare/master...maropu:PreparedStmt2
> {code}
> scala> Seq((1, 2), (2, 3)).toDF("col1", "col2").createOrReplaceTempView("t")
> // Define a query with a parameter placeholder named `val`
> scala> val df = sql("SELECT * FROM t WHERE col1 = $val")
> scala> df.explain
> == Physical Plan ==
> *Project [_1#13 AS col1#16, _2#14 AS col2#17]
> +- *Filter (_1#13 = cast(parameterholder(val) as int))
>+- LocalTableScan [_1#13, _2#14]
> // Apply optimizer rules and get an optimized logical plan with the parameter 
> placeholder
> scala> val preparedDf = df.prepared
> // Bind an actual value and do execution
> scala> preparedDf.bindParam("val", 1).show()
> +++
> |col1|col2|
> +++
> |   1|   2|
> +++
> {code}
> To implement this, my prototype adds a new expression leaf node named 
> `ParameterHolder`.
> In a binding phase, this node is replaced with `Literal` including an actual 
> value by using `bindParam`.
> Currently, Spark sometimes consumes much time to rewrite logical plans in 
> `Optimizer` (e.g. constant propagation described in SPARK-19846).
> So, I feel this approach is also helpful in that case:
> {code}
> def timer[R](f: => {}): Unit = {
>   val count = 9
>   val iters = (0 until count).map { i =>
> val t0 = System.nanoTime()
> f
> val t1 = System.nanoTime()
> val elapsed = t1 - t0 + 0.0
> println(s"#$i: ${elapsed / 10.0}")
> elapsed
>   }
>   println("Avg. Elapsed Time: " + ((iters.sum / count) / 10.0) + "s")
> }
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val numCols = 50
> val df = spark.range(100).selectExpr((0 until numCols).map(i => s"id AS 
> _c$i"): _*)
> // Add conditions to take much time in Optimizer
> val filter = (0 until 128).foldLeft(lit(false))((e, i) => 
> e.or(df.col(df.columns(i % numCols)) === (rand() * 10).cast("int")))
> val df2 = df.filter(filter).sort(df.columns(0))
> // Regular path
> timer {
>   df2.filter(df2.col(df2.columns(0)) === lit(3)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(4)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(5)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(6)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(7)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(8)).collect
> }
> #0: 24.178487906
> #1: 22.619839888
> #2: 22.318617035
> #3: 22.131305502
> #4: 22.532095611
> #5: 22.245152778
> #6: 22.314114847
> #7: 22.284385952
> #8: 22.053593855
> Avg. Elapsed Time: 22.51973259712s
> // Prepared path
> val df3b = df2.filter(df2.col(df2.columns(0)) === param("val")).prepared
> timer {
>   df3b.bindParam("val", 3).collect
>   df3b.bindParam("val", 4).collect
>   df3b.bindParam("val", 5).collect
>   df3b.bindParam("val", 6).collect
>   df3b.bindParam("val", 7).collect
>   df3b.bindParam("val", 8).collect
> }
> #0: 0.744693912
> #1: 0.743187129
> #2: 0.74513
> #3: 0.721668718
> #4: 0.757573342
> #5: 0.763240883
> #6: 0.731287275
> #7: 0.728740601
> #8: 0.674275592
> Avg. Elapsed Time: 0.734418606112s
> {code}
> I'm not sure this approach is acceptable, so welcome any suggestion and 
> advice.
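
To illustrate the binding phase mentioned in the quoted description (replacing a {{ParameterHolder}} leaf with a {{Literal}}), here is a rough sketch against Catalyst's internal expression API. This is only an assumption about the shape of the prototype, not its actual code, and the names are illustrative:

{code}
import org.apache.spark.sql.catalyst.expressions.{LeafExpression, Literal, Unevaluable}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.DataType

// A leaf expression that only marks where a parameter goes; it is never evaluated.
case class ParameterHolder(name: String, dataType: DataType)
  extends LeafExpression with Unevaluable {
  override def nullable: Boolean = true
}

// Binding phase: rewrite the (already optimized) plan, turning each placeholder
// whose name has a bound value into a foldable Literal.
def bindParams(plan: LogicalPlan, params: Map[String, Any]): LogicalPlan =
  plan.transformAllExpressions {
    case ParameterHolder(name, _) if params.contains(name) => Literal(params(name))
  }
{code}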



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-20895) Support fast execution based on an optimized plan and parameter placeholders

2017-05-28 Thread Reynold Xin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reynold Xin closed SPARK-20895.
---
Resolution: Later

> Support fast execution based on an optimized plan and parameter placeholders
> 
>
> Key: SPARK-20895
> URL: https://issues.apache.org/jira/browse/SPARK-20895
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Takeshi Yamamuro
>Priority: Minor
>
> In database scenarios, users sometimes use parameterized queries for repeated 
> execution (e.g., by using prepared statements).
> So, I think this functionality is also useful for Spark users.
> What I suggest here seems to be like:
> My prototype here: 
> https://github.com/apache/spark/compare/master...maropu:PreparedStmt2
> {code}
> scala> Seq((1, 2), (2, 3)).toDF("col1", "col2").createOrReplaceTempView("t")
> // Define a query with a parameter placeholder named `val`
> scala> val df = sql("SELECT * FROM t WHERE col1 = $val")
> scala> df.explain
> == Physical Plan ==
> *Project [_1#13 AS col1#16, _2#14 AS col2#17]
> +- *Filter (_1#13 = cast(parameterholder(val) as int))
>+- LocalTableScan [_1#13, _2#14]
> // Apply optimizer rules and get an optimized logical plan with the parameter 
> placeholder
> scala> val preparedDf = df.prepared
> // Bind an actual value and do execution
> scala> preparedDf.bindParam("val", 1).show()
> +++
> |col1|col2|
> +++
> |   1|   2|
> +++
> {code}
> To implement this, my prototype adds a new expression leaf node named 
> `ParameterHolder`.
> In a binding phase, this node is replaced with `Literal` including an actual 
> value by using `bindParam`.
> Currently, Spark sometimes consumes much time to rewrite logical plans in 
> `Optimizer` (e.g. constant propagation described in SPARK-19846).
> So, I feel this approach is also helpful in that case:
> {code}
> def timer[R](f: => {}): Unit = {
>   val count = 9
>   val iters = (0 until count).map { i =>
> val t0 = System.nanoTime()
> f
> val t1 = System.nanoTime()
> val elapsed = t1 - t0 + 0.0
> println(s"#$i: ${elapsed / 10.0}")
> elapsed
>   }
>   println("Avg. Elapsed Time: " + ((iters.sum / count) / 10.0) + "s")
> }
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val numCols = 50
> val df = spark.range(100).selectExpr((0 until numCols).map(i => s"id AS 
> _c$i"): _*)
> // Add conditions to take much time in Optimizer
> val filter = (0 until 128).foldLeft(lit(false))((e, i) => 
> e.or(df.col(df.columns(i % numCols)) === (rand() * 10).cast("int")))
> val df2 = df.filter(filter).sort(df.columns(0))
> // Regular path
> timer {
>   df2.filter(df2.col(df2.columns(0)) === lit(3)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(4)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(5)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(6)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(7)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(8)).collect
> }
> #0: 24.178487906
> #1: 22.619839888
> #2: 22.318617035
> #3: 22.131305502
> #4: 22.532095611
> #5: 22.245152778
> #6: 22.314114847
> #7: 22.284385952
> #8: 22.053593855
> Avg. Elapsed Time: 22.51973259712s
> // Prepared path
> val df3b = df2.filter(df2.col(df2.columns(0)) === param("val")).prepared
> timer {
>   df3b.bindParam("val", 3).collect
>   df3b.bindParam("val", 4).collect
>   df3b.bindParam("val", 5).collect
>   df3b.bindParam("val", 6).collect
>   df3b.bindParam("val", 7).collect
>   df3b.bindParam("val", 8).collect
> }
> #0: 0.744693912
> #1: 0.743187129
> #2: 0.74513
> #3: 0.721668718
> #4: 0.757573342
> #5: 0.763240883
> #6: 0.731287275
> #7: 0.728740601
> #8: 0.674275592
> Avg. Elapsed Time: 0.734418606112s
> {code}
> I'm not sure this approach is acceptable, so welcome any suggestion and 
> advice.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20895) Support fast execution based on an optimized plan and parameter placeholders

2017-05-28 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027972#comment-16027972
 ] 

Reynold Xin commented on SPARK-20895:
-

Can you create a separate ticket to discuss prepare and cursors? Might be 
useful to have a paragraph or two to explain what they do. Thanks.


> Support fast execution based on an optimized plan and parameter placeholders
> 
>
> Key: SPARK-20895
> URL: https://issues.apache.org/jira/browse/SPARK-20895
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Takeshi Yamamuro
>Priority: Minor
>
> In database scenarios, users sometimes use parameterized queries for repeated 
> execution (e.g., by using prepared statements).
> So, I think this functionality is also useful for Spark users.
> What I suggest here seems to be like:
> My prototype here: 
> https://github.com/apache/spark/compare/master...maropu:PreparedStmt2
> {code}
> scala> Seq((1, 2), (2, 3)).toDF("col1", "col2").createOrReplaceTempView("t")
> // Define a query with a parameter placeholder named `val`
> scala> val df = sql("SELECT * FROM t WHERE col1 = $val")
> scala> df.explain
> == Physical Plan ==
> *Project [_1#13 AS col1#16, _2#14 AS col2#17]
> +- *Filter (_1#13 = cast(parameterholder(val) as int))
>+- LocalTableScan [_1#13, _2#14]
> // Apply optimizer rules and get an optimized logical plan with the parameter 
> placeholder
> scala> val preparedDf = df.prepared
> // Bind an actual value and do execution
> scala> preparedDf.bindParam("val", 1).show()
> +++
> |col1|col2|
> +++
> |   1|   2|
> +++
> {code}
> To implement this, my prototype adds a new expression leaf node named 
> `ParameterHolder`.
> In a binding phase, this node is replaced with `Literal` including an actual 
> value by using `bindParam`.
> Currently, Spark sometimes consumes much time to rewrite logical plans in 
> `Optimizer` (e.g. constant propagation described in SPARK-19846).
> So, I feel this approach is also helpful in that case:
> {code}
> def timer[R](f: => {}): Unit = {
>   val count = 9
>   val iters = (0 until count).map { i =>
> val t0 = System.nanoTime()
> f
> val t1 = System.nanoTime()
> val elapsed = t1 - t0 + 0.0
> println(s"#$i: ${elapsed / 10.0}")
> elapsed
>   }
>   println("Avg. Elapsed Time: " + ((iters.sum / count) / 10.0) + "s")
> }
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val numCols = 50
> val df = spark.range(100).selectExpr((0 until numCols).map(i => s"id AS 
> _c$i"): _*)
> // Add conditions to take much time in Optimizer
> val filter = (0 until 128).foldLeft(lit(false))((e, i) => 
> e.or(df.col(df.columns(i % numCols)) === (rand() * 10).cast("int")))
> val df2 = df.filter(filter).sort(df.columns(0))
> // Regular path
> timer {
>   df2.filter(df2.col(df2.columns(0)) === lit(3)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(4)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(5)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(6)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(7)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(8)).collect
> }
> #0: 24.178487906
> #1: 22.619839888
> #2: 22.318617035
> #3: 22.131305502
> #4: 22.532095611
> #5: 22.245152778
> #6: 22.314114847
> #7: 22.284385952
> #8: 22.053593855
> Avg. Elapsed Time: 22.51973259712s
> // Prepared path
> val df3b = df2.filter(df2.col(df2.columns(0)) === param("val")).prepared
> timer {
>   df3b.bindParam("val", 3).collect
>   df3b.bindParam("val", 4).collect
>   df3b.bindParam("val", 5).collect
>   df3b.bindParam("val", 6).collect
>   df3b.bindParam("val", 7).collect
>   df3b.bindParam("val", 8).collect
> }
> #0: 0.744693912
> #1: 0.743187129
> #2: 0.74513
> #3: 0.721668718
> #4: 0.757573342
> #5: 0.763240883
> #6: 0.731287275
> #7: 0.728740601
> #8: 0.674275592
> Avg. Elapsed Time: 0.734418606112s
> {code}
> I'm not sure this approach is acceptable, so welcome any suggestion and 
> advice.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-20881) Clearly document the mechanism to choose between two sources of statistics

2017-05-28 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-20881.
-
   Resolution: Fixed
 Assignee: Zhenhua Wang
Fix Version/s: 2.3.0

> Clearly document the mechanism to choose between two sources of statistics
> --
>
> Key: SPARK-20881
> URL: https://issues.apache.org/jira/browse/SPARK-20881
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Zhenhua Wang
>Assignee: Zhenhua Wang
> Fix For: 2.3.0
>
>
> Currently in Spark, statistics are generated by "analyze" commands. 
> When user updates the table and collects stats in Hive, "totalSize"/"numRows" 
> will be updated in metastore. 
> Then, in spark side, table stats becomes stale and is different from Hive's 
> stats. 
> This is expected. Currently, we have two sources of statistics, i.e. Spark's 
> stats and Hive's stats. In our design, once Spark's stats is available, we 
> respect it over Hive's stats.
> If a user generated stats at Spark side, it's his responsibility to update 
> Spark's stats by re-running analyze commands.
> But we should clearly document in related code the mechanism to choose 
> between these two sources of stats.
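
As a concrete illustration of the "re-running analyze commands" part of the quoted description, refreshing Spark-side stats for a hypothetical table {{t}} could look like the following (table and column names are placeholders):

{code}
// Recompute table-level stats (size in bytes, row count) on the Spark side so
// they no longer lag behind changes made from Hive.
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")

// Optionally recompute column-level stats used by the cost-based optimizer.
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS col1, col2")
{code}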



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-20841) Support table column aliases in FROM clause

2017-05-28 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li reassigned SPARK-20841:
---

Assignee: Takeshi Yamamuro

> Support table column aliases in FROM clause
> ---
>
> Key: SPARK-20841
> URL: https://issues.apache.org/jira/browse/SPARK-20841
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Josh Rosen
>Assignee: Takeshi Yamamuro
>Priority: Minor
> Fix For: 2.3.0
>
>
> Some SQL dialects support a relatively obscure "table column aliases" feature 
> where you can rename columns when aliasing a relation in a {{FROM}} clause. 
> For example:
> {code}
> SELECT * FROM onecolumn AS a(x) JOIN onecolumn AS b(y) ON a.x = b.y
> {code}
> Spark does not currently support this. I would like to add support for this 
> in order to allow me to run a corpus of existing queries which depend on this 
> syntax.
> There's a good writeup on this at 
> http://modern-sql.com/feature/table-column-aliases, which has additional 
> examples and describes other databases' degrees of support for this feature.
> One tricky thing to figure out will be whether FROM clause column aliases 
> take precedence over aliases in the SELECT clause. When adding support for 
> this, we should make sure to add sufficient testing of several corner-cases, 
> including:
> * Aliasing in both the SELECT and FROM clause
> * Aliasing columns in the FROM clause both with and without an explicit AS.
> * Aliasing the wrong number of columns in the FROM clause, both greater and 
> fewer columns than were selected in the SELECT clause.
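
To make the corner cases listed in the quoted description concrete, here are sketches of the kinds of queries such tests might cover, reusing the {{onecolumn}} table from the example above; they are illustrations only and assume a version where the feature is available (Fix For 2.3.0 per this notification):

{code}
// Aliasing in both the SELECT and the FROM clause.
spark.sql("SELECT x AS renamed FROM onecolumn AS a(x)")

// FROM-clause column aliases with and without an explicit AS.
spark.sql("SELECT a.x, b.y FROM onecolumn AS a(x) JOIN onecolumn b(y) ON a.x = b.y")

// Aliasing the wrong number of columns should raise an analysis error.
spark.sql("SELECT * FROM onecolumn AS a(x, y)")
{code}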



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-20841) Support table column aliases in FROM clause

2017-05-28 Thread Xiao Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiao Li resolved SPARK-20841.
-
   Resolution: Fixed
Fix Version/s: 2.3.0

> Support table column aliases in FROM clause
> ---
>
> Key: SPARK-20841
> URL: https://issues.apache.org/jira/browse/SPARK-20841
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.0
>Reporter: Josh Rosen
>Assignee: Takeshi Yamamuro
>Priority: Minor
> Fix For: 2.3.0
>
>
> Some SQL dialects support a relatively obscure "table column aliases" feature 
> where you can rename columns when aliasing a relation in a {{FROM}} clause. 
> For example:
> {code}
> SELECT * FROM onecolumn AS a(x) JOIN onecolumn AS b(y) ON a.x = b.y
> {code}
> Spark does not currently support this. I would like to add support for this 
> in order to allow me to run a corpus of existing queries which depend on this 
> syntax.
> There's a good writeup on this at 
> http://modern-sql.com/feature/table-column-aliases, which has additional 
> examples and describes other databases' degrees of support for this feature.
> One tricky thing to figure out will be whether FROM clause column aliases 
> take precedence over aliases in the SELECT clause. When adding support for 
> this, we should make sure to add sufficient testing of several corner-cases, 
> including:
> * Aliasing in both the SELECT and FROM clause
> * Aliasing columns in the FROM clause both with and without an explicit AS.
> * Aliasing the wrong number of columns in the FROM clause, both greater and 
> fewer columns than were selected in the SELECT clause.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20877) Investigate if tests will time out on CRAN

2017-05-28 Thread Shivaram Venkataraman (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027870#comment-16027870
 ] 

Shivaram Venkataraman commented on SPARK-20877:
---

I managed to get the tests to pass on a Windows VM (note that the VM has only 1 
core and 4 GB of memory). The timing breakdown is at 
https://gist.github.com/shivaram/dc235c50b6369cbc60d859c25b13670d and the 
overall run time was close to 1 hr. I think AppVeyor might have a beefier 
machine?

Anyway, the most expensive tests remain the same across Linux and Windows -- I 
think we can disable them when running on CRAN / Windows? Are there other 
options we have to make these tests run faster?

> Investigate if tests will time out on CRAN
> --
>
> Key: SPARK-20877
> URL: https://issues.apache.org/jira/browse/SPARK-20877
> Project: Spark
>  Issue Type: Sub-task
>  Components: SparkR
>Affects Versions: 2.2.0
>Reporter: Felix Cheung
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20881) Clearly document the mechanism to choose between two sources of statistics

2017-05-28 Thread Zhenhua Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenhua Wang updated SPARK-20881:
-
Summary: Clearly document the mechanism to choose between two sources of 
statistics  (was: Use Hive's stats in metastore when cbo is disabled)

> Clearly document the mechanism to choose between two sources of statistics
> --
>
> Key: SPARK-20881
> URL: https://issues.apache.org/jira/browse/SPARK-20881
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Zhenhua Wang
>
> Currently statistics are generated by "analyze command" in Spark. 
> However, when user updates the table and collects stats in Hive, 
> "totalSize"/"numRows" will be updated in metastore. 
> Now, in spark side, table stats become stale. 
> If cbo is enabled, this is ok because we suppose user will handle this and 
> re-run the command to update  stats. 
> If cbo is disabled, then we should fallback to original way and respect 
> hive's stats. But in current implementation, spark's stats always override 
> hive's stats, no matter cbo is enabled or disabled.
> The right thing to do is to use (don't override) hive's stats when cbo is 
> disabled.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-20881) Clearly document the mechanism to choose between two sources of statistics

2017-05-28 Thread Zhenhua Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenhua Wang updated SPARK-20881:
-
Description: 
Currently in Spark, statistics are generated by "analyze" commands. 
When a user updates the table and collects stats in Hive, "totalSize"/"numRows" 
will be updated in the metastore. 
Then, on the Spark side, the table stats become stale and differ from Hive's 
stats. 

This is expected. Currently, we have two sources of statistics, i.e. Spark's 
stats and Hive's stats. In our design, once Spark's stats are available, we 
respect them over Hive's stats.
If a user generated stats on the Spark side, it is their responsibility to 
update Spark's stats by re-running the analyze commands.

But we should clearly document in the related code the mechanism for choosing 
between these two sources of stats.


  was:
Currently statistics are generated by "analyze command" in Spark. 

However, when user updates the table and collects stats in Hive, 
"totalSize"/"numRows" will be updated in metastore. 

Now, in spark side, table stats become stale. 
If cbo is enabled, this is ok because we suppose user will handle this and 
re-run the command to update  stats. 
If cbo is disabled, then we should fallback to original way and respect hive's 
stats. But in current implementation, spark's stats always override hive's 
stats, no matter cbo is enabled or disabled.

The right thing to do is to use (don't override) hive's stats when cbo is 
disabled.


> Clearly document the mechanism to choose between two sources of statistics
> --
>
> Key: SPARK-20881
> URL: https://issues.apache.org/jira/browse/SPARK-20881
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Zhenhua Wang
>
> Currently in Spark, statistics are generated by "analyze" commands. 
> When user updates the table and collects stats in Hive, "totalSize"/"numRows" 
> will be updated in metastore. 
> Then, in spark side, table stats becomes stale and is different from Hive's 
> stats. 
> This is expected. Currently, we have two sources of statistics, i.e. Spark's 
> stats and Hive's stats. In our design, once Spark's stats is available, we 
> respect it over Hive's stats.
> If a user generated stats at Spark side, it's his responsibility to update 
> Spark's stats by re-running analyze commands.
> But we should clearly document in related code the mechanism to choose 
> between these two sources of stats.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-20910) Build-in SQL Function Support - UUID

2017-05-28 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-20910:


Assignee: Apache Spark

> Build-in SQL Function Support - UUID
> 
>
> Key: SPARK-20910
> URL: https://issues.apache.org/jira/browse/SPARK-20910
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Yuming Wang
>Assignee: Apache Spark
>  Labels: starter
>
> {code:sql}
> UUID()
> {code}
> Returns a universally unique identifier (UUID) string.
> Ref: 
> https://dev.mysql.com/doc/refman/5.5/en/miscellaneous-functions.html#function_uuid



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-20910) Build-in SQL Function Support - UUID

2017-05-28 Thread Apache Spark (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-20910:


Assignee: (was: Apache Spark)

> Build-in SQL Function Support - UUID
> 
>
> Key: SPARK-20910
> URL: https://issues.apache.org/jira/browse/SPARK-20910
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Yuming Wang
>  Labels: starter
>
> {code:sql}
> UUID()
> {code}
> Returns a universally unique identifier (UUID) string.
> Ref: 
> https://dev.mysql.com/doc/refman/5.5/en/miscellaneous-functions.html#function_uuid



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20910) Build-in SQL Function Support - UUID

2017-05-28 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027752#comment-16027752
 ] 

Apache Spark commented on SPARK-20910:
--

User 'wangyum' has created a pull request for this issue:
https://github.com/apache/spark/pull/18136

> Build-in SQL Function Support - UUID
> 
>
> Key: SPARK-20910
> URL: https://issues.apache.org/jira/browse/SPARK-20910
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Yuming Wang
>  Labels: starter
>
> {code:sql}
> UUID()
> {code}
> Returns a universally unique identifier (UUID) string.
> Ref: 
> https://dev.mysql.com/doc/refman/5.5/en/miscellaneous-functions.html#function_uuid



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-20910) Build-in SQL Function Support - UUID

2017-05-28 Thread Yuming Wang (JIRA)
Yuming Wang created SPARK-20910:
---

 Summary: Build-in SQL Function Support - UUID
 Key: SPARK-20910
 URL: https://issues.apache.org/jira/browse/SPARK-20910
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.2.0
Reporter: Yuming Wang


{code:sql}
UUID()
{code}
Returns a universally unique identifier (UUID) string.

Ref: 
https://dev.mysql.com/doc/refman/5.5/en/miscellaneous-functions.html#function_uuid
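
Until a built-in {{uuid()}} exists, a sketch of a user-side workaround with a plain Scala UDF (the UDF and column names are made up for illustration) could be:

{code}
import java.util.UUID
import org.apache.spark.sql.functions.udf

// Each invocation produces a fresh random UUID string, so the column is
// non-deterministic and may change if the plan is re-evaluated.
val uuidUdf = udf(() => UUID.randomUUID().toString)

spark.range(3).withColumn("uuid", uuidUdf()).show(false)
{code}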



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20882) Executor is waiting for ShuffleBlockFetcherIterator

2017-05-28 Thread cen yuhai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027743#comment-16027743
 ] 

cen yuhai commented on SPARK-20882:
---

[~zsxwing] It is my problem... It is none of Spark's business.

> Executor is waiting for ShuffleBlockFetcherIterator
> ---
>
> Key: SPARK-20882
> URL: https://issues.apache.org/jira/browse/SPARK-20882
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.1.0, 2.1.1
>Reporter: cen yuhai
> Attachments: executor_jstack, executor_log, screenshot-1.png, 
> screenshot-2.png, screenshot-3.png
>
>
> This bug is like https://issues.apache.org/jira/browse/SPARK-19300.
> but I have updated my client netty version to 4.0.43.Final.
> The shuffle service handler is still 4.0.42.Final
> spark.sql.adaptive.enabled is true
> {code}
> "Executor task launch worker for task 4808985" #5373 daemon prio=5 os_prio=0 
> tid=0x7f54ef437000 nid=0x1aed0 waiting on condition [0x7f53aebfe000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> parking to wait for <0x000498c249c0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:189)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
> at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:58)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:97)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
> at org.apache.spark.scheduler.Task.run(Task.scala:114)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:323)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> {code}
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: 
> Set(shuffle_5_1431_805, shuffle_5_1431_808, shuffle_5_1431_806, 
> shuffle_5_1431_809, shuffle_5_1431_807)
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: 
> Set(shuffle_5_1431_808, shuffle_5_1431_806, shuffle_5_1431_809, 
> shuffle_5_1431_807)
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: 
> Set(shuffle_5_1431_808, shuffle_5_1431_809, shuffle_5_1431_807)
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: 
> Set(shuffle_5_1431_808, shuffle_5_1431_809)
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: 
> Set(shuffle_5_1431_809)
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: Set()
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in 
> flight 21
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in 
> flight 20
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in 
> flight 19
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in 
> flight 18
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in 
> flight 17
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in 
> flight 16
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in 
> flight 15
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in 
> flight 14
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in 
> flight 13
> 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in 
> flight 12
> 17/05/26 12:04:06 DEBUG 

[jira] [Commented] (SPARK-20895) Support fast execution based on an optimized plan and parameter placeholders

2017-05-28 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027742#comment-16027742
 ] 

Takeshi Yamamuro commented on SPARK-20895:
--

I saw the issue tickets in https://issues.apache.org/jira/browse/SPARK-19846 
and https://issues.apache.org/jira/browse/SPARK-19875. But yea, your suggestion 
seems reasonable to me (later, I'll close this as 'Won't fix'). BTW (this is 
not directly related to this ticket though...), is it still worth implementing 
the `PREPARE` statement in Spark? It is part of the ANSI SQL standard 
(http://developer.mimer.com/documentation/latest_html/Mimer_SQL_Engine_DocSet/Embedded_SQL9.html)
 and would be useful for users who frequently run the same queries.
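
As background for the prepared-statement use case, this is roughly how users do it today against the Thrift server over JDBC (the URL, table, and columns are placeholders, and the Hive JDBC driver must be on the classpath); the parameters are substituted client-side, so no optimized plan is reused, which is exactly the gap this ticket targets:

{code}
import java.sql.DriverManager

// Placeholder connection string for a Spark Thrift server.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
val stmt = conn.prepareStatement("SELECT col1, col2 FROM t WHERE col1 = ?")

stmt.setInt(1, 1)                   // bind the parameter
val rs = stmt.executeQuery()
while (rs.next()) {
  println(rs.getInt("col1") + ", " + rs.getInt("col2"))
}
rs.close(); stmt.close(); conn.close()
{code}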

> Support fast execution based on an optimized plan and parameter placeholders
> 
>
> Key: SPARK-20895
> URL: https://issues.apache.org/jira/browse/SPARK-20895
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.1
>Reporter: Takeshi Yamamuro
>Priority: Minor
>
> In database scenarios, users sometimes use parameterized queries for repeated 
> execution (e.g., by using prepared statements).
> So, I think this functionality is also useful for Spark users.
> What I suggest here seems to be like:
> My prototype here: 
> https://github.com/apache/spark/compare/master...maropu:PreparedStmt2
> {code}
> scala> Seq((1, 2), (2, 3)).toDF("col1", "col2").createOrReplaceTempView("t")
> // Define a query with a parameter placeholder named `val`
> scala> val df = sql("SELECT * FROM t WHERE col1 = $val")
> scala> df.explain
> == Physical Plan ==
> *Project [_1#13 AS col1#16, _2#14 AS col2#17]
> +- *Filter (_1#13 = cast(parameterholder(val) as int))
>+- LocalTableScan [_1#13, _2#14]
> // Apply optimizer rules and get an optimized logical plan with the parameter 
> placeholder
> scala> val preparedDf = df.prepared
> // Bind an actual value and do execution
> scala> preparedDf.bindParam("val", 1).show()
> +++
> |col1|col2|
> +++
> |   1|   2|
> +++
> {code}
> To implement this, my prototype adds a new expression leaf node named 
> `ParameterHolder`.
> In a binding phase, this node is replaced with `Literal` including an actual 
> value by using `bindParam`.
> Currently, Spark sometimes consumes much time to rewrite logical plans in 
> `Optimizer` (e.g. constant propagation described in SPARK-19846).
> So, I feel this approach is also helpful in that case:
> {code}
> def timer[R](f: => {}): Unit = {
>   val count = 9
>   val iters = (0 until count).map { i =>
> val t0 = System.nanoTime()
> f
> val t1 = System.nanoTime()
> val elapsed = t1 - t0 + 0.0
> println(s"#$i: ${elapsed / 10.0}")
> elapsed
>   }
>   println("Avg. Elapsed Time: " + ((iters.sum / count) / 10.0) + "s")
> }
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> val numCols = 50
> val df = spark.range(100).selectExpr((0 until numCols).map(i => s"id AS 
> _c$i"): _*)
> // Add conditions to take much time in Optimizer
> val filter = (0 until 128).foldLeft(lit(false))((e, i) => 
> e.or(df.col(df.columns(i % numCols)) === (rand() * 10).cast("int")))
> val df2 = df.filter(filter).sort(df.columns(0))
> // Regular path
> timer {
>   df2.filter(df2.col(df2.columns(0)) === lit(3)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(4)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(5)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(6)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(7)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(8)).collect
> }
> #0: 24.178487906
> #1: 22.619839888
> #2: 22.318617035
> #3: 22.131305502
> #4: 22.532095611
> #5: 22.245152778
> #6: 22.314114847
> #7: 22.284385952
> #8: 22.053593855
> Avg. Elapsed Time: 22.51973259712s
> // Prepared path
> val df3b = df2.filter(df2.col(df2.columns(0)) === param("val")).prepared
> timer {
>   df3b.bindParam("val", 3).collect
>   df3b.bindParam("val", 4).collect
>   df3b.bindParam("val", 5).collect
>   df3b.bindParam("val", 6).collect
>   df3b.bindParam("val", 7).collect
>   df3b.bindParam("val", 8).collect
> }
> #0: 0.744693912
> #1: 0.743187129
> #2: 0.74513
> #3: 0.721668718
> #4: 0.757573342
> #5: 0.763240883
> #6: 0.731287275
> #7: 0.728740601
> #8: 0.674275592
> Avg. Elapsed Time: 0.734418606112s
> {code}
> I'm not sure this approach is acceptable, so welcome any suggestion and 
> advice.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-20909) Build-in SQL Function Support - DAYOFWEEK

2017-05-28 Thread Yuming Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-20909:

Comment: was deleted

(was: I'm work on this.)

> Build-in SQL Function Support - DAYOFWEEK
> -
>
> Key: SPARK-20909
> URL: https://issues.apache.org/jira/browse/SPARK-20909
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Yuming Wang
>  Labels: starter
>
> {noformat}
> DAYOFWEEK(date)
> {noformat}
> Return the weekday index of the argument.
> Ref: 
> https://dev.mysql.com/doc/refman/5.5/en/date-and-time-functions.html#function_dayofweek
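
Until such a built-in exists, a sketch of a user-side equivalent that follows MySQL's 1 = Sunday ... 7 = Saturday convention (the UDF name and sample data are illustrative, and non-null dates are assumed) might be:

{code}
import java.sql.Date
import org.apache.spark.sql.functions.udf
import spark.implicits._

// java.time numbers days 1 = Monday ... 7 = Sunday; shift to MySQL's convention.
val dayOfWeekUdf = udf { d: Date => d.toLocalDate.getDayOfWeek.getValue % 7 + 1 }

val df = Seq(Date.valueOf("2017-05-28")).toDF("date")   // a Sunday
df.select(dayOfWeekUdf($"date").as("dayofweek")).show() // expected: 1
{code}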



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-20199) GradientBoostedTreesModel doesn't have featureSubsetStrategy parameter

2017-05-28 Thread pralabhkumar (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027725#comment-16027725
 ] 

pralabhkumar edited comment on SPARK-20199 at 5/28/17 7:12 AM:
---

[~peng.m...@intel.com][~facai] [~srowen]

Please review the pull request / approach.


was (Author: pralabhkumar):
[~arushkharbanda][~peng.m...@intel.com][~facai] [~srowen]

Please review the pull request /approach,.

> GradientBoostedTreesModel doesn't have  featureSubsetStrategy parameter
> ---
>
> Key: SPARK-20199
> URL: https://issues.apache.org/jira/browse/SPARK-20199
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, MLlib
>Affects Versions: 2.1.0
>Reporter: pralabhkumar
>
> Spark GradientBoostedTreesModel doesn't have featureSubsetStrategy. It uses 
> random forest internally, which has featureSubsetStrategy hardcoded to "all". 
> It should be provided by the user to allow randomness at the feature level.
> This parameter is available in H2O and XGBoost. 
> Sample from H2O.ai: 
> gbmParams._col_sample_rate
> Please provide the parameter.
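
For comparison, the DataFrame-based random forest API already exposes this knob. A minimal sketch (parameter values are placeholders) of the option the ticket asks to also surface for gradient-boosted trees:

{code}
import org.apache.spark.ml.classification.{GBTClassifier, RandomForestClassifier}

// RandomForestClassifier lets users control per-tree feature sampling.
val rf = new RandomForestClassifier()
  .setFeatureSubsetStrategy("sqrt")   // e.g. "all", "sqrt", "log2", "onethird"

// GBTClassifier in the affected versions has no such setter; the forest it
// builds internally effectively uses featureSubsetStrategy = "all".
val gbt = new GBTClassifier()
  .setMaxIter(50)
{code}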



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20199) GradientBoostedTreesModel doesn't have featureSubsetStrategy parameter

2017-05-28 Thread pralabhkumar (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027725#comment-16027725
 ] 

pralabhkumar commented on SPARK-20199:
--

[~arushkharbanda][~peng.m...@intel.com][~facai] [~srowen]

Please review the pull request / approach.

> GradientBoostedTreesModel doesn't have  featureSubsetStrategy parameter
> ---
>
> Key: SPARK-20199
> URL: https://issues.apache.org/jira/browse/SPARK-20199
> Project: Spark
>  Issue Type: Improvement
>  Components: ML, MLlib
>Affects Versions: 2.1.0
>Reporter: pralabhkumar
>
> Spark GradientBoostedTreesModel doesn't have featureSubsetStrategy. It uses 
> random forest internally, which has featureSubsetStrategy hardcoded to "all". 
> It should be provided by the user to allow randomness at the feature level.
> This parameter is available in H2O and XGBoost. 
> Sample from H2O.ai: 
> gbmParams._col_sample_rate
> Please provide the parameter.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20877) Investigate if tests will time out on CRAN

2017-05-28 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027720#comment-16027720
 ] 

Felix Cheung commented on SPARK-20877:
--

I did a run on AppVeyor:
https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark/build/1360-master

It ran for 31 min (vs. <7 min on Jenkins).

> Investigate if tests will time out on CRAN
> --
>
> Key: SPARK-20877
> URL: https://issues.apache.org/jira/browse/SPARK-20877
> Project: Spark
>  Issue Type: Sub-task
>  Components: SparkR
>Affects Versions: 2.2.0
>Reporter: Felix Cheung
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org