[jira] [Comment Edited] (SPARK-19809) NullPointerException on empty ORC file
[ https://issues.apache.org/jira/browse/SPARK-19809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027987#comment-16027987 ] Hyukjin Kwon edited comment on SPARK-19809 at 5/29/17 3:21 AM: --- Yea, I agree that whether a 0-byte file counts as malformed should depend on the format's specification/implementation. As far as I know, Parquet itself treats 0-byte files as malformed, because it must read the footer and throws an exception when it cannot. In the former case, the whole partitions appear to be filtered out in {{FileSourceScanExec}}. Parquet needs to read the footers and throws an exception; for example, I manually changed the code path not to skip the partitions, so that the Parquet reader is actually invoked:

{code}
java.lang.RuntimeException: file:/.../tmp.abc is not a Parquet file (too small)
  at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:466)
  at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:568)
  at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:492)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:166)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:147)
{code}

If we don't specify the schema, it also throws an exception:

{code}
spark.read.parquet(".../tmp.abc").show()
{code}

{code}
java.io.IOException: Could not read footer for file: FileStatus{path=file:/.../tmp.abc; isDirectory=false; length=0; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false}
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:498)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
  at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
  at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
  at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
{code}

Assuming a 0-byte ORC file is treated as malformed for now (per the ORC JIRA you pointed out above), it sounds like we should be able to skip it on the client side, whether or not that is handled via {{spark.sql.files.ignoreCorruptFiles}}. For example, I found related JIRAs - https://issues.apache.org/jira/browse/AVRO-1530 and https://issues.apache.org/jira/browse/HIVE-11977. _If I read these correctly_, Avro decided not to change the behaviour, but Hive handles it. For this particular issue, I also agree it could be a subset of the issues you pointed out.
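The client-side skip discussed in this thread (ignoring 0-byte files before they ever reach a format reader) can be sketched outside Spark. This is a minimal illustration only; `non_empty_files` is a hypothetical helper, not part of Spark's API, which instead exposes the {{spark.sql.files.ignoreCorruptFiles}} flag:

```python
import os
import tempfile

def non_empty_files(paths):
    """Filter out zero-byte files, mirroring the client-side skip
    discussed above. Hypothetical helper, not a Spark API."""
    return [p for p in paths if os.path.getsize(p) > 0]

# Demo: one empty file (as in the bug report) and one file with content.
d = tempfile.mkdtemp()
empty = os.path.join(d, "part-00000.orc")   # 0-byte file
full = os.path.join(d, "part-00001.orc")
open(empty, "w").close()
with open(full, "wb") as f:
    f.write(b"ORC")  # placeholder bytes, not a real ORC file

kept = non_empty_files([empty, full])
print(kept == [full])  # the empty file is skipped
```

A real reader would apply such a filter when listing input files, so split generation never sees the empty file.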
[jira] [Commented] (SPARK-19809) NullPointerException on empty ORC file
[ https://issues.apache.org/jira/browse/SPARK-19809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16028008#comment-16028008 ] Dongjoon Hyun commented on SPARK-19809: --- Great investigation! Thank you.

> NullPointerException on empty ORC file
> --
>
> Key: SPARK-19809
> URL: https://issues.apache.org/jira/browse/SPARK-19809
> Project: Spark
> Issue Type: Bug
> Components: Input/Output
> Affects Versions: 1.6.3, 2.0.2, 2.1.1
> Reporter: Michał Dawid
>
> When reading from hive ORC table if there are some 0 byte files we get
> NullPointerException:
> {code}
> java.lang.NullPointerException
>   at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$BISplitStrategy.getSplits(OrcInputFormat.java:560)
>   at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1010)
>   at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
>   at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
>   at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
>   at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
>   at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
>   at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
>   at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>   at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
>   at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
>   at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
>   at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
>   at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
>   at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
>   at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
>   at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
[jira] [Commented] (SPARK-20895) Support fast execution based on an optimized plan and parameter placeholders
[ https://issues.apache.org/jira/browse/SPARK-20895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027976#comment-16027976 ] Takeshi Yamamuro commented on SPARK-20895: --- okay, thanks

> Support fast execution based on an optimized plan and parameter placeholders
> --
>
> Key: SPARK-20895
> URL: https://issues.apache.org/jira/browse/SPARK-20895
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.1.1
> Reporter: Takeshi Yamamuro
> Priority: Minor
>
> In database scenarios, users sometimes use parameterized queries for repeated
> execution (e.g., by using prepared statements), so I think this functionality
> would also be useful for Spark users. My prototype is here:
> https://github.com/apache/spark/compare/master...maropu:PreparedStmt2
> {code}
> scala> Seq((1, 2), (2, 3)).toDF("col1", "col2").createOrReplaceTempView("t")
>
> // Define a query with a parameter placeholder named `val`
> scala> val df = sql("SELECT * FROM t WHERE col1 = $val")
> scala> df.explain
> == Physical Plan ==
> *Project [_1#13 AS col1#16, _2#14 AS col2#17]
> +- *Filter (_1#13 = cast(parameterholder(val) as int))
>    +- LocalTableScan [_1#13, _2#14]
>
> // Apply optimizer rules and get an optimized logical plan with the parameter placeholder
> scala> val preparedDf = df.prepared
>
> // Bind an actual value and execute
> scala> preparedDf.bindParam("val", 1).show()
> +----+----+
> |col1|col2|
> +----+----+
> |   1|   2|
> +----+----+
> {code}
> To implement this, my prototype adds a new expression leaf node named
> `ParameterHolder`. In the binding phase, this node is replaced with a
> `Literal` holding the actual value passed to `bindParam`. Currently, Spark
> sometimes spends much time rewriting logical plans in the `Optimizer`
> (e.g., constant propagation described in SPARK-19846).
> So, I feel this approach is also helpful in that case:
> {code}
> def timer[R](f: => Unit): Unit = {
>   val count = 9
>   val iters = (0 until count).map { i =>
>     val t0 = System.nanoTime()
>     f
>     val t1 = System.nanoTime()
>     val elapsed = t1 - t0 + 0.0
>     println(s"#$i: ${elapsed / 1e9}")
>     elapsed
>   }
>   println("Avg. Elapsed Time: " + ((iters.sum / count) / 1e9) + "s")
> }
>
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
>
> val numCols = 50
> val df = spark.range(100).selectExpr((0 until numCols).map(i => s"id AS _c$i"): _*)
>
> // Add conditions that take much time in the Optimizer
> val filter = (0 until 128).foldLeft(lit(false))((e, i) =>
>   e.or(df.col(df.columns(i % numCols)) === (rand() * 10).cast("int")))
> val df2 = df.filter(filter).sort(df.columns(0))
>
> // Regular path
> timer {
>   df2.filter(df2.col(df2.columns(0)) === lit(3)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(4)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(5)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(6)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(7)).collect
>   df2.filter(df2.col(df2.columns(0)) === lit(8)).collect
> }
> #0: 24.178487906
> #1: 22.619839888
> #2: 22.318617035
> #3: 22.131305502
> #4: 22.532095611
> #5: 22.245152778
> #6: 22.314114847
> #7: 22.284385952
> #8: 22.053593855
> Avg. Elapsed Time: 22.51973259712s
>
> // Prepared path
> val df3b = df2.filter(df2.col(df2.columns(0)) === param("val")).prepared
> timer {
>   df3b.bindParam("val", 3).collect
>   df3b.bindParam("val", 4).collect
>   df3b.bindParam("val", 5).collect
>   df3b.bindParam("val", 6).collect
>   df3b.bindParam("val", 7).collect
>   df3b.bindParam("val", 8).collect
> }
> #0: 0.744693912
> #1: 0.743187129
> #2: 0.74513
> #3: 0.721668718
> #4: 0.757573342
> #5: 0.763240883
> #6: 0.731287275
> #7: 0.728740601
> #8: 0.674275592
> Avg. Elapsed Time: 0.734418606112s
> {code}
> I'm not sure this approach is acceptable, so I welcome any suggestions and
> advice.
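The prepare-once / bind-many pattern proposed above can be illustrated outside Spark. The sketch below uses Python's sqlite3 module, whose parameter substitution plays the role of the prototype's `prepared`/`bindParam`: the query template is written once with a placeholder, and only the bound value changes per execution. This is an analogy, not Spark code:

```python
import sqlite3

# A tiny table analogous to the `t` view in the proposal.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (col1 INTEGER, col2 INTEGER)")
conn.executemany("INSERT INTO t VALUES (?, ?)", [(1, 2), (2, 3)])

# One template with a placeholder; bind different values repeatedly,
# mirroring prepared/bindParam in the prototype.
query = "SELECT col1, col2 FROM t WHERE col1 = ?"
rows_for_1 = conn.execute(query, (1,)).fetchall()
rows_for_2 = conn.execute(query, (2,)).fetchall()
print(rows_for_1)  # [(1, 2)]
print(rows_for_2)  # [(2, 3)]
```

In the Spark prototype the saving comes from running the Optimizer once per template instead of once per query; in a classical database the analogous saving is parsing and planning the statement once.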
-- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-20895) Support fast execution based on an optimized plan and parameter placeholders
[ https://issues.apache.org/jira/browse/SPARK-20895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin closed SPARK-20895. --- Resolution: Later

> Support fast execution based on an optimized plan and parameter placeholders
[jira] [Commented] (SPARK-20895) Support fast execution based on an optimized plan and parameter placeholders
[ https://issues.apache.org/jira/browse/SPARK-20895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027972#comment-16027972 ] Reynold Xin commented on SPARK-20895: --- Can you create a separate ticket to discuss prepare and cursors? Might be useful to have a paragraph or two to explain what they do. Thanks.

> Support fast execution based on an optimized plan and parameter placeholders
[jira] [Resolved] (SPARK-20881) Clearly document the mechanism to choose between two sources of statistics
[ https://issues.apache.org/jira/browse/SPARK-20881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-20881. --- Resolution: Fixed Assignee: Zhenhua Wang Fix Version/s: 2.3.0

> Clearly document the mechanism to choose between two sources of statistics
> --
>
> Key: SPARK-20881
> URL: https://issues.apache.org/jira/browse/SPARK-20881
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Zhenhua Wang
> Assignee: Zhenhua Wang
> Fix For: 2.3.0
>
> Currently in Spark, statistics are generated by "analyze" commands. When a
> user updates the table and collects stats in Hive, "totalSize"/"numRows"
> will be updated in the metastore. Then, on the Spark side, the table stats
> become stale and differ from Hive's stats. This is expected: we have two
> sources of statistics, i.e. Spark's stats and Hive's stats, and in our
> design, once Spark's stats are available, we respect them over Hive's stats.
> If a user generated stats on the Spark side, it is their responsibility to
> update Spark's stats by re-running analyze commands. But we should clearly
> document in the related code the mechanism for choosing between these two
> sources of stats.
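The precedence rule being documented here (prefer Spark-generated stats when present, otherwise fall back to the stats Hive keeps in the metastore) can be sketched as a one-liner. `choose_stats` and the dict representation are hypothetical illustrations, not Spark's actual `CatalogStatistics` API:

```python
def choose_stats(spark_stats, hive_stats):
    """Prefer Spark-generated statistics when available; otherwise fall
    back to Hive's metastore stats. Hypothetical helper mirroring the
    precedence rule described in the issue."""
    return spark_stats if spark_stats is not None else hive_stats

# Spark has analyzed the table: its stats win even if Hive's copy diverged.
spark_stats = {"sizeInBytes": 1024, "rowCount": 100}
hive_stats = {"totalSize": 2048, "numRows": 200}  # stale/diverged copy
print(choose_stats(spark_stats, hive_stats))

# No Spark-side analyze has run yet: respect Hive's stats.
print(choose_stats(None, hive_stats))
```

The stale-stats situation in the issue arises precisely because the first branch keeps winning until the user re-runs an analyze command on the Spark side.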
[jira] [Assigned] (SPARK-20841) Support table column aliases in FROM clause
[ https://issues.apache.org/jira/browse/SPARK-20841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li reassigned SPARK-20841: --- Assignee: Takeshi Yamamuro

> Support table column aliases in FROM clause
> ---
>
> Key: SPARK-20841
> URL: https://issues.apache.org/jira/browse/SPARK-20841
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.0.0
> Reporter: Josh Rosen
> Assignee: Takeshi Yamamuro
> Priority: Minor
> Fix For: 2.3.0
>
> Some SQL dialects support a relatively obscure "table column aliases" feature
> where you can rename columns when aliasing a relation in a {{FROM}} clause.
> For example:
> {code}
> SELECT * FROM onecolumn AS a(x) JOIN onecolumn AS b(y) ON a.x = b.y
> {code}
> Spark does not currently support this. I would like to add support for this
> in order to allow me to run a corpus of existing queries which depend on this
> syntax.
> There's a good writeup on this at
> http://modern-sql.com/feature/table-column-aliases, which has additional
> examples and describes other databases' degrees of support for this feature.
> One tricky thing to figure out will be whether FROM clause column aliases
> take precedence over aliases in the SELECT clause. When adding support for
> this, we should make sure to add sufficient testing of several corner-cases,
> including:
> * Aliasing in both the SELECT and FROM clause
> * Aliasing columns in the FROM clause both with and without an explicit AS.
> * Aliasing the wrong number of columns in the FROM clause, both greater and
> fewer columns than were selected in the SELECT clause.
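The positional-rename semantics of `rel AS a(x, y)` can be sketched in plain Python: a column-alias list renames the relation's columns by position, and supplying the wrong number of aliases is an error (one of the corner cases listed in the issue). `alias_relation` is a hypothetical illustration of the semantics, not Spark's implementation:

```python
def alias_relation(rows, old_cols, new_cols):
    """Rename a relation's columns positionally, as `rel AS a(x, y)` does.
    Raises when the alias-list length mismatches the relation's arity,
    one of the corner cases the issue asks to test."""
    if len(new_cols) != len(old_cols):
        raise ValueError(
            f"expected {len(old_cols)} column aliases, got {len(new_cols)}")
    return [dict(zip(new_cols, row.values())) for row in rows]

# A one-column relation, like `onecolumn` in the issue's example query.
onecolumn = [{"value": 1}, {"value": 2}]
a = alias_relation(onecolumn, ["value"], ["x"])  # onecolumn AS a(x)
print(a)  # [{'x': 1}, {'x': 2}]

try:
    alias_relation(onecolumn, ["value"], ["x", "y"])  # too many aliases
except ValueError as e:
    print("error:", e)
```

Whether these positional aliases should also override SELECT-clause aliases is the precedence question the issue leaves open.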
[jira] [Resolved] (SPARK-20841) Support table column aliases in FROM clause
[ https://issues.apache.org/jira/browse/SPARK-20841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-20841. --- Resolution: Fixed Fix Version/s: 2.3.0

> Support table column aliases in FROM clause
[jira] [Commented] (SPARK-20877) Investigate if tests will time out on CRAN
[ https://issues.apache.org/jira/browse/SPARK-20877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027870#comment-16027870 ] Shivaram Venkataraman commented on SPARK-20877: --- I managed to get the tests to pass on a Windows VM (note that the VM has only 1 core and 4 GB of memory). The timing breakdown is at https://gist.github.com/shivaram/dc235c50b6369cbc60d859c25b13670d and the overall run time was close to 1 hour. I think AppVeyor might have a beefier machine? Anyway, the most expensive tests remain the same across Linux and Windows -- I think we can disable them when running on CRAN / Windows? Are there other options to make these tests run faster?

> Investigate if tests will time out on CRAN
> --
>
> Key: SPARK-20877
> URL: https://issues.apache.org/jira/browse/SPARK-20877
> Project: Spark
> Issue Type: Sub-task
> Components: SparkR
> Affects Versions: 2.2.0
> Reporter: Felix Cheung
[jira] [Updated] (SPARK-20881) Clearly document the mechanism to choose between two sources of statistics
[ https://issues.apache.org/jira/browse/SPARK-20881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenhua Wang updated SPARK-20881: - Summary: Clearly document the mechanism to choose between two sources of statistics (was: Use Hive's stats in metastore when cbo is disabled) > Clearly document the mechanism to choose between two sources of statistics > -- > > Key: SPARK-20881 > URL: https://issues.apache.org/jira/browse/SPARK-20881 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang > > Currently, statistics are generated by the "analyze" command in Spark. > However, when a user updates the table and collects stats in Hive, > "totalSize"/"numRows" will be updated in the metastore. > Now, on the Spark side, table stats become stale. > If cbo is enabled, this is OK because we assume the user will handle this and > re-run the command to update the stats. > If cbo is disabled, then we should fall back to the original way and respect > Hive's stats. But in the current implementation, Spark's stats always override > Hive's stats, regardless of whether cbo is enabled or disabled. > The right thing to do is to use (not override) Hive's stats when cbo is > disabled.
[jira] [Updated] (SPARK-20881) Clearly document the mechanism to choose between two sources of statistics
[ https://issues.apache.org/jira/browse/SPARK-20881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenhua Wang updated SPARK-20881: - Description: Currently in Spark, statistics are generated by "analyze" commands. When a user updates the table and collects stats in Hive, "totalSize"/"numRows" will be updated in the metastore. Then, on the Spark side, table stats become stale and differ from Hive's stats. This is expected. Currently, we have two sources of statistics, i.e. Spark's stats and Hive's stats. In our design, once Spark's stats are available, we respect them over Hive's stats. If a user generated stats on the Spark side, it is their responsibility to update Spark's stats by re-running analyze commands. But we should clearly document in the related code the mechanism to choose between these two sources of stats. was: Currently, statistics are generated by the "analyze" command in Spark. However, when a user updates the table and collects stats in Hive, "totalSize"/"numRows" will be updated in the metastore. Now, on the Spark side, table stats become stale. If cbo is enabled, this is OK because we assume the user will handle this and re-run the command to update the stats. If cbo is disabled, then we should fall back to the original way and respect Hive's stats. But in the current implementation, Spark's stats always override Hive's stats, regardless of whether cbo is enabled or disabled. The right thing to do is to use (not override) Hive's stats when cbo is disabled. > Clearly document the mechanism to choose between two sources of statistics > -- > > Key: SPARK-20881 > URL: https://issues.apache.org/jira/browse/SPARK-20881 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang > > Currently in Spark, statistics are generated by "analyze" commands. > When a user updates the table and collects stats in Hive, "totalSize"/"numRows" > will be updated in the metastore. 
> Then, on the Spark side, table stats become stale and differ from Hive's > stats. > This is expected. Currently, we have two sources of statistics, i.e. Spark's > stats and Hive's stats. In our design, once Spark's stats are available, we > respect them over Hive's stats. > If a user generated stats on the Spark side, it is their responsibility to update > Spark's stats by re-running analyze commands. > But we should clearly document in the related code the mechanism to choose > between these two sources of stats.
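The precedence described above reduces to a one-line rule. The Python below is an illustrative toy (not Spark's actual `CatalogTable` handling), assuming stats are represented as simple dicts:

```python
def effective_stats(spark_stats, hive_stats):
    # Documented precedence: once Spark-side stats exist (from ANALYZE),
    # they are respected over Hive's metastore stats; otherwise fall back
    # to Hive's "totalSize"/"numRows".
    return spark_stats if spark_stats is not None else hive_stats

# Spark stats present -> they win, even if Hive's were collected later.
assert effective_stats({"numRows": 100}, {"numRows": 120}) == {"numRows": 100}
# No Spark stats -> Hive's stats are used.
assert effective_stats(None, {"numRows": 120}) == {"numRows": 120}
```

The point of the ticket is that this rule holds regardless of whether cbo is enabled, and that the code should say so explicitly.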
[jira] [Assigned] (SPARK-20910) Build-in SQL Function Support - UUID
[ https://issues.apache.org/jira/browse/SPARK-20910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20910: Assignee: Apache Spark > Build-in SQL Function Support - UUID > > > Key: SPARK-20910 > URL: https://issues.apache.org/jira/browse/SPARK-20910 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Yuming Wang >Assignee: Apache Spark > Labels: starter > > {code:sql} > UUID() > {code} > Returns a universally unique identifier (UUID) string. > Ref: > https://dev.mysql.com/doc/refman/5.5/en/miscellaneous-functions.html#function_uuid
[jira] [Assigned] (SPARK-20910) Build-in SQL Function Support - UUID
[ https://issues.apache.org/jira/browse/SPARK-20910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20910: Assignee: (was: Apache Spark) > Build-in SQL Function Support - UUID > > > Key: SPARK-20910 > URL: https://issues.apache.org/jira/browse/SPARK-20910 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Yuming Wang > Labels: starter > > {code:sql} > UUID() > {code} > Returns a universally unique identifier (UUID) string. > Ref: > https://dev.mysql.com/doc/refman/5.5/en/miscellaneous-functions.html#function_uuid
[jira] [Commented] (SPARK-20910) Build-in SQL Function Support - UUID
[ https://issues.apache.org/jira/browse/SPARK-20910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027752#comment-16027752 ] Apache Spark commented on SPARK-20910: -- User 'wangyum' has created a pull request for this issue: https://github.com/apache/spark/pull/18136 > Build-in SQL Function Support - UUID > > > Key: SPARK-20910 > URL: https://issues.apache.org/jira/browse/SPARK-20910 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Yuming Wang > Labels: starter > > {code:sql} > UUID() > {code} > Returns a universally unique identifier (UUID) string. > Ref: > https://dev.mysql.com/doc/refman/5.5/en/miscellaneous-functions.html#function_uuid
[jira] [Created] (SPARK-20910) Build-in SQL Function Support - UUID
Yuming Wang created SPARK-20910: --- Summary: Build-in SQL Function Support - UUID Key: SPARK-20910 URL: https://issues.apache.org/jira/browse/SPARK-20910 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.2.0 Reporter: Yuming Wang {code:sql} UUID() {code} Returns a universally unique identifier (UUID) string. Ref: https://dev.mysql.com/doc/refman/5.5/en/miscellaneous-functions.html#function_uuid
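For reference, the requested behavior is close to what Python's standard `uuid` module provides. A minimal sketch (illustrative only; `sql_uuid` is a hypothetical name, and note that MySQL's UUID() returns a time-based version-1 UUID while this uses a random version-4 one):

```python
import uuid

def sql_uuid() -> str:
    # Returns a random (version 4) UUID as the canonical 36-character
    # hyphenated string, e.g. 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.
    return str(uuid.uuid4())

u = sql_uuid()
assert len(u) == 36 and u.count("-") == 4
```

Whatever UUID version the eventual Spark built-in uses, the string format is the same 8-4-4-4-12 hex layout.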
[jira] [Commented] (SPARK-20882) Executor is waiting for ShuffleBlockFetcherIterator
[ https://issues.apache.org/jira/browse/SPARK-20882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027743#comment-16027743 ] cen yuhai commented on SPARK-20882: --- [~zsxwing] It is my problem... It is none of Spark's business. > Executor is waiting for ShuffleBlockFetcherIterator > --- > > Key: SPARK-20882 > URL: https://issues.apache.org/jira/browse/SPARK-20882 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0, 2.1.1 >Reporter: cen yuhai > Attachments: executor_jstack, executor_log, screenshot-1.png, > screenshot-2.png, screenshot-3.png > > > This bug is like https://issues.apache.org/jira/browse/SPARK-19300, > but I have updated my client netty version to 4.0.43.Final. > The shuffle service handler is still 4.0.42.Final. > spark.sql.adaptive.enabled is true > {code} > "Executor task launch worker for task 4808985" #5373 daemon prio=5 os_prio=0 > tid=0x7f54ef437000 nid=0x1aed0 waiting on condition [0x7f53aebfe000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > parking to wait for <0x000498c249c0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:189) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332) > at > org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:58) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) > at > 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:97) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) > at org.apache.spark.scheduler.Task.run(Task.scala:114) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:323) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) > at java.lang.Thread.run(Thread.java:834) > {code} > {code} > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: > Set(shuffle_5_1431_805, shuffle_5_1431_808, shuffle_5_1431_806, > shuffle_5_1431_809, shuffle_5_1431_807) > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: > Set(shuffle_5_1431_808, shuffle_5_1431_806, shuffle_5_1431_809, > shuffle_5_1431_807) > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: > Set(shuffle_5_1431_808, shuffle_5_1431_809, shuffle_5_1431_807) > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: > Set(shuffle_5_1431_808, shuffle_5_1431_809) > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: > Set(shuffle_5_1431_809) > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: remainingBlocks: Set() > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in > flight 21 > 17/05/26 
12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in > flight 20 > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in > flight 19 > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in > flight 18 > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in > flight 17 > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in > flight 16 > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in > flight 15 > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in > flight 14 > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in > flight 13 > 17/05/26 12:04:06 DEBUG ShuffleBlockFetcherIterator: Number of requests in > flight 12 > 17/05/26 12:04:06 DEBUG
[jira] [Commented] (SPARK-20895) Support fast execution based on an optimized plan and parameter placeholders
[ https://issues.apache.org/jira/browse/SPARK-20895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027742#comment-16027742 ] Takeshi Yamamuro commented on SPARK-20895: -- I saw the issue tickets in https://issues.apache.org/jira/browse/SPARK-19846 and https://issues.apache.org/jira/browse/SPARK-19875. But, yea, your suggestion seems reasonable to me (later, I'll close this as 'Won't fix'). BTW (this is not directly related to this ticket though...), is it still worth implementing the `PREPARE` statement in Spark? This is the ANSI SQL standard (http://developer.mimer.com/documentation/latest_html/Mimer_SQL_Engine_DocSet/Embedded_SQL9.html) and it is somewhat useful for users who frequently run the same queries. > Support fast execution based on an optimized plan and parameter placeholders > > > Key: SPARK-20895 > URL: https://issues.apache.org/jira/browse/SPARK-20895 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.1 >Reporter: Takeshi Yamamuro >Priority: Minor > > In database scenarios, users sometimes use parameterized queries for repeated > execution (e.g., by using prepared statements). > So, I think this functionality is also useful for Spark users. 
> What I suggest here looks like the following. My prototype is here: > https://github.com/apache/spark/compare/master...maropu:PreparedStmt2 > {code} > scala> Seq((1, 2), (2, 3)).toDF("col1", "col2").createOrReplaceTempView("t") > // Define a query with a parameter placeholder named `val` > scala> val df = sql("SELECT * FROM t WHERE col1 = $val") > scala> df.explain > == Physical Plan == > *Project [_1#13 AS col1#16, _2#14 AS col2#17] > +- *Filter (_1#13 = cast(parameterholder(val) as int)) >+- LocalTableScan [_1#13, _2#14] > // Apply optimizer rules and get an optimized logical plan with the parameter > placeholder > scala> val preparedDf = df.prepared > // Bind an actual value and do execution > scala> preparedDf.bindParam("val", 1).show() > +----+----+ > |col1|col2| > +----+----+ > | 1| 2| > +----+----+ > {code} > To implement this, my prototype adds a new expression leaf node named > `ParameterHolder`. > In the binding phase, this node is replaced with a `Literal` containing the actual > value by using `bindParam`. > Currently, Spark sometimes spends much time rewriting logical plans in > `Optimizer` (e.g. constant propagation described in SPARK-19846). > So, I feel this approach is also helpful in that case: > {code} > def timer[R](f: => {}): Unit = { > val count = 9 > val iters = (0 until count).map { i => > val t0 = System.nanoTime() > f > val t1 = System.nanoTime() > val elapsed = t1 - t0 + 0.0 > println(s"#$i: ${elapsed / 1e9}") > elapsed > } > println("Avg. 
Elapsed Time: " + ((iters.sum / count) / 1e9) + "s") > } > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > val numCols = 50 > val df = spark.range(100).selectExpr((0 until numCols).map(i => s"id AS > _c$i"): _*) > // Add conditions to take much time in Optimizer > val filter = (0 until 128).foldLeft(lit(false))((e, i) => > e.or(df.col(df.columns(i % numCols)) === (rand() * 10).cast("int"))) > val df2 = df.filter(filter).sort(df.columns(0)) > // Regular path > timer { > df2.filter(df2.col(df2.columns(0)) === lit(3)).collect > df2.filter(df2.col(df2.columns(0)) === lit(4)).collect > df2.filter(df2.col(df2.columns(0)) === lit(5)).collect > df2.filter(df2.col(df2.columns(0)) === lit(6)).collect > df2.filter(df2.col(df2.columns(0)) === lit(7)).collect > df2.filter(df2.col(df2.columns(0)) === lit(8)).collect > } > #0: 24.178487906 > #1: 22.619839888 > #2: 22.318617035 > #3: 22.131305502 > #4: 22.532095611 > #5: 22.245152778 > #6: 22.314114847 > #7: 22.284385952 > #8: 22.053593855 > Avg. Elapsed Time: 22.51973259712s > // Prepared path > val df3b = df2.filter(df2.col(df2.columns(0)) === param("val")).prepared > timer { > df3b.bindParam("val", 3).collect > df3b.bindParam("val", 4).collect > df3b.bindParam("val", 5).collect > df3b.bindParam("val", 6).collect > df3b.bindParam("val", 7).collect > df3b.bindParam("val", 8).collect > } > #0: 0.744693912 > #1: 0.743187129 > #2: 0.74513 > #3: 0.721668718 > #4: 0.757573342 > #5: 0.763240883 > #6: 0.731287275 > #7: 0.728740601 > #8: 0.674275592 > Avg. Elapsed Time: 0.734418606112s > {code} > I'm not sure this approach is acceptable, so I welcome any suggestions and > advice.
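The prepare-once/bind-many idea behind the prototype can be illustrated without Spark. In this Python toy model (the names `prepare`/`bind_param` merely mirror the prototype's vocabulary; this is not the actual implementation), the expensive work is paid once and each subsequent binding is cheap:

```python
# Toy model of the proposal: pay the expensive "optimization" step once for
# a plan with a placeholder, then bind concrete values cheaply many times.
def prepare(rows, column):
    # Imagine this step is expensive (like Optimizer rewrites in Spark):
    # here we build an index over the filter column up front.
    indexed = {}
    for row in rows:
        indexed.setdefault(row[column], []).append(row)

    def bind_param(value):
        # Cheap per-execution binding: no re-planning, just a lookup.
        return indexed.get(value, [])

    return bind_param

rows = [{"col1": 1, "col2": 2}, {"col1": 2, "col2": 3}]
bound = prepare(rows, "col1")
assert bound(1) == [{"col1": 1, "col2": 2}]
assert bound(9) == []
```

This mirrors the benchmark above: the "regular path" repeats the costly planning for every literal, while the "prepared path" amortizes it across all bindings.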
[jira] [Issue Comment Deleted] (SPARK-20909) Build-in SQL Function Support - DAYOFWEEK
[ https://issues.apache.org/jira/browse/SPARK-20909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuming Wang updated SPARK-20909: Comment: was deleted (was: I'm working on this.) > Build-in SQL Function Support - DAYOFWEEK > - > > Key: SPARK-20909 > URL: https://issues.apache.org/jira/browse/SPARK-20909 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Yuming Wang > Labels: starter > > {noformat} > DAYOFWEEK(date) > {noformat} > Returns the weekday index of the argument. > Ref: > https://dev.mysql.com/doc/refman/5.5/en/date-and-time-functions.html#function_dayofweek
[jira] [Comment Edited] (SPARK-20199) GradientBoostedTreesModel doesn't have featureSubsetStrategy parameter
[ https://issues.apache.org/jira/browse/SPARK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027725#comment-16027725 ] pralabhkumar edited comment on SPARK-20199 at 5/28/17 7:12 AM: --- [~peng.m...@intel.com][~facai] [~srowen] Please review the pull request / approach. was (Author: pralabhkumar): [~arushkharbanda][~peng.m...@intel.com][~facai] [~srowen] Please review the pull request / approach. > GradientBoostedTreesModel doesn't have featureSubsetStrategy parameter > --- > > Key: SPARK-20199 > URL: https://issues.apache.org/jira/browse/SPARK-20199 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 2.1.0 >Reporter: pralabhkumar > > Spark's GradientBoostedTreesModel doesn't have featureSubsetStrategy. It uses > random forest internally, which has featureSubsetStrategy hardcoded to "all". > It should be provided by the user to allow randomness at the feature level. > This parameter is available in H2O and XGBoost. > Sample from H2O.ai: > gbmParams._col_sample_rate > Please provide the parameter.
[jira] [Commented] (SPARK-20199) GradientBoostedTreesModel doesn't have featureSubsetStrategy parameter
[ https://issues.apache.org/jira/browse/SPARK-20199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027725#comment-16027725 ] pralabhkumar commented on SPARK-20199: -- [~arushkharbanda][~peng.m...@intel.com][~facai] [~srowen] Please review the pull request / approach. > GradientBoostedTreesModel doesn't have featureSubsetStrategy parameter > --- > > Key: SPARK-20199 > URL: https://issues.apache.org/jira/browse/SPARK-20199 > Project: Spark > Issue Type: Improvement > Components: ML, MLlib >Affects Versions: 2.1.0 >Reporter: pralabhkumar > > Spark's GradientBoostedTreesModel doesn't have featureSubsetStrategy. It uses > random forest internally, which has featureSubsetStrategy hardcoded to "all". > It should be provided by the user to allow randomness at the feature level. > This parameter is available in H2O and XGBoost. > Sample from H2O.ai: > gbmParams._col_sample_rate > Please provide the parameter.
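For context, featureSubsetStrategy controls how many candidate features are considered at each tree split. A minimal Python sketch of the usual strategy values (mirroring the "all"/"sqrt"/"log2"/"onethird" strings MLlib's random forests accept; illustrative only, not Spark's implementation):

```python
import math
import random

def num_candidate_features(strategy: str, n: int) -> int:
    # Map a strategy string to the number of features sampled per split;
    # "all" (the value hardcoded for GBTs per the ticket) disables sampling.
    if strategy == "all":
        return n
    if strategy == "sqrt":
        return max(1, int(math.sqrt(n)))
    if strategy == "log2":
        return max(1, int(math.log2(n)))
    if strategy == "onethird":
        return max(1, n // 3)
    raise ValueError(f"unknown strategy: {strategy}")

def sample_features(strategy: str, n: int, rng=random):
    # Draw the candidate feature indices for one split.
    return rng.sample(range(n), num_candidate_features(strategy, n))

assert num_candidate_features("all", 100) == 100
assert num_candidate_features("sqrt", 100) == 10
```

The ticket's request is simply to expose this choice for GBTs instead of pinning it to "all".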
[jira] [Commented] (SPARK-20877) Investigate if tests will time out on CRAN
[ https://issues.apache.org/jira/browse/SPARK-20877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027720#comment-16027720 ] Felix Cheung commented on SPARK-20877: -- Have a run on AppVeyor https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark/build/1360-master it ran for 31 min (vs. on Jenkins, <7min) > Investigate if tests will time out on CRAN > -- > > Key: SPARK-20877 > URL: https://issues.apache.org/jira/browse/SPARK-20877 > Project: Spark > Issue Type: Sub-task > Components: SparkR >Affects Versions: 2.2.0 >Reporter: Felix Cheung >