[jira] [Updated] (SPARK-46359) add optional batch id to _spark_metadata sink file status
[ https://issues.apache.org/jira/browse/SPARK-46359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

gagan taneja updated SPARK-46359:
---------------------------------
    Summary: add optional batch id to _spark_metadata sink file status  (was: add optional batch id to sink file status)
[jira] [Created] (SPARK-46359) add optional batch id to sink file status
gagan taneja created SPARK-46359:
---------------------------------

             Summary: add optional batch id to sink file status
                 Key: SPARK-46359
                 URL: https://issues.apache.org/jira/browse/SPARK-46359
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 3.5.0
            Reporter: gagan taneja

Currently FileStreamSinkLog uses the SinkFileStatus class for serialization, but it does not preserve the batch id, which could be used effectively for snapshots.

Change the definition to:

{code}
case class SinkFileStatus(
    path: String,
    size: Long,
    isDir: Boolean,
    modificationTime: Long,
    blockReplication: Int,
    blockSize: Long,
    action: String,
    batchId: Option[Long])
{code}
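For context, a minimal sketch of why the proposed Option[Long] field can stay backward compatible with existing _spark_metadata entries. This is not the actual Spark source; the JSON values and the demo object are assumptions for illustration. It relies on json4s (which Spark's metadata logs use) mapping a missing Option field to None:

{code}
import org.json4s._
import org.json4s.jackson.Serialization

// Proposed shape: batchId defaults to None so new code can read old entries.
case class SinkFileStatus(
    path: String,
    size: Long,
    isDir: Boolean,
    modificationTime: Long,
    blockReplication: Int,
    blockSize: Long,
    action: String,
    batchId: Option[Long] = None)

object SinkFileStatusCompatDemo {
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    // A legacy entry written before the field existed (values are made up):
    val legacy =
      """{"path":"/sink/part-00000","size":1024,"isDir":false,""" +
      """"modificationTime":0,"blockReplication":1,"blockSize":134217728,"action":"add"}"""
    // json4s leaves a missing Option field as None, so old log entries still parse.
    val status = Serialization.read[SinkFileStatus](legacy)
    println(status.batchId) // prints: None
  }
}
{code}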
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499733#comment-16499733 ]

gagan taneja commented on SPARK-24437:
--------------------------------------

I have already set spark.cleaner.periodicGC.interval to 5 min.

Which branch did you use when trying to reproduce this issue? There have been many changes made in this area to address some other leaks. Can you try it out with the 2.2 branch? If it is reproducible in 2.2, then we know for sure that we have a way to reproduce it and that the issue is resolved by post-2.2 fixes.
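For reference, spark.cleaner.periodicGC.interval is a documented Spark setting that controls how often the driver triggers a GC so ContextCleaner's weak references get a chance to be enqueued (default 30min). A minimal sketch of setting the 5-minute value mentioned above; the app name is hypothetical:

{code}
import org.apache.spark.sql.SparkSession

// Shorten the driver's periodic GC so ContextCleaner gets more frequent
// chances to clean up unreachable broadcasts.
val spark = SparkSession.builder()
  .appName("cleaner-interval-demo") // hypothetical app name
  .config("spark.cleaner.periodicGC.interval", "5min")
  .getOrCreate()
{code}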
[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496866#comment-16496866 ]

gagan taneja commented on SPARK-24437:
--------------------------------------

No dynamic allocation. Also, this is an issue with the driver, where the broadcast is not garbage collected. By any chance, do you know which other collection is holding a reference to this broadcast? Also, do you have a simple way of reproducing this issue?
[jira] [Updated] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

gagan taneja updated SPARK-24437:
---------------------------------
    Attachment: Screen Shot 2018-05-30 at 2.07.22 PM.png
[jira] [Updated] (SPARK-24437) Memory leak in UnsafeHashedRelation
[ https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

gagan taneja updated SPARK-24437:
---------------------------------
    Attachment: Screen Shot 2018-05-30 at 2.05.40 PM.png
[jira] [Created] (SPARK-24437) Memory leak in UnsafeHashedRelation
gagan taneja created SPARK-24437:
---------------------------------

             Summary: Memory leak in UnsafeHashedRelation
                 Key: SPARK-24437
                 URL: https://issues.apache.org/jira/browse/SPARK-24437
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.2.0
            Reporter: gagan taneja

There seems to be a memory leak with org.apache.spark.sql.execution.joins.UnsafeHashedRelation.

We have a long-running instance of STS (the Spark Thrift Server). With each query execution requiring a broadcast join, an UnsafeHashedRelation is added for cleanup in ContextCleaner. This reference to the UnsafeHashedRelation is being held by some other collection, never becomes eligible for GC, and because of this ContextCleaner is not able to clean it.
[jira] [Created] (SPARK-21760) Structured streaming terminates with Exception
gagan taneja created SPARK-21760:
---------------------------------

             Summary: Structured streaming terminates with Exception
                 Key: SPARK-21760
                 URL: https://issues.apache.org/jira/browse/SPARK-21760
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.1.0
            Reporter: gagan taneja
            Priority: Critical

We have seen Structured Streaming stop with the exception below.

While analyzing the content we found that the latest log file has just one line, the version:
{quote}
hdfs dfs -cat warehouse/latency_internal/_spark_metadata/1683
v1
{quote}
The exception is:
{code}
Exception in thread "stream execution thread for latency_internal [id = 39f35d01-60d5-40b4-826e-99e5e38d0077, runId = 95c95a01-bd4f-4604-8aae-c0c5d3e873e8]"
java.lang.IllegalStateException: Incomplete log file
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.deserialize(CompactibleFileStreamLog.scala:147)
    at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.deserialize(CompactibleFileStreamLog.scala:42)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:237)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$getLatest$1.apply$mcVJ$sp(HDFSMetadataLog.scala:266)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$getLatest$1.apply(HDFSMetadataLog.scala:265)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$getLatest$1.apply(HDFSMetadataLog.scala:265)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:265)
    at org.apache.spark.sql.execution.streaming.FileStreamSource.<init>(FileStreamSource.scala:60)
    at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:256)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
    at
{code}
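A simplified sketch of the failure mode the report points at, assuming the metadata log reader requires a version header followed by at least one entry line; this is an illustration, not the actual CompactibleFileStreamLog implementation:

{code}
// Simplified illustration (assumption-level): a _spark_metadata file that
// contains only the version header "v1" fails with the reported exception.
def deserialize(lines: Iterator[String]): Seq[String] = {
  if (!lines.hasNext) {
    throw new IllegalStateException("Incomplete log file")
  }
  val version = lines.next()
  require(version == "v1", s"unknown log version: $version")
  if (!lines.hasNext) {
    // A file holding just "v1" lands here, matching "Incomplete log file".
    throw new IllegalStateException("Incomplete log file")
  }
  lines.toSeq
}
{code}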
[jira] [Commented] (SPARK-19705) Preferred location supporting HDFS Cache for FileScanRDD
[ https://issues.apache.org/jira/browse/SPARK-19705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940897#comment-15940897 ]

gagan taneja commented on SPARK-19705:
--------------------------------------

Sandy, would you be able to help with this bug? I saw you had filed an earlier bug for HDFS cache support in RDDs, and this is a relatively minor change. We have been running this code in production and are able to achieve a major performance boost.

Below is the reference to the earlier bug you filed: https://issues.apache.org/jira/browse/SPARK-1767
[jira] [Commented] (SPARK-19145) Timestamp to String casting is slowing the query significantly
[ https://issues.apache.org/jira/browse/SPARK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896061#comment-15896061 ]

gagan taneja commented on SPARK-19145:
--------------------------------------

I am suggesting the following changes: introduce a function to test for a perfect cast, similar to
{code}
private def perfectCast(expr: Literal, dataType: DataType): Boolean = {
  expr match {
    case Literal(_, StringType) =>
      scala.util.Try { Cast(expr, dataType).eval(null) }.isSuccess
    case _ => false
  }
}
{code}
and make the string promotion conditional on whether the input string can be perfectly cast:
{code}
// We should cast all relative timestamp/date/string comparisons into string comparisons.
// This behaves as a user would expect because timestamp strings sort lexicographically,
// i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true.
// For cases where it is an exact cast we should instead cast the String side to the
// timestamp type. This speeds up execution,
// i.e. TimeStamp(2013-01-01 00:00 ...) < '2017-01-02 19:53:51' would translate to
//      TimeStamp(2013-01-01 00:00 ...) < Timestamp(2017-01-02 19:53:51)
case p @ BinaryComparison(left @ Literal(_, StringType), right @ DateType())
    if acceptedDataTypes.contains(right) && perfectCast(left, right.dataType) =>
  p.makeCopy(Array(Cast(left, right.dataType), right))
case p @ BinaryComparison(left @ StringType(), right @ DateType())
    if acceptedDataTypes.contains(right) =>
  p.makeCopy(Array(left, Cast(right, StringType)))
case p @ BinaryComparison(left @ DateType(), right @ Literal(_, StringType))
    if acceptedDataTypes.contains(left) && perfectCast(right, left.dataType) =>
  p.makeCopy(Array(left, Cast(right, left.dataType)))
case p @ BinaryComparison(left @ DateType(), right @ StringType())
    if acceptedDataTypes.contains(left) =>
  p.makeCopy(Array(Cast(left, StringType), right))
{code}
[jira] [Commented] (SPARK-19145) Timestamp to String casting is slowing the query significantly
[ https://issues.apache.org/jira/browse/SPARK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896046#comment-15896046 ]

gagan taneja commented on SPARK-19145:
--------------------------------------

{code}
17/03/04 19:05:32 TRACE HiveSessionState$$anon$1:
=== Applying Rule org.apache.spark.sql.catalyst.analysis.TypeCoercion$PromoteStrings ===

Before
'Project [*]
!+- 'Filter (time#88 >= 2017-01-02 19:53:51)
    +- SubqueryAlias person_view, `rule_test`.`person_view`
       +- Project [gen_attr_0#83 AS name#86, gen_attr_1#84 AS age#87, gen_attr_2#85 AS time#88]
          +- SubqueryAlias person_table
             +- Project [gen_attr_0#83, gen_attr_1#84, gen_attr_2#85]
                +- Filter (gen_attr_1#84 > 10)
                   +- SubqueryAlias gen_subquery_0
                      +- Project [name#89 AS gen_attr_0#83, age#90 AS gen_attr_1#84, time#91 AS gen_attr_2#85]
                         +- MetastoreRelation rule_test, person_table

After
'Project [*]
+- Filter (cast(time#88 as string) >= 2017-01-02 19:53:51)
   +- SubqueryAlias person_view, `rule_test`.`person_view`
      +- Project [gen_attr_0#83 AS name#86, gen_attr_1#84 AS age#87, gen_attr_2#85 AS time#88]
         +- SubqueryAlias person_table
            +- Project [gen_attr_0#83, gen_attr_1#84, gen_attr_2#85]
               +- Filter (gen_attr_1#84 > 10)
                  +- SubqueryAlias gen_subquery
                     +- Project [name#89 AS gen_attr_0#83, age#90 AS gen_attr_1#84, time#91 AS gen_attr_2#85]
                        +- MetastoreRelation rule_test, person_table
{code}
[jira] [Commented] (SPARK-19145) Timestamp to String casting is slowing the query significantly
[ https://issues.apache.org/jira/browse/SPARK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896040#comment-15896040 ]

gagan taneja commented on SPARK-19145:
--------------------------------------

The code responsible for this belongs to the class below, which performs the type coercion. Although this is logically correct, it is also slowing down binary comparisons, because during execution the timestamp/date values are cast to String and the comparison is done with string operators, resulting in 10x slower performance.

{code}
org.apache.spark.sql.catalyst.analysis.TypeCoercion {
  ...
  // We should cast all relative timestamp/date/string comparison into string comparisons
  // This behaves as a user would expect because timestamp strings sort lexicographically.
  // i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true
  case p @ BinaryComparison(left @ StringType(), right @ DateType()) =>
    p.makeCopy(Array(left, Cast(right, StringType)))
  case p @ BinaryComparison(left @ DateType(), right @ StringType()) =>
    p.makeCopy(Array(Cast(left, StringType), right))
  case p @ BinaryComparison(left @ StringType(), right @ TimestampType()) =>
    p.makeCopy(Array(left, Cast(right, StringType)))
  case p @ BinaryComparison(left @ TimestampType(), right @ StringType()) =>
    p.makeCopy(Array(Cast(left, StringType), right))

  // Comparisons between dates and timestamps.
  case p @ BinaryComparison(left @ TimestampType(), right @ DateType()) =>
    p.makeCopy(Array(Cast(left, StringType), Cast(right, StringType)))
  case p @ BinaryComparison(left @ DateType(), right @ TimestampType()) =>
    p.makeCopy(Array(Cast(left, StringType), Cast(right, StringType)))
}
{code}
[jira] [Commented] (SPARK-19541) High Availability support for ThriftServer
[ https://issues.apache.org/jira/browse/SPARK-19541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896032#comment-15896032 ]

gagan taneja commented on SPARK-19541:
--------------------------------------

This would be a great improvement, as we are also leveraging the Thrift server for JDBC/ODBC connectivity.

> High Availability support for ThriftServer
> -------------------------------------------
>
>                 Key: SPARK-19541
>                 URL: https://issues.apache.org/jira/browse/SPARK-19541
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.0.2, 2.1.0
>            Reporter: LvDongrong
>
> Currently we use the Spark ThriftServer frequently, and there are many connections between the clients and a single ThriftServer. When the ThriftServer goes down, we cannot get the service again until the server is restarted, so we need to consider ThriftServer HA just as with HiveServer HA. For the ThriftServer, we want to adopt the HiveServer HA pattern to provide ThriftServer HA. Therefore, we need to start multiple Thrift servers that register with ZooKeeper. A client can then find a Thrift server by just connecting to ZooKeeper, so beeline can get the service from another Thrift server when one is down.
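For concreteness, the HiveServer2-style ZooKeeper discovery that the description references looks like this from the client side; the hostnames and ZooKeeper namespace below are hypothetical, and the Hive JDBC driver is assumed to be on the classpath:

{code}
import java.sql.DriverManager

// Clients ask ZooKeeper for a live server instead of naming a fixed host,
// so a down instance is transparently avoided.
val url = "jdbc:hive2://zk1:2181,zk2:2181,zk3:2181/;" +
  "serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkthriftserver"
val conn = DriverManager.getConnection(url, "user", "")
{code}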
[jira] [Updated] (SPARK-19705) Preferred location supporting HDFS Cache for FileScanRDD
[ https://issues.apache.org/jira/browse/SPARK-19705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

gagan taneja updated SPARK-19705:
---------------------------------
    Shepherd: Herman van Hovell
[jira] [Commented] (SPARK-19705) Preferred location supporting HDFS Cache for FileScanRDD
[ https://issues.apache.org/jira/browse/SPARK-19705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896026#comment-15896026 ]

gagan taneja commented on SPARK-19705:
--------------------------------------

Herman, can you help me with this enhancement? Thanks.
[jira] [Created] (SPARK-19705) Preferred location supporting HDFS Cache for FileScanRDD
gagan taneja created SPARK-19705:
---------------------------------

             Summary: Preferred location supporting HDFS Cache for FileScanRDD
                 Key: SPARK-19705
                 URL: https://issues.apache.org/jira/browse/SPARK-19705
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.0
            Reporter: gagan taneja

Although NewHadoopRDD and HadoopRDD consider the HDFS cache while calculating preferredLocations, FileScanRDD does not take the HDFS cache into account when calculating preferredLocations.

The enhancement can easily be implemented for large files, where a FilePartition contains only a single HDFS file. It will also result in a significant performance improvement for cached HDFS partitions.
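A sketch of the enhancement being requested, using Hadoop's BlockLocation.getCachedHosts API; the helper name and the ordering policy are assumptions for illustration, not the eventual Spark implementation:

{code}
import org.apache.hadoop.fs.{FileSystem, Path}

// Rank hosts that hold a block in the HDFS centralized cache ahead of hosts
// that only have it on disk, for a FilePartition backed by a single file.
def preferredHosts(fs: FileSystem, path: Path, start: Long, len: Long): Seq[String] = {
  val blocks = fs.getFileBlockLocations(fs.getFileStatus(path), start, len)
  val cached = blocks.flatMap(_.getCachedHosts) // replicas pinned in the HDFS cache
  val onDisk = blocks.flatMap(_.getHosts)
  (cached ++ onDisk).distinct.toSeq
}
{code}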
[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join
[ https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872811#comment-15872811 ]

gagan taneja commented on SPARK-13219:
--------------------------------------

This is what we are looking for. For example: table Address is partitioned on postal_code, and table Location contains location_name and postal_code. Query:

SELECT * FROM address JOIN location ON postal_code WHERE location_name = 'San Jose'

If the query is rewritten with an IN clause, the optimizer will be able to prune the partitions, which would be significantly faster (see the sketch after this message).

> Pushdown predicate propagation in SparkSQL with join
> -----------------------------------------------------
>
>                 Key: SPARK-13219
>                 URL: https://issues.apache.org/jira/browse/SPARK-13219
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.4.1, 1.5.2, 1.6.0
>         Environment: Spark 1.4
>                      Datastax Spark connector 1.4
>                      Cassandra 2.1.12
>                      CentOS 6.6
>            Reporter: Abhinav Chawade
>
> When two or more tables are joined in SparkSQL and there is an equality clause in the query on the attributes used to perform the join, it is useful to apply that clause to the scans of both tables. If this is not done, one of the tables results in a full scan, which can slow the query down dramatically. Consider the following example with two tables being joined.
> {code}
> CREATE TABLE assets (
>     assetid int PRIMARY KEY,
>     address text,
>     propertyname text
> )
>
> CREATE TABLE tenants (
>     assetid int PRIMARY KEY,
>     name text
> )
>
> spark-sql> explain select t.name from tenants t, assets a where a.assetid = t.assetid and t.assetid='1201';
> WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> == Physical Plan ==
> Project [name#14]
>  ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
>   Exchange (HashPartitioning 200)
>    Filter (CAST(assetid#13, DoubleType) = 1201.0)
>     HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, Some(t)), None
>   Exchange (HashPartitioning 200)
>    HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), None
> Time taken: 1.354 seconds, Fetched 8 row(s)
> {code}
> The simple workaround is to add another equality condition for each table, but that becomes cumbersome. It would be helpful if the query planner could improve filter propagation.
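A sketch of the manual rewrite described above, using the table and column names from the example; this is the workaround a user would apply today, not an existing optimizer rule, and it assumes numeric postal codes:

{code}
val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()

// Step 1: resolve the small dimension side to a literal list of keys.
val codes = spark.sql(
    "SELECT postal_code FROM location WHERE location_name = 'San Jose'")
  .collect().map(_.get(0)).mkString(", ")

// Step 2: an IN list of literals on the partition column lets the planner
// prune the address partitions instead of scanning all of them.
val pruned = spark.sql(
  s"SELECT * FROM address WHERE postal_code IN ($codes)")
{code}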
[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join
[ https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872809#comment-15872809 ]

gagan taneja commented on SPARK-13219:
--------------------------------------

Venu, this is very interesting. I would like to look at the code for all the optimizations that are in place. Do you have plans to contribute it back to Spark?
[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation
[ https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872797#comment-15872797 ]

gagan taneja commented on SPARK-19609:
--------------------------------------

This can be further extended to joins on columns that are also partition columns. For example: table Address is partitioned on postal_code, and table Location contains location_name and postal_code. Query:

SELECT * FROM address JOIN location ON postal_code WHERE location_name = 'San Jose'

If the query is rewritten with an IN clause, the optimizer will be able to prune the partitions, which would be significantly faster.

> Broadcast joins should pushdown join constraints as Filter to the larger relation
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-19609
>                 URL: https://issues.apache.org/jira/browse/SPARK-19609
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Nick Dimiduk
>
> For broadcast inner-joins, where the smaller relation is known to be small enough to materialize on a worker, the set of values for all join columns is known and fits in memory. Spark should translate these values into a {{Filter}} pushed down to the datasource. The common join condition of equality, i.e. {{lhs.a == rhs.a}}, can be written as an {{a in ...}} clause. An example of pushing such filters is already present in the form of {{IsNotNull}} filters via [~sameerag]'s work on SPARK-12957 subtasks.
> This optimization could even work when the smaller relation does not fit entirely in memory. This could be done by partitioning the smaller relation into N pieces, applying this predicate pushdown for each piece, and unioning the results.
[jira] [Commented] (SPARK-19145) Timestamp to String casting is slowing the query significantly
[ https://issues.apache.org/jira/browse/SPARK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814165#comment-15814165 ]

gagan taneja commented on SPARK-19145:
--------------------------------------

I should be able to work on a proposal for the fix.
[jira] [Created] (SPARK-19145) Timestamp to String casting is slowing the query significantly
gagan taneja created SPARK-19145:
---------------------------------

             Summary: Timestamp to String casting is slowing the query significantly
                 Key: SPARK-19145
                 URL: https://issues.apache.org/jira/browse/SPARK-19145
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.0
            Reporter: gagan taneja

I have a time-series table with a timestamp column. The following query

{code}
SELECT COUNT(*) AS `count`
FROM `default`.`table`
WHERE `time` >= '2017-01-02 19:53:51'
  AND `time` <= '2017-01-09 19:53:51' LIMIT 5
{code}

is significantly SLOWER than

{code}
SELECT COUNT(*) AS `count`
FROM `default`.`table`
WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD HH24:MI:SS−0800')
  AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD HH24:MI:SS−0800') LIMIT 5
{code}

After investigation I found that in the first query the time column is cast to String before applying the filter. However, in the second query no such casting is performed, and the filter is on a long value.

Below is the generated physical plan for the slower execution, followed by the physical plan for the faster execution.

{code}
SELECT COUNT(*) AS `count`
FROM `default`.`table`
WHERE `time` >= '2017-01-02 19:53:51'
  AND `time` <= '2017-01-09 19:53:51' LIMIT 5

== Physical Plan ==
CollectLimit 5
+- *HashAggregate(keys=[], functions=[count(1)], output=[count#3290L])
   +- Exchange SinglePartition
      +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#3339L])
         +- *Project
            +- *Filter ((isnotnull(time#3314) && (cast(time#3314 as string) >= 2017-01-02 19:53:51)) && (cast(time#3314 as string) <= 2017-01-09 19:53:51))
               +- *FileScan parquet default.cstat[time#3314] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], PartitionFilters: [], PushedFilters: [IsNotNull(time)], ReadSchema: struct
{code}

{code}
SELECT COUNT(*) AS `count`
FROM `default`.`table`
WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD HH24:MI:SS−0800')
  AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD HH24:MI:SS−0800') LIMIT 5

== Physical Plan ==
CollectLimit 5
+- *HashAggregate(keys=[], functions=[count(1)], output=[count#3238L])
   +- Exchange SinglePartition
      +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#3287L])
         +- *Project
            +- *Filter ((isnotnull(time#3262) && (time#3262 >= 148340483100)) && (time#3262 <= 148400963100))
               +- *FileScan parquet default.cstat[time#3262] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], PartitionFilters: [], PushedFilters: [IsNotNull(time), GreaterThanOrEqual(time,2017-01-02 19:53:51.0), LessThanOrEqual(time,2017-01-09..., ReadSchema: struct
{code}

In Impala both queries run efficiently without any performance difference. Spark should be able to parse the date string and convert it to a Long/Timestamp during generation of the optimized logical plan, so that both queries would have similar performance.
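A workaround consistent with the plans above (table and column names are the report's placeholders): casting the literals explicitly keeps the comparison between timestamps, so the filter stays on the timestamp column and the range predicate can be pushed down to the Parquet scan:

{code}
val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()

// Timestamp-to-timestamp comparison avoids the cast(time as string) filter
// shown in the slow plan.
val fast = spark.sql("""
  SELECT COUNT(*) AS `count`
  FROM `default`.`table`
  WHERE `time` >= CAST('2017-01-02 19:53:51' AS TIMESTAMP)
    AND `time` <= CAST('2017-01-09 19:53:51' AS TIMESTAMP)
  LIMIT 5
""")
{code}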
[jira] [Commented] (SPARK-19118) Percentile support for frequency distribution table
[ https://issues.apache.org/jira/browse/SPARK-19118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15808173#comment-15808173 ]

gagan taneja commented on SPARK-19118:
--------------------------------------

I had filed the original JIRA to provide this functionality for both Percentile and Approx Percentile. After reviewing the code I realized that the two function implementations are very different, and it would be best to divide the work into two sub-tasks. I have created this sub-task for adding support in the Percentile function. For implementing the functionality in Approx Percentile I will need to work with a developer to find the right approach and make the code changes; that is tracked in subtask https://issues.apache.org/jira/browse/SPARK-19119.

> Percentile support for frequency distribution table
> ----------------------------------------------------
>
>                 Key: SPARK-19118
>                 URL: https://issues.apache.org/jira/browse/SPARK-19118
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.0.2
>            Reporter: gagan taneja
[jira] [Created] (SPARK-19119) Approximate percentile support for frequency distribution table
gagan taneja created SPARK-19119:
---------------------------------

             Summary: Approximate percentile support for frequency distribution table
                 Key: SPARK-19119
                 URL: https://issues.apache.org/jira/browse/SPARK-19119
             Project: Spark
          Issue Type: Sub-task
          Components: SQL
    Affects Versions: 2.0.2
            Reporter: gagan taneja
[jira] [Created] (SPARK-19118) Percentile support for frequency distribution table
gagan taneja created SPARK-19118:
---------------------------------

             Summary: Percentile support for frequency distribution table
                 Key: SPARK-19118
                 URL: https://issues.apache.org/jira/browse/SPARK-19118
             Project: Spark
          Issue Type: Sub-task
          Components: SQL
    Affects Versions: 2.0.2
            Reporter: gagan taneja
[jira] [Commented] (SPARK-18940) Percentile and approximate percentile support for frequency distribution table
[ https://issues.apache.org/jira/browse/SPARK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15764702#comment-15764702 ]

gagan taneja commented on SPARK-18940:
--------------------------------------

I am working on the fix and should be able to make the changes and send them out for review soon.
[jira] [Updated] (SPARK-18940) Percentile and approximate percentile support for frequency distribution table
[ https://issues.apache.org/jira/browse/SPARK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

gagan taneja updated SPARK-18940:
---------------------------------
    Shepherd: Herman van Hovell
[jira] [Created] (SPARK-18940) Percentile and approximate percentile support for frequency distribution table
gagan taneja created SPARK-18940:
---------------------------------

             Summary: Percentile and approximate percentile support for frequency distribution table
                 Key: SPARK-18940
                 URL: https://issues.apache.org/jira/browse/SPARK-18940
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.0.2
            Reporter: gagan taneja

I have a frequency distribution table with the following entries:

{noformat}
Age, No of person
21, 10
22, 15
23, 18
..
..
30, 14
{noformat}

It is common to have data in frequency-distribution format from which percentiles and medians are then calculated, and with the current implementation it would be very difficult and complex to find the percentile. Therefore I am proposing an enhancement to the current Percentile and Approx Percentile implementations to take a frequency-distribution column into consideration.

Current Percentile definition:

{noformat}
percentile(col, array(percentage1 [, percentage2]...))

case class Percentile(
    child: Expression,
    percentageExpression: Expression,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0) {

  def this(child: Expression, percentageExpression: Expression) = {
    this(child, percentageExpression, 0, 0)
  }
}
{noformat}

Proposed changes:

{noformat}
percentile(col, [frequency], array(percentage1 [, percentage2]...))

case class Percentile(
    child: Expression,
    frequency: Expression,
    percentageExpression: Expression,
    mutableAggBufferOffset: Int = 0,
    inputAggBufferOffset: Int = 0) {

  def this(child: Expression, percentageExpression: Expression) = {
    this(child, Literal(1L), percentageExpression, 0, 0)
  }

  def this(child: Expression, frequency: Expression, percentageExpression: Expression) = {
    this(child, frequency, percentageExpression, 0, 0)
  }
}
{noformat}

Although this definition will differ from the Hive implementation, it will be useful functionality for many Spark users. Moreover, the changes are local to only the Percentile and ApproxPercentile implementations.
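A hypothetical usage of the proposed signature against the age table above; the table and column names are made up, and the function that eventually shipped in Spark may order its arguments differently, so this illustrates the proposal only:

{code}
val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()

// Median and 90th percentile of age, weighting each row by its person count,
// per the proposed percentile(col, frequency, percentages) signature.
val pcts = spark.sql("""
  SELECT percentile(age, no_of_person, array(0.5, 0.9)) AS pcts
  FROM age_distribution
""")
{code}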
[jira] [Commented] (SPARK-15345) SparkSession's conf doesn't take effect when there's already an existing SparkContext
[ https://issues.apache.org/jira/browse/SPARK-15345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15414544#comment-15414544 ]

gagan taneja commented on SPARK-15345:
--------------------------------------

I was annoyed by this bug but fortunately found a workaround. It works fine if you start the shell with:
{noformat}
export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
bin/spark-shell
{noformat}
It gives trouble if HIVE_CONF_DIR is set separately, as below:
{noformat}
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_CONF_DIR=/etc/hive/conf
bin/spark-shell
{noformat}

> SparkSession's conf doesn't take effect when there's already an existing SparkContext
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-15345
>                 URL: https://issues.apache.org/jira/browse/SPARK-15345
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>            Reporter: Piotr Milanowski
>            Assignee: Reynold Xin
>            Priority: Blocker
>             Fix For: 2.0.0
>
> I am working with branch-2.0; Spark is compiled with Hive support (-Phive and -Phive-thriftserver).
> I am trying to access databases using this snippet:
> {code}
> from pyspark.sql import HiveContext
> hc = HiveContext(sc)
> hc.sql("show databases").collect()
> [Row(result='default')]
> {code}
> This means that Spark doesn't find any of the databases specified in the configuration. Using the same configuration (i.e. hive-site.xml and core-site.xml) in Spark 1.6 and launching the above snippet, I can print out the existing databases.
> When run in DEBUG mode, this is what Spark (2.0) prints out:
> {code}
> 16/05/16 12:17:47 INFO SparkSqlParser: Parsing command: show databases
> 16/05/16 12:17:47 DEBUG SimpleAnalyzer:
> === Result of Batch Resolution ===
> !'Project [unresolveddeserializer(createexternalrow(if (isnull(input[0, string])) null else input[0, string].toString, StructField(result,StringType,false)), result#2) AS #3]   Project [createexternalrow(if (isnull(result#2)) null else result#2.toString, StructField(result,StringType,false)) AS #3]
>  +- LocalRelation [result#2]   +- LocalRelation [result#2]
> {code}
> (The quoted description continues with repetitive ClosureCleaner DEBUG output, truncated in this digest.)
[jira] [Created] (SPARK-5292) optimize join for tables that are already sharded / support for Hive buckets
gagan taneja created SPARK-5292:
--------------------------------

             Summary: optimize join for tables that are already sharded / support for Hive buckets
                 Key: SPARK-5292
                 URL: https://issues.apache.org/jira/browse/SPARK-5292
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 1.2.0
            Reporter: gagan taneja

Currently joins do not consider the locality of the data and perform the shuffle anyway. If the user takes the responsibility of distributing the data based on some hash, or shards the data, a Spark join should be able to leverage the sharding to optimize the join calculation and eliminate the shuffle.
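For reference, later Spark releases expose this kind of pre-sharding through DataFrameWriter bucketing, which is the shape of what this issue asks for; the table and column names below are hypothetical:

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Persist both join sides hash-distributed on the join key. When two tables
// share the same bucketing, the join can be planned without a shuffle on
// that key.
spark.table("orders_raw").write
  .bucketBy(64, "customer_id")
  .sortBy("customer_id")
  .saveAsTable("orders_bucketed")

spark.table("customers_raw").write
  .bucketBy(64, "customer_id")
  .sortBy("customer_id")
  .saveAsTable("customers_bucketed")
{code}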