[jira] [Updated] (SPARK-46359) add optional batch id to _spark_metadata sink file status

2023-12-10 Thread gagan taneja (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-46359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gagan taneja updated SPARK-46359:
-
Summary: add optional batch id to _spark_metadata sink file status  (was: add 
optional batch id to sink file status)

> add optional batch id to _spark_metadata sink file status 
> ---
>
> Key: SPARK-46359
> URL: https://issues.apache.org/jira/browse/SPARK-46359
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.5.0
>Reporter: gagan taneja
>Priority: Major
>
> Currently FileStreamSinkLog uses the SinkFileStatus class for serialization, 
> but it does not preserve the batch id, which could be used effectively for 
> snapshots.
> Change the definition to 
> {code}
> case class SinkFileStatus(path: String, size: Long, isDir: Boolean, 
> modificationTime: Long, blockReplication: Int, blockSize: Long, action: String, 
> batchId: Option[Long])
> {code} 






[jira] [Created] (SPARK-46359) add optional batch id to sink file status

2023-12-10 Thread gagan taneja (Jira)
gagan taneja created SPARK-46359:


 Summary: add optional batch id to sink file status 
 Key: SPARK-46359
 URL: https://issues.apache.org/jira/browse/SPARK-46359
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 3.5.0
Reporter: gagan taneja


Currently FileStreamSinkLog uses the SinkFileStatus class for serialization, but it 
does not preserve the batch id, which could be used effectively for snapshots.

Change the definition to 

{code}

case class SinkFileStatus(path: String, size: Long, isDir: Boolean,
  modificationTime: Long, blockReplication: Int, blockSize: Long, action: String,
  batchId: Option[Long])

{code} 
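
Below is a minimal sketch of the proposed shape of the class. The None default and the
exact field name batchId are assumptions made here for backward compatibility with
existing log entries, not a final design:

{code}
// Hypothetical sketch of the proposed change; the None default keeps older
// _spark_metadata entries (written without a batch id) readable.
case class SinkFileStatus(
    path: String,
    size: Long,
    isDir: Boolean,
    modificationTime: Long,
    blockReplication: Int,
    blockSize: Long,
    action: String,
    batchId: Option[Long] = None)

// Example: tagging a newly committed file with the batch that produced it.
val status = SinkFileStatus(
  path = "hdfs://namenode/warehouse/table/part-00000",
  size = 1024L,
  isDir = false,
  modificationTime = System.currentTimeMillis(),
  blockReplication = 3,
  blockSize = 128L * 1024 * 1024,
  action = "add",
  batchId = Some(42L))
{code}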






[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-06-03 Thread gagan taneja (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16499733#comment-16499733
 ] 

gagan taneja commented on SPARK-24437:
--

I have already set spark.cleaner.periodicGC.interval to 5 min.

Which branch did you try to reproduce this issue on? There have been many changes made in 
this area to address other leaks. Can you try it out with the 2.2 branch? If it is 
reproducible in 2.2, then we know for sure that we have a way to reproduce it and that 
the issue is resolved by post-2.2 fixes.
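
For reference, a minimal sketch of how that interval is set on the driver (the app name is 
made up; spark.cleaner.periodicGC.interval is the only setting under discussion, and 
whether it helps with this particular leak is exactly what is in question):

{code}
import org.apache.spark.sql.SparkSession

// Force the ContextCleaner's periodic GC to run every 5 minutes instead of the default.
val spark = SparkSession.builder()
  .appName("sts-leak-investigation")                      // hypothetical app name
  .config("spark.cleaner.periodicGC.interval", "5min")    // value mentioned in the comment above
  .getOrCreate()
{code}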

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS (the Spark Thrift Server).
> With each query execution that requires a broadcast join, an UnsafeHashedRelation is 
> registered for cleanup in ContextCleaner. This reference to the UnsafeHashedRelation 
> is being held by some other collection and never becomes eligible for GC, so 
> ContextCleaner is not able to clean it.






[jira] [Commented] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-05-31 Thread gagan taneja (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496866#comment-16496866
 ] 

gagan taneja commented on SPARK-24437:
--

No dynamic allocation. Also, this is an issue on the driver, where the broadcast is not 
garbage collected.

By any chance, do you know which other collection is holding a reference to this 
broadcast? Also, do you have a simple way of reproducing this issue?

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS (the Spark Thrift Server).
> With each query execution that requires a broadcast join, an UnsafeHashedRelation is 
> registered for cleanup in ContextCleaner. This reference to the UnsafeHashedRelation 
> is being held by some other collection and never becomes eligible for GC, so 
> ContextCleaner is not able to clean it.






[jira] [Updated] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-05-30 Thread gagan taneja (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gagan taneja updated SPARK-24437:
-
Attachment: Screen Shot 2018-05-30 at 2.07.22 PM.png

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png, Screen Shot 
> 2018-05-30 at 2.07.22 PM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS (the Spark Thrift Server).
> With each query execution that requires a broadcast join, an UnsafeHashedRelation is 
> registered for cleanup in ContextCleaner. This reference to the UnsafeHashedRelation 
> is being held by some other collection and never becomes eligible for GC, so 
> ContextCleaner is not able to clean it.






[jira] [Updated] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-05-30 Thread gagan taneja (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gagan taneja updated SPARK-24437:
-
Attachment: Screen Shot 2018-05-30 at 2.05.40 PM.png

> Memory leak in UnsafeHashedRelation
> ---
>
> Key: SPARK-24437
> URL: https://issues.apache.org/jira/browse/SPARK-24437
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: gagan taneja
>Priority: Critical
> Attachments: Screen Shot 2018-05-30 at 2.05.40 PM.png
>
>
> There seems to be a memory leak with 
> org.apache.spark.sql.execution.joins.UnsafeHashedRelation.
> We have a long-running instance of STS (the Spark Thrift Server).
> With each query execution that requires a broadcast join, an UnsafeHashedRelation is 
> registered for cleanup in ContextCleaner. This reference to the UnsafeHashedRelation 
> is being held by some other collection and never becomes eligible for GC, so 
> ContextCleaner is not able to clean it.






[jira] [Created] (SPARK-24437) Memory leak in UnsafeHashedRelation

2018-05-30 Thread gagan taneja (JIRA)
gagan taneja created SPARK-24437:


 Summary: Memory leak in UnsafeHashedRelation
 Key: SPARK-24437
 URL: https://issues.apache.org/jira/browse/SPARK-24437
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: gagan taneja


There seems to be a memory leak with 
org.apache.spark.sql.execution.joins.UnsafeHashedRelation.

We have a long-running instance of STS (the Spark Thrift Server).

With each query execution that requires a broadcast join, an UnsafeHashedRelation is 
registered for cleanup in ContextCleaner. This reference to the UnsafeHashedRelation is 
being held by some other collection and never becomes eligible for GC, so ContextCleaner 
is not able to clean it.






[jira] [Created] (SPARK-21760) Structured streaming terminates with Exception

2017-08-17 Thread gagan taneja (JIRA)
gagan taneja created SPARK-21760:


 Summary: Structured streaming terminates with Exception 
 Key: SPARK-21760
 URL: https://issues.apache.org/jira/browse/SPARK-21760
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: gagan taneja
Priority: Critical


We have seen the structured stream stop with the exception below.
While analyzing the content, we found that the latest log file has just one line, 
containing only the version:
{quote}hdfs dfs -cat warehouse/latency_internal/_spark_metadata/1683
v1
{quote}
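
For context, a hedged sketch of how one could spot such a truncated batch file before 
restarting the query (the helper name and the path are illustrative; the assumption is 
that a healthy batch file carries the version header plus at least one JSON entry line):

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import scala.io.Source

// Returns true when the metadata batch file contains at most the version header,
// which is the state that triggers "Incomplete log file" on restart.
def looksTruncated(fileStr: String): Boolean = {
  val path = new Path(fileStr)
  val fs = path.getFileSystem(new Configuration())
  val in = fs.open(path)
  try Source.fromInputStream(in).getLines().size <= 1
  finally in.close()
}

// looksTruncated("warehouse/latency_internal/_spark_metadata/1683")  // expected: true for the file above
{code}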

The exception is below:
Exception in thread "stream execution thread for latency_internal [id = 
39f35d01-60d5-40b4-826e-99e5e38d0077, runId = 
95c95a01-bd4f-4604-8aae-c0c5d3e873e8]" java.lang.IllegalStateException: 
Incomplete log file
at 
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.deserialize(CompactibleFileStreamLog.scala:147)
at 
org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.deserialize(CompactibleFileStreamLog.scala:42)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:237)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$getLatest$1.apply$mcVJ$sp(HDFSMetadataLog.scala:266)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$getLatest$1.apply(HDFSMetadataLog.scala:265)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$getLatest$1.apply(HDFSMetadataLog.scala:265)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
at 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:265)
at 
org.apache.spark.sql.execution.streaming.FileStreamSource.(FileStreamSource.scala:60)
at 
org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:256)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:287)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:329)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:293)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:331)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at 

[jira] [Commented] (SPARK-19705) Preferred location supporting HDFS Cache for FileScanRDD

2017-03-24 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940897#comment-15940897
 ] 

gagan taneja commented on SPARK-19705:
--

Sandy,
Would you be able to help with this bug?

I saw that you had filed an earlier bug for HDFS cache support in RDDs, and this is a 
relatively minor change. We have been running this code in production and have achieved 
a major performance boost.
Below is the reference to the earlier bug filed by you: 
https://issues.apache.org/jira/browse/SPARK-1767

> Preferred location supporting HDFS Cache for FileScanRDD
> 
>
> Key: SPARK-19705
> URL: https://issues.apache.org/jira/browse/SPARK-19705
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: gagan taneja
>
> Although NewHadoopRDD and HadoopRDD consider the HDFS cache when calculating 
> preferredLocations, FileScanRDD does not take the HDFS cache into account when 
> calculating preferredLocations.
> The enhancement can easily be implemented for large files where a FilePartition 
> contains only a single HDFS file.
> The enhancement will also result in a significant performance improvement for 
> cached HDFS partitions.






[jira] [Commented] (SPARK-19145) Timestamp to String casting is slowing the query significantly

2017-03-04 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896061#comment-15896061
 ] 

gagan taneja commented on SPARK-19145:
--

I am suggesting the following changes.

First, introduce a function that tests for a perfect cast, similar to:

private def perfectCast(expr: Literal, dataType: DataType): Boolean = {
  expr match {
    case Literal(value, StringType) => scala.util.Try {
      Cast(expr, dataType).eval(null)
    }.isSuccess
    case _ => false
  }
}

Then make the string promotion conditional on whether the input string can be perfectly cast:

  // We should cast all relative timestamp/date/string comparisons into string comparisons.
  // This behaves as a user would expect because timestamp strings sort lexicographically,
  // i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true.
  // For cases where it is an exact cast, we should instead cast the String literal to the
  // timestamp/date type. This speeds up execution,
  // i.e. TimeStamp(2013-01-01 00:00 ...) < '2017-01-02 19:53:51' would translate to
  // TimeStamp(2013-01-01 00:00 ...) < Timestamp(2017-01-02 19:53:51).
  case p @ BinaryComparison(left @ Literal(_, StringType), right @ DateType())
      if acceptedDataTypes.contains(right) && perfectCast(left, right.dataType) =>
    p.makeCopy(Array(Cast(left, right.dataType), right))
  case p @ BinaryComparison(left @ StringType(), right @ DateType())
      if acceptedDataTypes.contains(right) =>
    p.makeCopy(Array(left, Cast(right, StringType)))
  case p @ BinaryComparison(left @ DateType(), right @ Literal(_, StringType))
      if acceptedDataTypes.contains(left) && perfectCast(right, left.dataType) =>
    p.makeCopy(Array(left, Cast(right, left.dataType)))
  case p @ BinaryComparison(left @ DateType(), right @ StringType())
      if acceptedDataTypes.contains(left) =>
    p.makeCopy(Array(Cast(left, StringType), right))

> Timestamp to String casting is slowing the query significantly
> --
>
> Key: SPARK-19145
> URL: https://issues.apache.org/jira/browse/SPARK-19145
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: gagan taneja
>
> I have a time series table with a timestamp column. 
> The following query
> SELECT COUNT(*) AS `count`
>FROM `default`.`table`
>WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 5
> is significantly SLOWER than 
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
> HH24:MI:SS−0800')
>   AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
> HH24:MI:SS−0800') LIMIT 5
> After investigation, I found that in the first query the time column is cast to 
> String before the filter is applied. 
> However, in the second query no such casting is performed and the filter uses a 
> long value. 
> Below is the generated physical plan for the slower execution, followed by the 
> physical plan for the faster execution.
> SELECT COUNT(*) AS `count`
>FROM `default`.`table`
>WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 5
> == Physical Plan ==
> CollectLimit 5
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3290L])
>+- Exchange SinglePartition
>   +- *HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#3339L])
>  +- *Project
> +- *Filter ((isnotnull(time#3314) && (cast(time#3314 as string) 
> >= 2017-01-02 19:53:51)) && (cast(time#3314 as string) <= 2017-01-09 
> 19:53:51))
>+- *FileScan parquet default.cstat[time#3314] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
> PartitionFilters: [], PushedFilters: [IsNotNull(time)], ReadSchema: 
> struct
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
> HH24:MI:SS−0800')
>   AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
> HH24:MI:SS−0800') LIMIT 5
> == Physical Plan ==
> CollectLimit 5
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3238L])
>+- Exchange SinglePartition
>   +- *HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#3287L])
>  +- *Project
> +- *Filter ((isnotnull(time#3262) && (time#3262 >= 
> 148340483100)) && (time#3262 <= 148400963100))
>+- *FileScan parquet default.cstat[time#3262] Batched: true, 
> Format: Parquet, Location: 
> 

[jira] [Commented] (SPARK-19145) Timestamp to String casting is slowing the query significantly

2017-03-04 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896046#comment-15896046
 ] 

gagan taneja commented on SPARK-19145:
--

17/03/04 19:05:32 TRACE HiveSessionState$$anon$1: 
=== Applying Rule 
org.apache.spark.sql.catalyst.analysis.TypeCoercion$PromoteStrings ===

Before
'Project [*]
!+- 'Filter (time#88 >= 2017-01-02 19:53:51)
   +- SubqueryAlias person_view, `rule_test`.`person_view`
      +- Project [gen_attr_0#83 AS name#86, gen_attr_1#84 AS age#87, gen_attr_2#85 AS time#88]
         +- SubqueryAlias person_table
            +- Project [gen_attr_0#83, gen_attr_1#84, gen_attr_2#85]
               +- Filter (gen_attr_1#84 > 10)
                  +- SubqueryAlias gen_subquery_0
                     +- Project [name#89 AS gen_attr_0#83, age#90 AS gen_attr_1#84, time#91 AS gen_attr_2#85]
                        +- MetastoreRelation rule_test, person_table

After
'Project [*]
+- Filter (cast(time#88 as string) >= 2017-01-02 19:53:51)
   +- SubqueryAlias person_view, `rule_test`.`person_view`
      +- Project [gen_attr_0#83 AS name#86, gen_attr_1#84 AS age#87, gen_attr_2#85 AS time#88]
         +- SubqueryAlias person_table
            +- Project [gen_attr_0#83, gen_attr_1#84, gen_attr_2#85]
               +- Filter (gen_attr_1#84 > 10)
                  +- SubqueryAlias gen_subquery_0
                     +- Project [name#89 AS gen_attr_0#83, age#90 AS gen_attr_1#84, time#91 AS gen_attr_2#85]
                        +- MetastoreRelation rule_test, person_table

> Timestamp to String casting is slowing the query significantly
> --
>
> Key: SPARK-19145
> URL: https://issues.apache.org/jira/browse/SPARK-19145
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: gagan taneja
>
> I have a time series table with a timestamp column. 
> The following query
> SELECT COUNT(*) AS `count`
>FROM `default`.`table`
>WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 5
> is significantly SLOWER than 
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
> HH24:MI:SS−0800')
>   AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
> HH24:MI:SS−0800') LIMIT 5
> After investigation, I found that in the first query the time column is cast to 
> String before the filter is applied. 
> However, in the second query no such casting is performed and the filter uses a 
> long value. 
> Below is the generated physical plan for the slower execution, followed by the 
> physical plan for the faster execution.
> SELECT COUNT(*) AS `count`
>FROM `default`.`table`
>WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 5
> == Physical Plan ==
> CollectLimit 5
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3290L])
>+- Exchange SinglePartition
>   +- *HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#3339L])
>  +- *Project
> +- *Filter ((isnotnull(time#3314) && (cast(time#3314 as string) 
> >= 2017-01-02 19:53:51)) && (cast(time#3314 as string) <= 2017-01-09 
> 19:53:51))
>+- *FileScan parquet default.cstat[time#3314] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
> PartitionFilters: [], PushedFilters: [IsNotNull(time)], ReadSchema: 
> struct
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
> HH24:MI:SS−0800')
>   AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
> HH24:MI:SS−0800') LIMIT 5
> == Physical Plan ==
> CollectLimit 5
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3238L])
>+- Exchange SinglePartition
>   +- *HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#3287L])
>  +- *Project
> +- *Filter 

[jira] [Commented] (SPARK-19145) Timestamp to String casting is slowing the query significantly

2017-03-04 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896040#comment-15896040
 ] 

gagan taneja commented on SPARK-19145:
--

The code responsible for this belongs to the class below, which performs type coercion. 
Although this is logically correct, it also slows down binary comparisons, because during 
execution the timestamp/date values are cast to String and the comparison is done with 
string operators, resulting in roughly 10x slower performance.

org.apache.spark.sql.catalyst.analysis.TypeCoercion {
  ...

  // We should cast all relative timestamp/date/string comparisons into string comparisons.
  // This behaves as a user would expect because timestamp strings sort lexicographically,
  // i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true.
  case p @ BinaryComparison(left @ StringType(), right @ DateType()) =>
    p.makeCopy(Array(left, Cast(right, StringType)))
  case p @ BinaryComparison(left @ DateType(), right @ StringType()) =>
    p.makeCopy(Array(Cast(left, StringType), right))
  case p @ BinaryComparison(left @ StringType(), right @ TimestampType()) =>
    p.makeCopy(Array(left, Cast(right, StringType)))
  case p @ BinaryComparison(left @ TimestampType(), right @ StringType()) =>
    p.makeCopy(Array(Cast(left, StringType), right))

  // Comparisons between dates and timestamps.
  case p @ BinaryComparison(left @ TimestampType(), right @ DateType()) =>
    p.makeCopy(Array(Cast(left, StringType), Cast(right, StringType)))
  case p @ BinaryComparison(left @ DateType(), right @ TimestampType()) =>
    p.makeCopy(Array(Cast(left, StringType), Cast(right, StringType)))


> Timestamp to String casting is slowing the query significantly
> --
>
> Key: SPARK-19145
> URL: https://issues.apache.org/jira/browse/SPARK-19145
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: gagan taneja
>
> I have a time series table with a timestamp column. 
> The following query
> SELECT COUNT(*) AS `count`
>FROM `default`.`table`
>WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 5
> is significantly SLOWER than 
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
> HH24:MI:SS−0800')
>   AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
> HH24:MI:SS−0800') LIMIT 5
> After investigation, I found that in the first query the time column is cast to 
> String before the filter is applied. 
> However, in the second query no such casting is performed and the filter uses a 
> long value. 
> Below is the generated physical plan for the slower execution, followed by the 
> physical plan for the faster execution.
> SELECT COUNT(*) AS `count`
>FROM `default`.`table`
>WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 5
> == Physical Plan ==
> CollectLimit 5
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3290L])
>+- Exchange SinglePartition
>   +- *HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#3339L])
>  +- *Project
> +- *Filter ((isnotnull(time#3314) && (cast(time#3314 as string) 
> >= 2017-01-02 19:53:51)) && (cast(time#3314 as string) <= 2017-01-09 
> 19:53:51))
>+- *FileScan parquet default.cstat[time#3314] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
> PartitionFilters: [], PushedFilters: [IsNotNull(time)], ReadSchema: 
> struct
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
> HH24:MI:SS−0800')
>   AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
> HH24:MI:SS−0800') LIMIT 5
> == Physical Plan ==
> CollectLimit 5
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3238L])
>+- Exchange SinglePartition
>   +- *HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#3287L])
>  +- *Project
> +- *Filter ((isnotnull(time#3262) && (time#3262 >= 
> 148340483100)) && (time#3262 <= 148400963100))
>+- *FileScan parquet default.cstat[time#3262] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
> PartitionFilters: [], PushedFilters: [IsNotNull(time), 
> GreaterThanOrEqual(time,2017-01-02 19:53:51.0), 
> LessThanOrEqual(time,2017-01-09..., ReadSchema: struct
> In Impala, both queries run efficiently without any performance difference.
> Spark should be able to parse the Date string and convert to 

[jira] [Commented] (SPARK-19541) High Availability support for ThriftServer

2017-03-04 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896032#comment-15896032
 ] 

gagan taneja commented on SPARK-19541:
--

This would be a great improvement, as we are also leveraging the Thrift Server for 
JDBC/ODBC connectivity.


> High Availability support for ThriftServer
> --
>
> Key: SPARK-19541
> URL: https://issues.apache.org/jira/browse/SPARK-19541
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.0.2, 2.1.0
>Reporter: LvDongrong
>
> Currently, we use the Spark ThriftServer frequently, and there are many 
> connections between clients and a single ThriftServer. When the ThriftServer is 
> down, we cannot get the service again until the server is restarted, so we need 
> to consider ThriftServer HA as well as HiveServer HA. For ThriftServer, we want 
> to adopt the HiveServer HA pattern to provide ThriftServer HA. Therefore, we need 
> to start multiple Thrift Servers which register with ZooKeeper. The client can 
> then find a Thrift Server by just connecting to ZooKeeper, so beeline can get the 
> service from another Thrift Server when one is down.






[jira] [Updated] (SPARK-19705) Preferred location supporting HDFS Cache for FileScanRDD

2017-03-04 Thread gagan taneja (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gagan taneja updated SPARK-19705:
-
Shepherd: Herman van Hovell

> Preferred location supporting HDFS Cache for FileScanRDD
> 
>
> Key: SPARK-19705
> URL: https://issues.apache.org/jira/browse/SPARK-19705
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: gagan taneja
>
> Although NewHadoopRDD and HadoopRDD consider the HDFS cache when calculating 
> preferredLocations, FileScanRDD does not take the HDFS cache into account when 
> calculating preferredLocations.
> The enhancement can easily be implemented for large files where a FilePartition 
> contains only a single HDFS file.
> The enhancement will also result in a significant performance improvement for 
> cached HDFS partitions.






[jira] [Commented] (SPARK-19705) Preferred location supporting HDFS Cache for FileScanRDD

2017-03-04 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896026#comment-15896026
 ] 

gagan taneja commented on SPARK-19705:
--

Herman,
Can you help me with this enhancement?
Thanks

> Preferred location supporting HDFS Cache for FileScanRDD
> 
>
> Key: SPARK-19705
> URL: https://issues.apache.org/jira/browse/SPARK-19705
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: gagan taneja
>
> Although NewHadoopRDD and HadoopRDD consider the HDFS cache when calculating 
> preferredLocations, FileScanRDD does not take the HDFS cache into account when 
> calculating preferredLocations.
> The enhancement can easily be implemented for large files where a FilePartition 
> contains only a single HDFS file.
> The enhancement will also result in a significant performance improvement for 
> cached HDFS partitions.






[jira] [Created] (SPARK-19705) Preferred location supporting HDFS Cache for FileScanRDD

2017-02-22 Thread gagan taneja (JIRA)
gagan taneja created SPARK-19705:


 Summary: Preferred location supporting HDFS Cache for FileScanRDD
 Key: SPARK-19705
 URL: https://issues.apache.org/jira/browse/SPARK-19705
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.0
Reporter: gagan taneja


Although NewHadoopRDD and HadoopRDD consider the HDFS cache when calculating 
preferredLocations, FileScanRDD does not take the HDFS cache into account when 
calculating preferredLocations.
The enhancement can easily be implemented for large files where a FilePartition 
contains only a single HDFS file.
The enhancement will also result in a significant performance improvement for 
cached HDFS partitions.
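
A hedged sketch of the idea (not the actual FileScanRDD change): Hadoop already exposes 
cached replica hosts through BlockLocation.getCachedHosts, so for a partition backed by a 
single file the preferred hosts could be derived roughly as below. The helper name and the 
fallback policy are assumptions made for illustration.

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Prefer hosts that hold the file's blocks in the HDFS cache; otherwise fall back to
// the ordinary replica hosts, which is what locality scheduling uses today.
def preferredHosts(fileStr: String, start: Long, length: Long): Seq[String] = {
  val path = new Path(fileStr)
  val fs = path.getFileSystem(new Configuration())
  val blocks = fs.getFileBlockLocations(fs.getFileStatus(path), start, length)
  val cached = blocks.flatMap(_.getCachedHosts).distinct
  if (cached.nonEmpty) cached.toSeq else blocks.flatMap(_.getHosts).distinct.toSeq
}
{code}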






[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join

2017-02-17 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872811#comment-15872811
 ] 

gagan taneja commented on SPARK-13219:
--

This is what we are looking for. 
For example: 
Table address is partitioned on postal_code. 
Table location contains location_name and postal_code. 
Query: SELECT * FROM address JOIN location ON postal_code WHERE location_name = 
'San Jose'
If the query is rewritten with an IN clause, the optimizer will be able to prune the 
partitions, which would be significantly faster (see the sketch below).
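
A hedged illustration of that rewrite, using the table and column names from the example 
above. An existing SparkSession and the two tables are assumed, the postal codes in the 
IN list are made-up placeholders, and no such automatic optimizer rule exists yet; this 
is only the manual form of the query.

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Original form: the planner sees no predicate on address.postal_code, so every
// partition of `address` is scanned before the join.
spark.sql("""
  SELECT * FROM address JOIN location ON address.postal_code = location.postal_code
  WHERE location.location_name = 'San Jose'
""")

// Rewritten form described above: an IN list of literal postal codes (obtained from a
// prior lookup against `location`) sits directly on the partition column, so the
// planner can prune `address` partitions. The three codes here are placeholders.
spark.sql("""
  SELECT * FROM address
  WHERE postal_code IN (95110, 95112, 95113)
""")
{code}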

> Pushdown predicate propagation in SparkSQL with join
> 
>
> Key: SPARK-13219
> URL: https://issues.apache.org/jira/browse/SPARK-13219
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.4.1, 1.5.2, 1.6.0
> Environment: Spark 1.4
> Datastax Spark connector 1.4
> Cassandra. 2.1.12
> Centos 6.6
>Reporter: Abhinav Chawade
>
> When 2 or more tables are joined in SparkSQL and there is an equality clause 
> in the query on the attributes used to perform the join, it is useful to apply that 
> clause to the scans of both tables. If this is not done, one of the tables 
> results in a full scan, which can slow the query dramatically. Consider the 
> following example with 2 tables being joined.
> {code}
> CREATE TABLE assets (
> assetid int PRIMARY KEY,
> address text,
> propertyname text
> )
> CREATE TABLE tenants (
> assetid int PRIMARY KEY,
> name text
> )
> spark-sql> explain select t.name from tenants t, assets a where a.assetid = 
> t.assetid and t.assetid='1201';
> WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to 
> load native-hadoop library for your platform... using builtin-java classes 
> where applicable
> == Physical Plan ==
> Project [name#14]
>  ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
>   Exchange (HashPartitioning 200)
>Filter (CAST(assetid#13, DoubleType) = 1201.0)
> HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, 
> Some(t)), None
>   Exchange (HashPartitioning 200)
>HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), 
> None
> Time taken: 1.354 seconds, Fetched 8 row(s)
> {code}
> The simple workaround is to add another equality condition for each table but 
> it becomes cumbersome. It will be helpful if the query planner could improve 
> filter propagation.






[jira] [Commented] (SPARK-13219) Pushdown predicate propagation in SparkSQL with join

2017-02-17 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872809#comment-15872809
 ] 

gagan taneja commented on SPARK-13219:
--

Venu,
This is very interesting. I would like to look at the code for all the optimizations 
that are in place.
Do you have plans to contribute it back to Spark?

> Pushdown predicate propagation in SparkSQL with join
> 
>
> Key: SPARK-13219
> URL: https://issues.apache.org/jira/browse/SPARK-13219
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.4.1, 1.5.2, 1.6.0
> Environment: Spark 1.4
> Datastax Spark connector 1.4
> Cassandra. 2.1.12
> Centos 6.6
>Reporter: Abhinav Chawade
>
> When 2 or more tables are joined in SparkSQL and there is an equality clause 
> in the query on the attributes used to perform the join, it is useful to apply that 
> clause to the scans of both tables. If this is not done, one of the tables 
> results in a full scan, which can slow the query dramatically. Consider the 
> following example with 2 tables being joined.
> {code}
> CREATE TABLE assets (
> assetid int PRIMARY KEY,
> address text,
> propertyname text
> )
> CREATE TABLE tenants (
> assetid int PRIMARY KEY,
> name text
> )
> spark-sql> explain select t.name from tenants t, assets a where a.assetid = 
> t.assetid and t.assetid='1201';
> WARN  2016-02-05 23:05:19 org.apache.hadoop.util.NativeCodeLoader: Unable to 
> load native-hadoop library for your platform... using builtin-java classes 
> where applicable
> == Physical Plan ==
> Project [name#14]
>  ShuffledHashJoin [assetid#13], [assetid#15], BuildRight
>   Exchange (HashPartitioning 200)
>Filter (CAST(assetid#13, DoubleType) = 1201.0)
> HiveTableScan [assetid#13,name#14], (MetastoreRelation element, tenants, 
> Some(t)), None
>   Exchange (HashPartitioning 200)
>HiveTableScan [assetid#15], (MetastoreRelation element, assets, Some(a)), 
> None
> Time taken: 1.354 seconds, Fetched 8 row(s)
> {code}
> The simple workaround is to add another equality condition for each table but 
> it becomes cumbersome. It will be helpful if the query planner could improve 
> filter propagation.






[jira] [Commented] (SPARK-19609) Broadcast joins should pushdown join constraints as Filter to the larger relation

2017-02-17 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15872797#comment-15872797
 ] 

gagan taneja commented on SPARK-19609:
--

This can be further extended to joins on columns that are also partition columns. 
For example: 
Table address is partitioned on postal_code. 
Table location contains location_name and postal_code. 
Query: SELECT * FROM address JOIN location ON postal_code WHERE location_name = 
'San Jose'
If the query is rewritten with an IN clause, the optimizer will be able to prune the 
partitions, which would be significantly faster.


> Broadcast joins should pushdown join constraints as Filter to the larger 
> relation
> -
>
> Key: SPARK-19609
> URL: https://issues.apache.org/jira/browse/SPARK-19609
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Nick Dimiduk
>
> For broadcast inner-joins, where the smaller relation is known to be small 
> enough to materialize on a worker, the set of values for all join columns is 
> known and fits in memory. Spark should translate these values into a 
> {{Filter}} pushed down to the datasource. The common join condition of 
> equality, i.e. {{lhs.a == rhs.a}}, can be written as an {{a in ...}} clause. 
> An example of pushing such filters is already present in the form of 
> {{IsNotNull}} filters via [~sameerag]'s work on SPARK-12957 subtasks.
> This optimization could even work when the smaller relation does not fit 
> entirely in memory. This could be done by partitioning the smaller relation 
> into N pieces, applying this predicate pushdown for each piece, and unioning 
> the results.






[jira] [Commented] (SPARK-19145) Timestamp to String casting is slowing the query significantly

2017-01-09 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15814165#comment-15814165
 ] 

gagan taneja commented on SPARK-19145:
--

I should be able to work on a proposal for the fix.

> Timestamp to String casting is slowing the query significantly
> --
>
> Key: SPARK-19145
> URL: https://issues.apache.org/jira/browse/SPARK-19145
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: gagan taneja
>
> I have a time series table with a timestamp column. 
> The following query
> SELECT COUNT(*) AS `count`
>FROM `default`.`table`
>WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 5
> is significantly SLOWER than 
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
> HH24:MI:SS−0800')
>   AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
> HH24:MI:SS−0800') LIMIT 5
> After investigation, I found that in the first query the time column is cast to 
> String before the filter is applied. 
> However, in the second query no such casting is performed and the filter uses a 
> long value. 
> Below is the generated physical plan for the slower execution, followed by the 
> physical plan for the faster execution.
> SELECT COUNT(*) AS `count`
>FROM `default`.`table`
>WHERE `time` >= '2017-01-02 19:53:51'
> AND `time` <= '2017-01-09 19:53:51' LIMIT 5
> == Physical Plan ==
> CollectLimit 5
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3290L])
>+- Exchange SinglePartition
>   +- *HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#3339L])
>  +- *Project
> +- *Filter ((isnotnull(time#3314) && (cast(time#3314 as string) 
> >= 2017-01-02 19:53:51)) && (cast(time#3314 as string) <= 2017-01-09 
> 19:53:51))
>+- *FileScan parquet default.cstat[time#3314] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
> PartitionFilters: [], PushedFilters: [IsNotNull(time)], ReadSchema: 
> struct
> SELECT COUNT(*) AS `count`
> FROM `default`.`table`
> WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
> HH24:MI:SS−0800')
>   AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
> HH24:MI:SS−0800') LIMIT 5
> == Physical Plan ==
> CollectLimit 5
> +- *HashAggregate(keys=[], functions=[count(1)], output=[count#3238L])
>+- Exchange SinglePartition
>   +- *HashAggregate(keys=[], functions=[partial_count(1)], 
> output=[count#3287L])
>  +- *Project
> +- *Filter ((isnotnull(time#3262) && (time#3262 >= 
> 148340483100)) && (time#3262 <= 148400963100))
>+- *FileScan parquet default.cstat[time#3262] Batched: true, 
> Format: Parquet, Location: 
> InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
> PartitionFilters: [], PushedFilters: [IsNotNull(time), 
> GreaterThanOrEqual(time,2017-01-02 19:53:51.0), 
> LessThanOrEqual(time,2017-01-09..., ReadSchema: struct
> In Impala, both queries run efficiently without any performance difference.
> Spark should be able to parse the date string and convert it to a Long/Timestamp 
> during generation of the optimized logical plan so that both queries would have 
> similar performance.






[jira] [Created] (SPARK-19145) Timestamp to String casting is slowing the query significantly

2017-01-09 Thread gagan taneja (JIRA)
gagan taneja created SPARK-19145:


 Summary: Timestamp to String casting is slowing the query 
significantly
 Key: SPARK-19145
 URL: https://issues.apache.org/jira/browse/SPARK-19145
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.1.0
Reporter: gagan taneja


I have a time series table with a timestamp column. 

The following query
SELECT COUNT(*) AS `count`
   FROM `default`.`table`
   WHERE `time` >= '2017-01-02 19:53:51'
AND `time` <= '2017-01-09 19:53:51' LIMIT 5

is significantly SLOWER than 

SELECT COUNT(*) AS `count`
FROM `default`.`table`
WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
HH24:MI:SS−0800')
  AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
HH24:MI:SS−0800') LIMIT 5


After investigation, I found that in the first query the time column is cast to 
String before the filter is applied. 
However, in the second query no such casting is performed and the filter uses a 
long value. 

Below is the generated physical plan for the slower execution, followed by the 
physical plan for the faster execution.

SELECT COUNT(*) AS `count`
   FROM `default`.`table`
   WHERE `time` >= '2017-01-02 19:53:51'
AND `time` <= '2017-01-09 19:53:51' LIMIT 5

== Physical Plan ==
CollectLimit 5
+- *HashAggregate(keys=[], functions=[count(1)], output=[count#3290L])
   +- Exchange SinglePartition
  +- *HashAggregate(keys=[], functions=[partial_count(1)], 
output=[count#3339L])
 +- *Project
+- *Filter ((isnotnull(time#3314) && (cast(time#3314 as string) >= 
2017-01-02 19:53:51)) && (cast(time#3314 as string) <= 2017-01-09 19:53:51))
   +- *FileScan parquet default.cstat[time#3314] Batched: true, 
Format: Parquet, Location: 
InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
PartitionFilters: [], PushedFilters: [IsNotNull(time)], ReadSchema: 
struct

SELECT COUNT(*) AS `count`
FROM `default`.`table`
WHERE `time` >= to_utc_timestamp('2017-01-02 19:53:51','-MM-DD 
HH24:MI:SS−0800')
  AND `time` <= to_utc_timestamp('2017-01-09 19:53:51','-MM-DD 
HH24:MI:SS−0800') LIMIT 5

== Physical Plan ==
CollectLimit 5
+- *HashAggregate(keys=[], functions=[count(1)], output=[count#3238L])
   +- Exchange SinglePartition
  +- *HashAggregate(keys=[], functions=[partial_count(1)], 
output=[count#3287L])
 +- *Project
+- *Filter ((isnotnull(time#3262) && (time#3262 >= 
148340483100)) && (time#3262 <= 148400963100))
   +- *FileScan parquet default.cstat[time#3262] Batched: true, 
Format: Parquet, Location: 
InMemoryFileIndex[hdfs://10.65.55.220/user/spark/spark-warehouse/cstat], 
PartitionFilters: [], PushedFilters: [IsNotNull(time), 
GreaterThanOrEqual(time,2017-01-02 19:53:51.0), 
LessThanOrEqual(time,2017-01-09..., ReadSchema: struct

In Impala, both queries run efficiently without any performance difference.
Spark should be able to parse the date string and convert it to a Long/Timestamp 
during generation of the optimized logical plan so that both queries would have 
similar performance.
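
Until then, a hedged sketch of a query-side workaround (assumed rather than verified 
here; `spark` is an existing SparkSession and the table is the one above): casting the 
literals once should keep the comparison in the timestamp domain, so the per-row cast of 
`time` to string disappears from the plan.

{code}
// Literal casts are constant-folded, so the filter stays on the timestamp column.
spark.sql("""
  SELECT COUNT(*) AS `count`
  FROM `default`.`table`
  WHERE `time` >= CAST('2017-01-02 19:53:51' AS TIMESTAMP)
    AND `time` <= CAST('2017-01-09 19:53:51' AS TIMESTAMP)
  LIMIT 5
""")
{code}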







[jira] [Commented] (SPARK-19118) Percentile support for frequency distribution table

2017-01-07 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15808173#comment-15808173
 ] 

gagan taneja commented on SPARK-19118:
--

I had filed the original JIRA to provide this functionality for both Percentile and 
Approx Percentile.
After reviewing the code, I realized that the two function implementations are very 
different and it would be best to divide the work into two sub-tasks. 
I have created one sub-task for adding support to the Percentile function. 
For implementing the functionality in Approx Percentile, I would need to work with a 
developer to find the right approach and make the code changes. This is tracked by the 
sub-task https://issues.apache.org/jira/browse/SPARK-19119

> Percentile support for frequency distribution table
> ---
>
> Key: SPARK-19118
> URL: https://issues.apache.org/jira/browse/SPARK-19118
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: gagan taneja
>







[jira] [Created] (SPARK-19119) Approximate percentile support for frequency distribution table

2017-01-07 Thread gagan taneja (JIRA)
gagan taneja created SPARK-19119:


 Summary: Approximate percentile support for frequency distribution 
table
 Key: SPARK-19119
 URL: https://issues.apache.org/jira/browse/SPARK-19119
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.0.2
Reporter: gagan taneja









[jira] [Created] (SPARK-19118) Percentile support for frequency distribution table

2017-01-07 Thread gagan taneja (JIRA)
gagan taneja created SPARK-19118:


 Summary: Percentile support for frequency distribution table
 Key: SPARK-19118
 URL: https://issues.apache.org/jira/browse/SPARK-19118
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 2.0.2
Reporter: gagan taneja









[jira] [Commented] (SPARK-18940) Percentile and approximate percentile support for frequency distribution table

2016-12-20 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15764702#comment-15764702
 ] 

gagan taneja commented on SPARK-18940:
--

I am working on the fix and should be able to make the changes and send them for 
review soon.

> Percentile and approximate percentile support for frequency distribution table
> --
>
> Key: SPARK-18940
> URL: https://issues.apache.org/jira/browse/SPARK-18940
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: gagan taneja
>
> I have a frequency distribution table with following entries 
> {noformat}
> Age,No of person 
> 21, 10
> 22, 15
> 23, 18 
> ..
> ..
> 30, 14
> {noformat}
> Moreover, it is common to have data in frequency distribution format from which to 
> further calculate a percentile or median. With the current implementation 
> it would be very difficult and complex to find the percentile.
> Therefore I am proposing an enhancement to the current Percentile and Approx 
> Percentile implementations to take a frequency distribution column into 
> consideration.
> Current Percentile definition 
> {noformat}
> percentile(col, array(percentage1 [, percentage2]...))
> case class Percentile(
>   child: Expression,
>   percentageExpression: Expression,
>   mutableAggBufferOffset: Int = 0,
>   inputAggBufferOffset: Int = 0) {
>def this(child: Expression, percentageExpression: Expression) = {
> this(child, percentageExpression, 0, 0)
>   }
> }
> {noformat}
> Proposed changes 
> {noformat}
> percentile(col, [frequency], array(percentage1 [, percentage2]...))
> case class Percentile(
>   child: Expression,
>   frequency : Expression,
>   percentageExpression: Expression,
>   mutableAggBufferOffset: Int = 0,
>   inputAggBufferOffset: Int = 0) {
>def this(child: Expression, percentageExpression: Expression) = {
> this(child, Literal(1L), percentageExpression, 0, 0)
>   }
>   def this(child: Expression, frequency : Expression, percentageExpression: 
> Expression) = {
> this(child, frequency, percentageExpression, 0, 0)
>   }
> }
> {noformat}
> Although this definition will differ from the Hive implementation, it will be 
> useful functionality for many Spark users.
> Moreover, the changes are local to only the Percentile and ApproxPercentile 
> implementations.






[jira] [Updated] (SPARK-18940) Percentile and approximate percentile support for frequency distribution table

2016-12-20 Thread gagan taneja (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gagan taneja updated SPARK-18940:
-
Shepherd: Herman van Hovell

> Percentile and approximate percentile support for frequency distribution table
> --
>
> Key: SPARK-18940
> URL: https://issues.apache.org/jira/browse/SPARK-18940
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: gagan taneja
>
> I have a frequency distribution table with following entries 
> {noformat}
> Age,No of person 
> 21, 10
> 22, 15
> 23, 18 
> ..
> ..
> 30, 14
> {noformat}
> Moreover, it is common to have data in frequency distribution format from which to 
> further calculate a percentile or median. With the current implementation 
> it would be very difficult and complex to find the percentile.
> Therefore I am proposing an enhancement to the current Percentile and Approx 
> Percentile implementations to take a frequency distribution column into 
> consideration.
> Current Percentile definition 
> {noformat}
> percentile(col, array(percentage1 [, percentage2]...))
> case class Percentile(
>   child: Expression,
>   percentageExpression: Expression,
>   mutableAggBufferOffset: Int = 0,
>   inputAggBufferOffset: Int = 0) {
>def this(child: Expression, percentageExpression: Expression) = {
> this(child, percentageExpression, 0, 0)
>   }
> }
> {noformat}
> Proposed changes 
> {noformat}
> percentile(col, [frequency], array(percentage1 [, percentage2]...))
> case class Percentile(
>   child: Expression,
>   frequency : Expression,
>   percentageExpression: Expression,
>   mutableAggBufferOffset: Int = 0,
>   inputAggBufferOffset: Int = 0) {
>def this(child: Expression, percentageExpression: Expression) = {
> this(child, Literal(1L), percentageExpression, 0, 0)
>   }
>   def this(child: Expression, frequency : Expression, percentageExpression: 
> Expression) = {
> this(child, frequency, percentageExpression, 0, 0)
>   }
> }
> {noformat}
> Although this definition will differ from the Hive implementation, it will be 
> useful functionality for many Spark users.
> Moreover, the changes are local to only the Percentile and ApproxPercentile 
> implementations.






[jira] [Created] (SPARK-18940) Percentile and approximate percentile support for frequency distribution table

2016-12-20 Thread gagan taneja (JIRA)
gagan taneja created SPARK-18940:


 Summary: Percentile and approximate percentile support for 
frequency distribution table
 Key: SPARK-18940
 URL: https://issues.apache.org/jira/browse/SPARK-18940
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.0.2
Reporter: gagan taneja


I have a frequency distribution table with following entries 
Age,No of person 
21, 10
22, 15
23, 18 
..
..
30, 14

Moreover, it is common to have data in frequency distribution format from which to 
further calculate a percentile or median. With the current implementation 
it would be very difficult and complex to find the percentile.
Therefore I am proposing an enhancement to the current Percentile and Approx 
Percentile implementations to take a frequency distribution column into 
consideration.
Current Percentile definition 

percentile(col, array(percentage1 [, percentage2]...))
case class Percentile(
  child: Expression,
  percentageExpression: Expression,
  mutableAggBufferOffset: Int = 0,
  inputAggBufferOffset: Int = 0) {
   def this(child: Expression, percentageExpression: Expression) = {
this(child, percentageExpression, 0, 0)
  }
}

Proposed changes 

percentile(col, [frequency], array(percentage1 [, percentage2]...))
case class Percentile(
  child: Expression,
  frequency : Expression,
  percentageExpression: Expression,
  mutableAggBufferOffset: Int = 0,
  inputAggBufferOffset: Int = 0) {
   def this(child: Expression, percentageExpression: Expression) = {
this(child, Literal(1L), percentageExpression, 0, 0)
  }
  def this(child: Expression, frequency : Expression, percentageExpression: 
Expression) = {
this(child, frequency, percentageExpression, 0, 0)
  }
}

Although this definition will differ from the Hive implementation, it will be 
useful functionality for many Spark users.
Moreover, the changes are local to only the Percentile and ApproxPercentile 
implementations.
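
A hedged usage sketch against the frequency table above, following the proposed 
signature. The table and column names are made up, `spark` is an existing SparkSession, 
the argument order is as proposed here (not necessarily what a final implementation 
would use), and this would not run on a stock build without the proposed change.

{code}
// Weighted percentiles: each `age` value is counted `no_of_person` times.
spark.sql("""
  SELECT percentile(age, no_of_person, array(0.5, 0.9)) AS weighted_percentiles
  FROM age_distribution
""")
{code}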







[jira] [Commented] (SPARK-15345) SparkSession's conf doesn't take effect when there's already an existing SparkContext

2016-08-09 Thread gagan taneja (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15414544#comment-15414544
 ] 

gagan taneja commented on SPARK-15345:
--

I was annoyed by this bug but fortunately found a workaround. 
It works fine if you start spark-shell with:
export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
bin/spark-shell

It gives trouble if HIVE_CONF_DIR is set separately, as below:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export HIVE_CONF_DIR=/etc/hive/conf
bin/spark-shell


> SparkSession's conf doesn't take effect when there's already an existing 
> SparkContext
> -
>
> Key: SPARK-15345
> URL: https://issues.apache.org/jira/browse/SPARK-15345
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Reporter: Piotr Milanowski
>Assignee: Reynold Xin
>Priority: Blocker
> Fix For: 2.0.0
>
>
> I am working with branch-2.0, spark is compiled with hive support (-Phive and 
> -Phive-thriftserver).
> I am trying to access databases using this snippet:
> {code}
> from pyspark.sql import HiveContext
> hc = HiveContext(sc)
> hc.sql("show databases").collect()
> [Row(result='default')]
> {code}
> This means that spark doesn't find any databases specified in configuration.
> Using the same configuration (i.e. hive-site.xml and core-site.xml) in spark 
> 1.6, and launching above snippet, I can print out existing databases.
> When run in DEBUG mode this is what spark (2.0) prints out:
> {code}
> 16/05/16 12:17:47 INFO SparkSqlParser: Parsing command: show databases
> 16/05/16 12:17:47 DEBUG SimpleAnalyzer: 
> === Result of Batch Resolution ===
> !'Project [unresolveddeserializer(createexternalrow(if (isnull(input[0, 
> string])) null else input[0, string].toString, 
> StructField(result,StringType,false)), result#2) AS #3]   Project 
> [createexternalrow(if (isnull(result#2)) null else result#2.toString, 
> StructField(result,StringType,false)) AS #3]
>  +- LocalRelation [result#2]  
>   
>  +- LocalRelation [result#2]
> 
> 16/05/16 12:17:47 DEBUG ClosureCleaner: +++ Cleaning closure  
> (org.apache.spark.sql.Dataset$$anonfun$53) +++
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + declared fields: 2
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  public static final long 
> org.apache.spark.sql.Dataset$$anonfun$53.serialVersionUID
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  private final 
> org.apache.spark.sql.types.StructType 
> org.apache.spark.sql.Dataset$$anonfun$53.structType$1
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + declared methods: 2
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  public final java.lang.Object 
> org.apache.spark.sql.Dataset$$anonfun$53.apply(java.lang.Object)
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  public final java.lang.Object 
> org.apache.spark.sql.Dataset$$anonfun$53.apply(org.apache.spark.sql.catalyst.InternalRow)
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + inner classes: 0
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + outer classes: 0
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + outer objects: 0
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + populating accessed fields because 
> this is the starting closure
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + fields accessed by starting 
> closure: 0
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + there are no enclosing objects!
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  +++ closure  
> (org.apache.spark.sql.Dataset$$anonfun$53) is now cleaned +++
> 16/05/16 12:17:47 DEBUG ClosureCleaner: +++ Cleaning closure  
> (org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$javaToPython$1)
>  +++
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + declared fields: 1
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  public static final long 
> org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$javaToPython$1.serialVersionUID
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + declared methods: 2
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  public final java.lang.Object 
> org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$javaToPython$1.apply(java.lang.Object)
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  public final 
> org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler 
> org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$javaToPython$1.apply(scala.collection.Iterator)
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + inner classes: 0
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + outer classes: 0
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + outer objects: 0
> 16/05/16 12:17:47 DEBUG ClosureCleaner:  + populating accessed fields because 
> this is the starting closure
> 16/05/16 12:17:47 

[jira] [Created] (SPARK-5292) optimize join for tables that are already sharded/support for hive bucket

2015-01-16 Thread gagan taneja (JIRA)
gagan taneja created SPARK-5292:
---

 Summary: optimize join for tables that are already sharded/support 
for hive bucket
 Key: SPARK-5292
 URL: https://issues.apache.org/jira/browse/SPARK-5292
 Project: Spark
  Issue Type: New Feature
  Components: SQL
Affects Versions: 1.2.0
Reporter: gagan taneja


Currently joins do not consider the locality of the data and perform the shuffle 
anyway.
If the user takes responsibility for distributing the data based on some hash, or has 
sharded the data, Spark joins should be able to leverage the sharding to optimize the 
join calculation and eliminate the shuffle.


