[jira] [Commented] (SPARK-17299) TRIM/LTRIM/RTRIM strips characters other than spaces
[ https://issues.apache.org/jira/browse/SPARK-17299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15451914#comment-15451914 ] Cheng Hao commented on SPARK-17299: --- Or should this come after SPARK-14878? > TRIM/LTRIM/RTRIM strips characters other than spaces > > > Key: SPARK-17299 > URL: https://issues.apache.org/jira/browse/SPARK-17299 > Project: Spark > Issue Type: Bug > Components: Documentation, SQL >Affects Versions: 2.0.0 >Reporter: Jeremy Beard >Priority: Minor > > TRIM/LTRIM/RTRIM docs state that they only strip spaces: > http://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/sql/functions.html#trim(org.apache.spark.sql.Column) > But the implementation strips all characters of ASCII value 20 or less: > https://github.com/apache/spark/blob/v2.0.0/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L468-L470
[jira] [Commented] (SPARK-17299) TRIM/LTRIM/RTRIM strips characters other than spaces
[ https://issues.apache.org/jira/browse/SPARK-17299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15451810#comment-15451810 ] Cheng Hao commented on SPARK-17299: --- Yes, that's my bad; I thought it should have the same behavior as `String.trim()`. We should fix this bug. [~jbeard], can you please fix it? > TRIM/LTRIM/RTRIM strips characters other than spaces > > > Key: SPARK-17299 > URL: https://issues.apache.org/jira/browse/SPARK-17299 > Project: Spark > Issue Type: Bug > Components: Documentation, SQL >Affects Versions: 2.0.0 >Reporter: Jeremy Beard >Priority: Minor > > TRIM/LTRIM/RTRIM docs state that they only strip spaces: > http://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/sql/functions.html#trim(org.apache.spark.sql.Column) > But the implementation strips all characters of ASCII value 20 or less: > https://github.com/apache/spark/blob/v2.0.0/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L468-L470
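A minimal Scala sketch (not from the JIRA; assumes a Spark 2.0 shell with a SparkSession named `spark`) illustrating the documented-vs-actual behavior described above:
{code}
import org.apache.spark.sql.functions.{col, length, trim}
import spark.implicits._  // assumes an active SparkSession named `spark`

val df = Seq("\tabc\n").toDF("s")
// The 2.0.0 scaladoc implies only spaces are removed (which would leave length 5 here),
// but UTF8String.trim() drops every code point <= 0x20, so the tab and newline are
// stripped as well and the trimmed length is 3.
df.select(length(trim(col("s")))).show()
{code}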
[jira] [Created] (SPARK-15859) Optimize the Partition Pruning with Disjunction
Cheng Hao created SPARK-15859: - Summary: Optimize the Partition Pruning with Disjunction Key: SPARK-15859 URL: https://issues.apache.org/jira/browse/SPARK-15859 Project: Spark Issue Type: Improvement Components: SQL Reporter: Cheng Hao Priority: Critical Currently we cannot optimize partition pruning for a disjunctive predicate, for example: {{(part1=2 and col1='abc') or (part1=5 and col1='cde')}}
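A toy Scala sketch (hand-rolled mini-AST, not Catalyst code) of the pruning predicate this issue wants to derive from such a disjunction: keep only the partition-column conjuncts in each OR branch and OR the branches together; if any branch has no partition conjunct, no pruning predicate can be derived.
{code}
sealed trait Expr
case class Eq(col: String, value: Any) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr

// Returns a weaker predicate that references only partition columns and is implied
// by `e`, so it is safe to use for partition pruning; None means "cannot prune".
def prunePredicate(e: Expr, partCols: Set[String]): Option[Expr] = e match {
  case Or(l, r) =>
    for (pl <- prunePredicate(l, partCols); pr <- prunePredicate(r, partCols))
      yield Or(pl, pr)
  case And(l, r) =>
    (prunePredicate(l, partCols), prunePredicate(r, partCols)) match {
      case (Some(pl), Some(pr)) => Some(And(pl, pr))
      case (Some(pl), None)     => Some(pl)
      case (None, other)        => other
    }
  case eq @ Eq(c, _) => if (partCols.contains(c)) Some(eq) else None
}

// (part1=2 and col1='abc') or (part1=5 and col1='cde')  ==>  part1=2 or part1=5
prunePredicate(
  Or(And(Eq("part1", 2), Eq("col1", "abc")), And(Eq("part1", 5), Eq("col1", "cde"))),
  Set("part1"))
// res: Some(Or(Eq(part1,2),Eq(part1,5)))
{code}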
[jira] [Commented] (SPARK-15730) [Spark SQL] the value of 'hiveconf' parameter in Spark-sql CLI don't take effect in spark-sql session
[ https://issues.apache.org/jira/browse/SPARK-15730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15318654#comment-15318654 ] Cheng Hao commented on SPARK-15730: --- [~jameszhouyi], can you please verify this fixing? > [Spark SQL] the value of 'hiveconf' parameter in Spark-sql CLI don't take > effect in spark-sql session > - > > Key: SPARK-15730 > URL: https://issues.apache.org/jira/browse/SPARK-15730 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: Yi Zhou >Priority: Critical > > /usr/lib/spark/bin/spark-sql -v --driver-memory 4g --executor-memory 7g > --executor-cores 5 --num-executors 31 --master yarn-client --conf > spark.yarn.executor.memoryOverhead=1024 --hiveconf RESULT_TABLE=test_result01 > spark-sql> use test; > 16/06/02 21:36:15 INFO execution.SparkSqlParser: Parsing command: use test > 16/06/02 21:36:15 INFO spark.SparkContext: Starting job: processCmd at > CliDriver.java:376 > 16/06/02 21:36:15 INFO scheduler.DAGScheduler: Got job 2 (processCmd at > CliDriver.java:376) with 1 output partitions > 16/06/02 21:36:15 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 > (processCmd at CliDriver.java:376) > 16/06/02 21:36:15 INFO scheduler.DAGScheduler: Parents of final stage: List() > 16/06/02 21:36:15 INFO scheduler.DAGScheduler: Missing parents: List() > 16/06/02 21:36:15 INFO scheduler.DAGScheduler: Submitting ResultStage 2 > (MapPartitionsRDD[8] at processCmd at CliDriver.java:376), which has no > missing parents > 16/06/02 21:36:15 INFO memory.MemoryStore: Block broadcast_2 stored as values > in memory (estimated size 3.2 KB, free 2.4 GB) > 16/06/02 21:36:15 INFO memory.MemoryStore: Block broadcast_2_piece0 stored as > bytes in memory (estimated size 1964.0 B, free 2.4 GB) > 16/06/02 21:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in > memory on 192.168.3.11:36189 (size: 1964.0 B, free: 2.4 GB) > 16/06/02 21:36:15 INFO spark.SparkContext: Created broadcast 2 from broadcast > at DAGScheduler.scala:1012 > 16/06/02 21:36:15 INFO scheduler.DAGScheduler: Submitting 1 missing tasks > from ResultStage 2 (MapPartitionsRDD[8] at processCmd at CliDriver.java:376) > 16/06/02 21:36:15 INFO cluster.YarnScheduler: Adding task set 2.0 with 1 tasks > 16/06/02 21:36:15 INFO scheduler.TaskSetManager: Starting task 0.0 in stage > 2.0 (TID 2, 192.168.3.13, partition 0, PROCESS_LOCAL, 5362 bytes) > 16/06/02 21:36:15 INFO cluster.YarnClientSchedulerBackend: Launching task 2 > on executor id: 10 hostname: 192.168.3.13. 
> 16/06/02 21:36:16 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in > memory on hw-node3:45924 (size: 1964.0 B, free: 4.4 GB) > 16/06/02 21:36:17 INFO scheduler.TaskSetManager: Finished task 0.0 in stage > 2.0 (TID 2) in 1934 ms on 192.168.3.13 (1/1) > 16/06/02 21:36:17 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose > tasks have all completed, from pool > 16/06/02 21:36:17 INFO scheduler.DAGScheduler: ResultStage 2 (processCmd at > CliDriver.java:376) finished in 1.937 s > 16/06/02 21:36:17 INFO scheduler.DAGScheduler: Job 2 finished: processCmd at > CliDriver.java:376, took 1.962631 s > Time taken: 2.027 seconds > 16/06/02 21:36:17 INFO CliDriver: Time taken: 2.027 seconds > spark-sql> DROP TABLE IF EXISTS ${hiveconf:RESULT_TABLE}; > 16/06/02 21:36:36 INFO execution.SparkSqlParser: Parsing command: DROP TABLE > IF EXISTS ${hiveconf:RESULT_TABLE} > Error in query: > mismatched input '$' expecting {'ADD', 'AS', 'ALL', 'GROUP', 'BY', > 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'LIMIT', 'AT', 'IN', 'NO', > 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', > 'ASC', 'DESC', 'FOR', 'OUTER', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', > 'RANGE', 'ROWS', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'WITH', > 'VALUES', 'CREATE', 'TABLE', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', > 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'SHOW', 'TABLES', > 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'TO', > 'TABLESAMPLE', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', > 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'IF', > 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', > 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'USING', 'SERDE', 'SERDEPROPERTIES', > 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', > 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'EXTENDED', > 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', TEMPORARY, > 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', > 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', > 'FILEFORMAT',
[jira] [Commented] (SPARK-15034) Use the value of spark.sql.warehouse.dir as the warehouse location instead of using hive.metastore.warehouse.dir
[ https://issues.apache.org/jira/browse/SPARK-15034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15300072#comment-15300072 ] Cheng Hao commented on SPARK-15034: --- [~yhuai], but it probably does not respect `hive-site.xml`, and lots of users will be impacted by this configuration change; is that acceptable? > Use the value of spark.sql.warehouse.dir as the warehouse location instead of > using hive.metastore.warehouse.dir > > > Key: SPARK-15034 > URL: https://issues.apache.org/jira/browse/SPARK-15034 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Yin Huai >Assignee: Yin Huai > Labels: release_notes, releasenotes > Fix For: 2.0.0 > > > Starting from Spark 2.0, spark.sql.warehouse.dir will be the conf to set > warehouse location. We will not use hive.metastore.warehouse.dir.
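A hedged Scala sketch (hypothetical path) of the migration concern raised above: in Spark 2.0 the warehouse location is read from spark.sql.warehouse.dir, so a location that previously came from hive.metastore.warehouse.dir in hive-site.xml has to be carried over explicitly.
{code}
import org.apache.spark.sql.SparkSession

// Spark 2.0+: hive.metastore.warehouse.dir from hive-site.xml is no longer the source
// of truth; set the same location via spark.sql.warehouse.dir instead.
val spark = SparkSession.builder()
  .master("local[*]")                                         // for a local test
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")  // hypothetical path
  .enableHiveSupport()
  .getOrCreate()
{code}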
[jira] [Commented] (SPARK-13894) SQLContext.range should return Dataset[Long]
[ https://issues.apache.org/jira/browse/SPARK-13894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15195274#comment-15195274 ] Cheng Hao commented on SPARK-13894: --- The existing function "SQLContext.range()" returns the underlying schema with the column named "id"; a lot of unit test code would need to be updated if we changed the column name to "value". How about keeping the name "id" unchanged? > SQLContext.range should return Dataset[Long] > > > Key: SPARK-13894 > URL: https://issues.apache.org/jira/browse/SPARK-13894 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Priority: Blocker > > Rather than returning DataFrame, it should return a Dataset[Long]. The > documentation should still make it clear that the underlying schema consists > of a single long column named "value".
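A small illustration (assuming a 1.6/2.0-era `sqlContext`) of the naming concern above: range() currently exposes a single LongType column named "id", and existing code selects it by that name.
{code}
val df = sqlContext.range(0, 5)   // a single LongType column
df.printSchema()                  // root |-- id: long (nullable = false)
df.select("id").show()            // relies on the current name; would break if renamed to "value"
{code}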
[jira] [Commented] (SPARK-13326) Dataset in spark 2.0.0-SNAPSHOT missing columns
[ https://issues.apache.org/jira/browse/SPARK-13326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15195022#comment-15195022 ] Cheng Hao commented on SPARK-13326: --- Can not reproduce it anymore, can you try it again? > Dataset in spark 2.0.0-SNAPSHOT missing columns > --- > > Key: SPARK-13326 > URL: https://issues.apache.org/jira/browse/SPARK-13326 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.0.0 >Reporter: koert kuipers >Priority: Minor > > i noticed some things stopped working on datasets in spark 2.0.0-SNAPSHOT, > and with a confusing error message (cannot resolved some column with input > columns []). > for example in 1.6.0-SNAPSHOT: > {noformat} > scala> val ds = sc.parallelize(1 to 10).toDS > ds: org.apache.spark.sql.Dataset[Int] = [value: int] > scala> ds.map(x => Option(x)) > res0: org.apache.spark.sql.Dataset[Option[Int]] = [value: int] > {noformat} > and same commands in 2.0.0-SNAPSHOT: > {noformat} > scala> val ds = sc.parallelize(1 to 10).toDS > ds: org.apache.spark.sql.Dataset[Int] = [value: int] > scala> ds.map(x => Option(x)) > org.apache.spark.sql.AnalysisException: cannot resolve 'value' given input > columns: []; > at > org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:60) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:284) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:284) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:283) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:162) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:172) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:176) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at scala.collection.immutable.List.foreach(List.scala:381) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) > at scala.collection.immutable.List.map(List.scala:285) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:176) > at > org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:181) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) > at scala.collection.Iterator$class.foreach(Iterator.scala:742) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) > at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) > at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) > at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308) > at scala.collection.AbstractIterator.to(Iterator.scala:1194) > at > 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1194) > at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1194) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:181) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:57) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50) > at > org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:122) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:121) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:121) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:121) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50) > at > org.apache.spark
[jira] [Created] (SPARK-12610) Add Anti Join Operators
Cheng Hao created SPARK-12610: - Summary: Add Anti Join Operators Key: SPARK-12610 URL: https://issues.apache.org/jira/browse/SPARK-12610 Project: Spark Issue Type: New Feature Components: SQL Reporter: Cheng Hao We need to implement the anti join operators to support NOT predicates in subqueries.
[jira] [Updated] (SPARK-12610) Add Anti Join Operators
[ https://issues.apache.org/jira/browse/SPARK-12610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-12610: -- Issue Type: Sub-task (was: New Feature) Parent: SPARK-4226 > Add Anti Join Operators > --- > > Key: SPARK-12610 > URL: https://issues.apache.org/jira/browse/SPARK-12610 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Cheng Hao > > We need to implement the anti join operators to support NOT > predicates in subqueries.
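A hedged Scala sketch (hypothetical table and column names) of the kind of query the anti join operator is meant to serve once subqueries in predicates land, and the plan shape it conceptually maps to:
{code}
// A NOT IN subquery predicate; conceptually this becomes a (null-aware) left anti join:
//   orders LEFT ANTI JOIN blacklist ON orders.customer_id = blacklist.customer_id
val result = sqlContext.sql(
  """SELECT o.*
    |FROM orders o
    |WHERE o.customer_id NOT IN (SELECT b.customer_id FROM blacklist b)
  """.stripMargin)
{code}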
[jira] [Commented] (SPARK-12196) Store blocks in different speed storage devices by hierarchy way
[ https://issues.apache.org/jira/browse/SPARK-12196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15072634#comment-15072634 ] Cheng Hao commented on SPARK-12196: --- Thank you, Wei Wu, for supporting this feature! However, we're trying to avoid changing the existing configuration format, as it might impact user applications; besides, in YARN/Mesos this configuration key will not work anymore. An updated PR will be submitted soon; welcome to join the discussion in the PR. > Store blocks in different speed storage devices by hierarchy way > > > Key: SPARK-12196 > URL: https://issues.apache.org/jira/browse/SPARK-12196 > Project: Spark > Issue Type: New Feature > Components: Spark Core >Reporter: yucai > > *Problem* > Nowadays, users have both SSDs and HDDs. > SSDs have great performance, but capacity is small. HDDs have good capacity, > but x2-x3 lower than SSDs. > How can we get both good? > *Solution* > Our idea is to build hierarchy store: use SSDs as cache and HDDs as backup > storage. > When Spark core allocates blocks for RDD (either shuffle or RDD cache), it > gets blocks from SSDs first, and when SSD’s useable space is less than some > threshold, getting blocks from HDDs. > In our implementation, we actually go further. We support a way to build any > level hierarchy store access all storage medias (NVM, SSD, HDD etc.). > *Performance* > 1. At the best case, our solution performs the same as all SSDs. > 2. At the worst case, like all data are spilled to HDDs, no performance > regression. > 3. Compared with all HDDs, hierarchy store improves more than *_x1.86_* (it > could be higher, CPU reaches bottleneck in our test environment). > 4. Compared with Tachyon, our hierarchy store still *_x1.3_* faster. Because > we support both RDD cache and shuffle and no extra inter process > communication. > *Usage* > 1. Set the priority and threshold for each layer in > spark.storage.hierarchyStore. > {code} > spark.storage.hierarchyStore='nvm 50GB,ssd 80GB' > {code} > It builds a 3 layers hierarchy store: the 1st is "nvm", the 2nd is "sdd", all > the rest form the last layer. > 2. Configure each layer's location, user just needs put the keyword like > "nvm", "ssd", which are specified in step 1, into local dirs, like > spark.local.dir or yarn.nodemanager.local-dirs. > {code} > spark.local.dir=/mnt/nvm1,/mnt/ssd1,/mnt/ssd2,/mnt/ssd3,/mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4,/mnt/others > {code} > After then, restart your Spark application, it will allocate blocks from nvm > first. > When nvm's usable space is less than 50GB, it starts to allocate from ssd. > When ssd's usable space is less than 80GB, it starts to allocate from the > last layer.
[jira] [Comment Edited] (SPARK-8360) Streaming DataFrames
[ https://issues.apache.org/jira/browse/SPARK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15035335#comment-15035335 ] Cheng Hao edited comment on SPARK-8360 at 12/2/15 12:14 PM: Removed the Google Docs link, as I cannot make it accessible to everyone when using the corp account. In the meantime, I attached a PDF doc; hopefully it is helpful. was (Author: chenghao): Add some thoughts on StreamingSQL. https://docs.google.com/document/u/1/d/1ZgmDsNLT2XltMI317nNtkUkxBFwaaCkLHtPXjQE6JBY/pub Request Edit if you needed. https://docs.google.com/document/d/1ZgmDsNLT2XltMI317nNtkUkxBFwaaCkLHtPXjQE6JBY/edit > Streaming DataFrames > > > Key: SPARK-8360 > URL: https://issues.apache.org/jira/browse/SPARK-8360 > Project: Spark > Issue Type: Umbrella > Components: SQL, Streaming >Reporter: Reynold Xin > Attachments: StreamingDataFrameProposal.pdf > > > Umbrella ticket to track what's needed to make streaming DataFrame a reality.
[jira] [Updated] (SPARK-8360) Streaming DataFrames
[ https://issues.apache.org/jira/browse/SPARK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-8360: - Attachment: StreamingDataFrameProposal.pdf This is a proposal for streaming DataFrames that we were working on; hopefully it is helpful for the new design. > Streaming DataFrames > > > Key: SPARK-8360 > URL: https://issues.apache.org/jira/browse/SPARK-8360 > Project: Spark > Issue Type: Umbrella > Components: SQL, Streaming >Reporter: Reynold Xin > Attachments: StreamingDataFrameProposal.pdf > > > Umbrella ticket to track what's needed to make streaming DataFrame a reality.
[jira] [Comment Edited] (SPARK-8360) Streaming DataFrames
[ https://issues.apache.org/jira/browse/SPARK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15035335#comment-15035335 ] Cheng Hao edited comment on SPARK-8360 at 12/2/15 6:19 AM: --- Added some thoughts on StreamingSQL: https://docs.google.com/document/u/1/d/1ZgmDsNLT2XltMI317nNtkUkxBFwaaCkLHtPXjQE6JBY/pub Request edit access if you need it: https://docs.google.com/document/d/1ZgmDsNLT2XltMI317nNtkUkxBFwaaCkLHtPXjQE6JBY/edit was (Author: chenghao): Add some thoughts on StreamingSQL. https://docs.google.com/document/d/1ZgmDsNLT2XltMI317nNtkUkxBFwaaCkLHtPXjQE6JBY/edit > Streaming DataFrames > > > Key: SPARK-8360 > URL: https://issues.apache.org/jira/browse/SPARK-8360 > Project: Spark > Issue Type: Umbrella > Components: SQL, Streaming >Reporter: Reynold Xin > > Umbrella ticket to track what's needed to make streaming DataFrame a reality.
[jira] [Commented] (SPARK-8360) Streaming DataFrames
[ https://issues.apache.org/jira/browse/SPARK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15035335#comment-15035335 ] Cheng Hao commented on SPARK-8360: -- Add some thoughts on StreamingSQL. https://docs.google.com/document/d/1ZgmDsNLT2XltMI317nNtkUkxBFwaaCkLHtPXjQE6JBY/edit > Streaming DataFrames > > > Key: SPARK-8360 > URL: https://issues.apache.org/jira/browse/SPARK-8360 > Project: Spark > Issue Type: Umbrella > Components: SQL, Streaming >Reporter: Reynold Xin > > Umbrella ticket to track what's needed to make streaming DataFrame a reality. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-12064) Make the SqlParser a trait for better integration with extensions
[ https://issues.apache.org/jira/browse/SPARK-12064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao resolved SPARK-12064. --- Resolution: Won't Fix DBX plans to remove the SqlParser in 2.0. > Make the SqlParser a trait for better integration with extensions > - > > Key: SPARK-12064 > URL: https://issues.apache.org/jira/browse/SPARK-12064 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Cheng Hao > > `SqlParser` is now an object, which is hard to reuse in extensions. A proper > implementation would make `SqlParser` a trait, keep all of its > implementation unchanged, and then add an object called `SqlParser` that > inherits from the trait.
[jira] [Created] (SPARK-12064) Make the SqlParser a trait for better integration with extensions
Cheng Hao created SPARK-12064: - Summary: Make the SqlParser a trait for better integration with extensions Key: SPARK-12064 URL: https://issues.apache.org/jira/browse/SPARK-12064 Project: Spark Issue Type: Improvement Components: SQL Reporter: Cheng Hao `SqlParser` is now an object, which is hard to reuse in extensions. A proper implementation would make `SqlParser` a trait, keep all of its implementation unchanged, and then add an object called `SqlParser` that inherits from the trait.
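A minimal structural sketch (names and signature simplified, not the actual Catalyst types) of the refactoring proposed here:
{code}
// Keep the existing implementation in a trait so extensions can mix it in or
// override pieces of it, and preserve the current entry point as an object.
trait SqlParserBase {
  def parse(sqlText: String): AnyRef = {
    // ... existing parser implementation would live here, unchanged ...
    ???
  }
}

object SqlParser extends SqlParserBase

// An extension can then reuse or customize the parser:
// object MyParser extends SqlParserBase { override def parse(sqlText: String): AnyRef = ... }
{code}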
[jira] [Commented] (SPARK-10865) [Spark SQL] [UDF] the ceil/ceiling function got wrong return value type
[ https://issues.apache.org/jira/browse/SPARK-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15001423#comment-15001423 ] Cheng Hao commented on SPARK-10865: --- 1.5.2 is released; I am not sure whether this fix is part of it or not. > [Spark SQL] [UDF] the ceil/ceiling function got wrong return value type > --- > > Key: SPARK-10865 > URL: https://issues.apache.org/jira/browse/SPARK-10865 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0 >Reporter: Yi Zhou >Assignee: Cheng Hao > Fix For: 1.6.0 > > > As per ceil/ceiling definition,it should get BIGINT return value > -ceil(DOUBLE a), ceiling(DOUBLE a) > -Returns the minimum BIGINT value that is equal to or greater than a. > But in current Spark implementation, it got wrong value type. > e.g., > select ceil(2642.12) from udf_test_web_sales limit 1; > 2643.0 > In hive implementation, it got return value type like below: > hive> select ceil(2642.12) from udf_test_web_sales limit 1; > OK > 2643
[jira] [Commented] (SPARK-10865) [Spark SQL] [UDF] the ceil/ceiling function got wrong return value type
[ https://issues.apache.org/jira/browse/SPARK-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15001422#comment-15001422 ] Cheng Hao commented on SPARK-10865: --- We actually follow Hive's behavior here, and I also tested it in MySQL; it works in the same way. > [Spark SQL] [UDF] the ceil/ceiling function got wrong return value type > --- > > Key: SPARK-10865 > URL: https://issues.apache.org/jira/browse/SPARK-10865 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0 >Reporter: Yi Zhou >Assignee: Cheng Hao > Fix For: 1.6.0 > > > As per ceil/ceiling definition,it should get BIGINT return value > -ceil(DOUBLE a), ceiling(DOUBLE a) > -Returns the minimum BIGINT value that is equal to or greater than a. > But in current Spark implementation, it got wrong value type. > e.g., > select ceil(2642.12) from udf_test_web_sales limit 1; > 2643.0 > In hive implementation, it got return value type like below: > hive> select ceil(2642.12) from udf_test_web_sales limit 1; > OK > 2643
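A quick hedged check of the return type being discussed (assumes a shell with `sqlContext` available): with the fix, ceil should produce a BIGINT/LongType value, matching Hive and MySQL.
{code}
import org.apache.spark.sql.functions.{ceil, col}
import sqlContext.implicits._   // or spark.implicits._ on 2.x

val df = Seq(2642.12).toDF("v").select(ceil(col("v")).as("c"))
df.printSchema()   // expected with the fix: c: long (BIGINT), not double
df.show()          // expected: 2643, not 2643.0
{code}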
[jira] [Commented] (SPARK-11512) Bucket Join
[ https://issues.apache.org/jira/browse/SPARK-11512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14990868#comment-14990868 ] Cheng Hao commented on SPARK-11512: --- We need to support bucketing in the Data Source API. > Bucket Join > --- > > Key: SPARK-11512 > URL: https://issues.apache.org/jira/browse/SPARK-11512 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Cheng Hao > > Sort merge join on two datasets on the file system that have already been > partitioned the same way, with the same number of partitions, and sorted within > each partition, so we don't need to sort again when joining on the > sorted/partitioned keys. > This functionality exists in > - Hive (hive.optimize.bucketmapjoin.sortedmerge) > - Pig (USING 'merge') > - MapReduce (CompositeInputFormat)
[jira] [Commented] (SPARK-11512) Bucket Join
[ https://issues.apache.org/jira/browse/SPARK-11512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14990867#comment-14990867 ] Cheng Hao commented on SPARK-11512: --- Oh, yes, but SPARK-5292 is only about supporting Hive buckets; in a more generic way, we need to add bucketing support to the Data Source API. Anyway, I will add a link to that JIRA issue. > Bucket Join > --- > > Key: SPARK-11512 > URL: https://issues.apache.org/jira/browse/SPARK-11512 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Cheng Hao > > Sort merge join on two datasets on the file system that have already been > partitioned the same way, with the same number of partitions, and sorted within > each partition, so we don't need to sort again when joining on the > sorted/partitioned keys. > This functionality exists in > - Hive (hive.optimize.bucketmapjoin.sortedmerge) > - Pig (USING 'merge') > - MapReduce (CompositeInputFormat)
[jira] [Created] (SPARK-11512) Bucket Join
Cheng Hao created SPARK-11512: - Summary: Bucket Join Key: SPARK-11512 URL: https://issues.apache.org/jira/browse/SPARK-11512 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Cheng Hao Sort merge join on two datasets on the file system that have already been partitioned the same way, with the same number of partitions, and sorted within each partition, so we don't need to sort again when joining on the sorted/partitioned keys. This functionality exists in - Hive (hive.optimize.bucketmapjoin.sortedmerge) - Pig (USING 'merge') - MapReduce (CompositeInputFormat)
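A hedged Scala sketch of the Data Source bucketing being asked for, in the shape it later took as DataFrameWriter.bucketBy/sortBy (Spark 2.0+); the DataFrames, table names, and column names are hypothetical.
{code}
// Write both sides bucketed and sorted on the join key with the same bucket count,
// so a sort merge join on that key needs no extra shuffle or sort at read time.
ordersDF.write.bucketBy(8, "customer_id").sortBy("customer_id").saveAsTable("orders_bkt")
customersDF.write.bucketBy(8, "customer_id").sortBy("customer_id").saveAsTable("customers_bkt")

spark.table("orders_bkt")
  .join(spark.table("customers_bkt"), "customer_id")
  .explain()   // ideally shows SortMergeJoin without an Exchange on either side
{code}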
[jira] [Commented] (SPARK-10371) Optimize sequential projections
[ https://issues.apache.org/jira/browse/SPARK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14981650#comment-14981650 ] Cheng Hao commented on SPARK-10371: --- Eliminating the common sub expression within the projection? > Optimize sequential projections > --- > > Key: SPARK-10371 > URL: https://issues.apache.org/jira/browse/SPARK-10371 > Project: Spark > Issue Type: New Feature > Components: ML, SQL >Affects Versions: 1.5.0 >Reporter: Xiangrui Meng > > In ML pipelines, each transformer/estimator appends new columns to the input > DataFrame. For example, it might produce DataFrames like the following > columns: a, b, c, d, where a is from raw input, b = udf_b(a), c = udf_c(b), > and d = udf_d(c). Some UDFs could be expensive. However, if we materialize c > and d, udf_b, and udf_c are triggered twice, i.e., value c is not re-used. > It would be nice to detect this pattern and re-use intermediate values. > {code} > val input = sqlContext.range(10) > val output = input.withColumn("x", col("id") + 1).withColumn("y", col("x") * > 2) > output.explain(true) > == Parsed Logical Plan == > 'Project [*,('x * 2) AS y#254] > Project [id#252L,(id#252L + cast(1 as bigint)) AS x#253L] > LogicalRDD [id#252L], MapPartitionsRDD[458] at range at :30 > == Analyzed Logical Plan == > id: bigint, x: bigint, y: bigint > Project [id#252L,x#253L,(x#253L * cast(2 as bigint)) AS y#254L] > Project [id#252L,(id#252L + cast(1 as bigint)) AS x#253L] > LogicalRDD [id#252L], MapPartitionsRDD[458] at range at :30 > == Optimized Logical Plan == > Project [id#252L,(id#252L + 1) AS x#253L,((id#252L + 1) * 2) AS y#254L] > LogicalRDD [id#252L], MapPartitionsRDD[458] at range at :30 > == Physical Plan == > TungstenProject [id#252L,(id#252L + 1) AS x#253L,((id#252L + 1) * 2) AS > y#254L] > Scan PhysicalRDD[id#252L] > Code Generation: true > input: org.apache.spark.sql.DataFrame = [id: bigint] > output: org.apache.spark.sql.DataFrame = [id: bigint, x: bigint, y: bigint] > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-11330) Filter operation on StringType after groupBy PERSISTED brings no results
[ https://issues.apache.org/jira/browse/SPARK-11330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14979699#comment-14979699 ] Cheng Hao edited comment on SPARK-11330 at 10/29/15 2:48 AM: - OK, seems it's solved in https://issues.apache.org/jira/browse/SPARK-10859, it should not be the problem in the 1.5.2 or 1.6.0 any more. was (Author: chenghao): OK, seems it's solved in https://issues.apache.org/jira/browse/SPARK-11330, it should not be the problem in the 1.5.2 or 1.6.0 any more. > Filter operation on StringType after groupBy PERSISTED brings no results > > > Key: SPARK-11330 > URL: https://issues.apache.org/jira/browse/SPARK-11330 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.5.1 > Environment: Stand alone Cluster of five servers (happens as well in > local mode). sqlContext instance of HiveContext (happens as well with > SQLContext) > No special options other than driver memory and executor memory. > Parquet partitions are 512 where there are 160 cores. Happens as well with > other partitioning > Data is nearly 2 billion rows. >Reporter: Saif Addin Ellafi >Priority: Blocker > Attachments: bug_reproduce.zip, bug_reproduce_50k.zip > > > ONLY HAPPENS WHEN PERSIST() IS CALLED > val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt") > data.groupBy("vintages").count.select("vintages").filter("vintages = > '2007-01-01'").first > >>> res9: org.apache.spark.sql.Row = [2007-01-01] > data.groupBy("vintages").count.persist.select("vintages").filter("vintages = > '2007-01-01'").first > >>> Exception on empty iterator stuff > This does not happen if using another type of field, eg IntType > data.groupBy("mm").count.persist.select("mm").filter("mm = > 200805").first >>> res13: org.apache.spark.sql.Row = [200805] > NOTE: I have no idea whether I used KRYO serializer when stored this parquet. > NOTE2: If setting the persist after the filtering, it works fine. But this is > not a good enough workaround since any filter operation afterwards will break > results. > NOTE3: I have reproduced the issue with several different datasets. > NOTE4: The only real workaround is to store the groupBy dataframe in database > and reload it as a new dataframe. > Query to raw-data works fine: > data.select("vintages").filter("vintages = '2007-01-01'").first >>> res4: > org.apache.spark.sql.Row = [2007-01-01] > Originally, the issue happened with a larger aggregation operation, the > result was that data was inconsistent bringing different results every call. > Reducing the operation step by step, I got into this issue. > In any case, the original operation was: > val data = sqlContext.read.parquet("/var/Saif/data_pqt") > val res = data.groupBy("product", "band", "age", "vint", "mb", > "mm").agg(count($"account_id").as("N"), > sum($"balance").as("balance_eom"), sum($"balancec").as("balance_eoc"), > sum($"spend").as("spend"), sum($"payment").as("payment"), > sum($"feoc").as("feoc"), sum($"cfintbal").as("cfintbal"), count($"newacct" > === 1).as("newacct")).persist() > val z = res.select("vint", "mm").filter("vint = > '2007-01-01'").select("mm").distinct.collect > z.length > >>> res0: Int = 102 > res.unpersist() > val z = res.select("vint", "mm").filter("vint = > '2007-01-01'").select("mm").distinct.collect > z.length > >>> res1: Int = 103 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11330) Filter operation on StringType after groupBy PERSISTED brings no results
[ https://issues.apache.org/jira/browse/SPARK-11330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14979699#comment-14979699 ] Cheng Hao commented on SPARK-11330: --- OK, seems it's solved in https://issues.apache.org/jira/browse/SPARK-11330, it should not be the problem in the 1.5.2 or 1.6.0 any more. > Filter operation on StringType after groupBy PERSISTED brings no results > > > Key: SPARK-11330 > URL: https://issues.apache.org/jira/browse/SPARK-11330 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.5.1 > Environment: Stand alone Cluster of five servers (happens as well in > local mode). sqlContext instance of HiveContext (happens as well with > SQLContext) > No special options other than driver memory and executor memory. > Parquet partitions are 512 where there are 160 cores. Happens as well with > other partitioning > Data is nearly 2 billion rows. >Reporter: Saif Addin Ellafi >Priority: Blocker > Attachments: bug_reproduce.zip, bug_reproduce_50k.zip > > > ONLY HAPPENS WHEN PERSIST() IS CALLED > val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt") > data.groupBy("vintages").count.select("vintages").filter("vintages = > '2007-01-01'").first > >>> res9: org.apache.spark.sql.Row = [2007-01-01] > data.groupBy("vintages").count.persist.select("vintages").filter("vintages = > '2007-01-01'").first > >>> Exception on empty iterator stuff > This does not happen if using another type of field, eg IntType > data.groupBy("mm").count.persist.select("mm").filter("mm = > 200805").first >>> res13: org.apache.spark.sql.Row = [200805] > NOTE: I have no idea whether I used KRYO serializer when stored this parquet. > NOTE2: If setting the persist after the filtering, it works fine. But this is > not a good enough workaround since any filter operation afterwards will break > results. > NOTE3: I have reproduced the issue with several different datasets. > NOTE4: The only real workaround is to store the groupBy dataframe in database > and reload it as a new dataframe. > Query to raw-data works fine: > data.select("vintages").filter("vintages = '2007-01-01'").first >>> res4: > org.apache.spark.sql.Row = [2007-01-01] > Originally, the issue happened with a larger aggregation operation, the > result was that data was inconsistent bringing different results every call. > Reducing the operation step by step, I got into this issue. > In any case, the original operation was: > val data = sqlContext.read.parquet("/var/Saif/data_pqt") > val res = data.groupBy("product", "band", "age", "vint", "mb", > "mm").agg(count($"account_id").as("N"), > sum($"balance").as("balance_eom"), sum($"balancec").as("balance_eoc"), > sum($"spend").as("spend"), sum($"payment").as("payment"), > sum($"feoc").as("feoc"), sum($"cfintbal").as("cfintbal"), count($"newacct" > === 1).as("newacct")).persist() > val z = res.select("vint", "mm").filter("vint = > '2007-01-01'").select("mm").distinct.collect > z.length > >>> res0: Int = 102 > res.unpersist() > val z = res.select("vint", "mm").filter("vint = > '2007-01-01'").select("mm").distinct.collect > z.length > >>> res1: Int = 103 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11330) Filter operation on StringType after groupBy PERSISTED brings no results
[ https://issues.apache.org/jira/browse/SPARK-11330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14979646#comment-14979646 ] Cheng Hao commented on SPARK-11330: --- [~saif.a.ellafi] I've checked that with 1.5.0 and it's confirmed it can be reproduced, however, it does not exists in latest master branch, I am still digging when and how it's been fixed. > Filter operation on StringType after groupBy PERSISTED brings no results > > > Key: SPARK-11330 > URL: https://issues.apache.org/jira/browse/SPARK-11330 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.5.1 > Environment: Stand alone Cluster of five servers (happens as well in > local mode). sqlContext instance of HiveContext (happens as well with > SQLContext) > No special options other than driver memory and executor memory. > Parquet partitions are 512 where there are 160 cores. Happens as well with > other partitioning > Data is nearly 2 billion rows. >Reporter: Saif Addin Ellafi >Priority: Blocker > Attachments: bug_reproduce.zip, bug_reproduce_50k.zip > > > ONLY HAPPENS WHEN PERSIST() IS CALLED > val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt") > data.groupBy("vintages").count.select("vintages").filter("vintages = > '2007-01-01'").first > >>> res9: org.apache.spark.sql.Row = [2007-01-01] > data.groupBy("vintages").count.persist.select("vintages").filter("vintages = > '2007-01-01'").first > >>> Exception on empty iterator stuff > This does not happen if using another type of field, eg IntType > data.groupBy("mm").count.persist.select("mm").filter("mm = > 200805").first >>> res13: org.apache.spark.sql.Row = [200805] > NOTE: I have no idea whether I used KRYO serializer when stored this parquet. > NOTE2: If setting the persist after the filtering, it works fine. But this is > not a good enough workaround since any filter operation afterwards will break > results. > NOTE3: I have reproduced the issue with several different datasets. > NOTE4: The only real workaround is to store the groupBy dataframe in database > and reload it as a new dataframe. > Query to raw-data works fine: > data.select("vintages").filter("vintages = '2007-01-01'").first >>> res4: > org.apache.spark.sql.Row = [2007-01-01] > Originally, the issue happened with a larger aggregation operation, the > result was that data was inconsistent bringing different results every call. > Reducing the operation step by step, I got into this issue. > In any case, the original operation was: > val data = sqlContext.read.parquet("/var/Saif/data_pqt") > val res = data.groupBy("product", "band", "age", "vint", "mb", > "mm").agg(count($"account_id").as("N"), > sum($"balance").as("balance_eom"), sum($"balancec").as("balance_eoc"), > sum($"spend").as("spend"), sum($"payment").as("payment"), > sum($"feoc").as("feoc"), sum($"cfintbal").as("cfintbal"), count($"newacct" > === 1).as("newacct")).persist() > val z = res.select("vint", "mm").filter("vint = > '2007-01-01'").select("mm").distinct.collect > z.length > >>> res0: Int = 102 > res.unpersist() > val z = res.select("vint", "mm").filter("vint = > '2007-01-01'").select("mm").distinct.collect > z.length > >>> res1: Int = 103 -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-11364) HadoopFsRelation doesn't reload the hadoop configuration for each execution
Cheng Hao created SPARK-11364: - Summary: HadoopFsRelation doesn't reload the hadoop configuration for each execution Key: SPARK-11364 URL: https://issues.apache.org/jira/browse/SPARK-11364 Project: Spark Issue Type: Bug Components: SQL Reporter: Cheng Hao https://www.mail-archive.com/user@spark.apache.org/msg39706.html We don't propagate the Hadoop configuration to the Data Source, as we always try to load the default Hadoop configuration.
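A hedged illustration (hypothetical configuration key and path) of the symptom this issue describes: a Hadoop configuration value set on the running SparkContext may not reach a HadoopFsRelation-based data source, because the relation keeps loading the default Hadoop configuration.
{code}
// Change a Hadoop conf value on the running context...
sc.hadoopConfiguration.set("fs.s3a.access.key", "MY_ACCESS_KEY")  // hypothetical key/value

// ...then read through a HadoopFsRelation-based source; per this issue the relation
// can end up using the default Hadoop configuration rather than the updated one.
val df = sqlContext.read.parquet("s3a://some-bucket/some/path")   // hypothetical path
{code}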
[jira] [Comment Edited] (SPARK-11330) Filter operation on StringType after groupBy PERSISTED brings no results
[ https://issues.apache.org/jira/browse/SPARK-11330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14977600#comment-14977600 ] Cheng Hao edited comment on SPARK-11330 at 10/28/15 2:28 AM: - Hi, [~saif.a.ellafi], I've tried the code like below: {code} case class Spark11330(account_id: Int, product: String, vint: String, band: String, age: Int, mb: String, mm: String, balance: Float, balancec: Float) test("SPARK-11330: Filter operation on StringType after groupBy PERSISTED brings no results") { withTempPath { f => // generate the more data. val d = Seq( Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200808", 1000.0f, 2000.0f), Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200808", 2000.0f, 2000.0f), Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200809", 2000.0f, 2000.0f), Spark11330(2, "product2", "2007-01-01", "band3", 29, "mb1", "200809", 2010.0f, 3000.0f)) val data = List.tabulate[Seq[Spark11330]](10) { i => d }.flatten // save as parquet file in local disk sqlContext.sparkContext.parallelize(data, 4) .toDF().write.format("parquet").mode(SaveMode.Overwrite).save(f.getAbsolutePath) // reproduce val df = sqlContext.read.parquet(f.getAbsolutePath) val f1 = df.groupBy("vint").count().persist().filter("vint = '2007-01-01'").first val f2 = df.groupBy("vint").count().filter("vint = '2007-01-01'").first assert(f1 == f2) val res = df .groupBy("product", "band", "age", "vint", "mb", "mm") .agg( count($"account_id").as("N"), sum($"balance").as("balance_eom"), sum($"balancec").as("balance_eoc")).persist() val c1 = res.select("vint", "mm").filter("vint='2007-01-01'").select("mm").distinct.collect res.unpersist() val c2 = res.select("vint", "mm").filter("vint='2007-01-01'").select("mm").distinct.collect assert(c1.sameElements(c2)) } } {code} Seems everything works fine, I am not sure if I missed something, can you try to reproduce the issue based on my code? 
was (Author: chenghao): Hi, [~saif.a.ellafi], I've tried the code like below: {code} case class Spark11330(account_id: Int, product: String, vint: String, band: String, age: Int, mb: String, mm: String, balance: Float, balancec: Float) test("SPARK-11330: Filter operation on StringType after groupBy PERSISTED brings no results") { withTempPath { f => val d = Seq( Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200808", 1000.0f, 2000.0f), Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200808", 2000.0f, 2000.0f), Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200809", 2000.0f, 2000.0f), Spark11330(2, "product2", "2007-01-01", "band3", 29, "mb1", "200809", 2010.0f, 3000.0f)) val data = List.tabulate[Seq[Spark11330]](10) { i => d }.flatten sqlContext.sparkContext.parallelize(data, 4) .toDF().write.format("parquet").mode(SaveMode.Overwrite).save(f.getAbsolutePath) val df = sqlContext.read.parquet(f.getAbsolutePath) val f1 = df.groupBy("vint").count().persist().filter("vint = '2007-01-01'").first val f2 = df.groupBy("vint").count().filter("vint = '2007-01-01'").first assert(f1 == f2) val res = df .groupBy("product", "band", "age", "vint", "mb", "mm") .agg( count($"account_id").as("N"), sum($"balance").as("balance_eom"), sum($"balancec").as("balance_eoc")).persist() val c1 = res.select("vint", "mm").filter("vint='2007-01-01'").select("mm").distinct.collect res.unpersist() val c2 = res.select("vint", "mm").filter("vint='2007-01-01'").select("mm").distinct.collect assert(c1.sameElements(c2)) } } {code} Seems everything works fine, I am not sure if I missed something, can you try to reproduce the issue based on my code? > Filter operation on StringType after groupBy PERSISTED brings no results > > > Key: SPARK-11330 > URL: https://issues.apache.org/jira/browse/SPARK-11330 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.5.1 > Environment: Stand alone Cluster of five servers (happens as well in > local mode). sqlContext instance of HiveContext (happens as well with > SQLContext) > No special options other than driver memory and executor memory. > Parquet partitions are 512 where there are 160 cores. Happens as well with > other partitioning > Data is nearly 2 billion rows. >
[jira] [Commented] (SPARK-11330) Filter operation on StringType after groupBy PERSISTED brings no results
[ https://issues.apache.org/jira/browse/SPARK-11330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14977600#comment-14977600 ] Cheng Hao commented on SPARK-11330: --- Hi, [~saif.a.ellafi], I've tried the code like below: {code} case class Spark11330(account_id: Int, product: String, vint: String, band: String, age: Int, mb: String, mm: String, balance: Float, balancec: Float) test("SPARK-11330: Filter operation on StringType after groupBy PERSISTED brings no results") { withTempPath { f => val d = Seq( Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200808", 1000.0f, 2000.0f), Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200808", 2000.0f, 2000.0f), Spark11330(1, "product1", "2007-01-01", "band1", 27, "mb1", "200809", 2000.0f, 2000.0f), Spark11330(2, "product2", "2007-01-01", "band3", 29, "mb1", "200809", 2010.0f, 3000.0f)) val data = List.tabulate[Seq[Spark11330]](10) { i => d }.flatten sqlContext.sparkContext.parallelize(data, 4) .toDF().write.format("parquet").mode(SaveMode.Overwrite).save(f.getAbsolutePath) val df = sqlContext.read.parquet(f.getAbsolutePath) val f1 = df.groupBy("vint").count().persist().filter("vint = '2007-01-01'").first val f2 = df.groupBy("vint").count().filter("vint = '2007-01-01'").first assert(f1 == f2) val res = df .groupBy("product", "band", "age", "vint", "mb", "mm") .agg( count($"account_id").as("N"), sum($"balance").as("balance_eom"), sum($"balancec").as("balance_eoc")).persist() val c1 = res.select("vint", "mm").filter("vint='2007-01-01'").select("mm").distinct.collect res.unpersist() val c2 = res.select("vint", "mm").filter("vint='2007-01-01'").select("mm").distinct.collect assert(c1.sameElements(c2)) } } {code} Seems everything works fine, I am not sure if I missed something, can you try to reproduce the issue based on my code? > Filter operation on StringType after groupBy PERSISTED brings no results > > > Key: SPARK-11330 > URL: https://issues.apache.org/jira/browse/SPARK-11330 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.5.1 > Environment: Stand alone Cluster of five servers (happens as well in > local mode). sqlContext instance of HiveContext (happens as well with > SQLContext) > No special options other than driver memory and executor memory. > Parquet partitions are 512 where there are 160 cores. Happens as well with > other partitioning > Data is nearly 2 billion rows. >Reporter: Saif Addin Ellafi >Priority: Blocker > > ONLY HAPPENS WHEN PERSIST() IS CALLED > val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt") > data.groupBy("vintages").count.select("vintages").filter("vintages = > '2007-01-01'").first > >>> res9: org.apache.spark.sql.Row = [2007-01-01] > data.groupBy("vintages").count.persist.select("vintages").filter("vintages = > '2007-01-01'").first > >>> Exception on empty iterator stuff > This does not happen if using another type of field, eg IntType > data.groupBy("mm").count.persist.select("mm").filter("mm = > 200805").first >>> res13: org.apache.spark.sql.Row = [200805] > NOTE: I have no idea whether I used KRYO serializer when stored this parquet. > NOTE2: If setting the persist after the filtering, it works fine. But this is > not a good enough workaround since any filter operation afterwards will break > results. > NOTE3: I have reproduced the issue with several different datasets. > NOTE4: The only real workaround is to store the groupBy dataframe in database > and reload it as a new dataframe. 
> Query to raw-data works fine: > data.select("vintages").filter("vintages = '2007-01-01'").first >>> res4: > org.apache.spark.sql.Row = [2007-01-01] > Originally, the issue happened with a larger aggregation operation, the > result was that data was inconsistent bringing different results every call. > Reducing the operation step by step, I got into this issue. > In any case, the original operation was: > val data = sqlContext.read.parquet("/var/Saif/data_pqt") > val res = data.groupBy("product", "band", "age", "vint", "mb", > "mm").agg(count($"account_id").as("N"), > sum($"balance").as("balance_eom"), sum($"balancec").as("balance_eoc"), > sum($"spend").as("spend"), sum($"payment").as("payment"), > sum($"feoc").as("feoc"), sum($"cfintbal").as("cfintbal"), count($"newacct" > === 1).as("newacct")).persist() > val z = res.select("vint", "mm").filter("vint = > '2007-01-01'").select("mm").distinct.collect > z.length > >>> res0: Int = 102 > res.unpersist
[jira] [Updated] (SPARK-9735) Auto infer partition schema of HadoopFsRelation should respect the user specified one
[ https://issues.apache.org/jira/browse/SPARK-9735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-9735: - Description: This code is copied from the hadoopFsRelationSuite.scala {code} partitionedTestDF = (for { i <- 1 to 3 p2 <- Seq("foo", "bar") } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2") withTempPath { file => val input = partitionedTestDF.select('a, 'b, 'p1.cast(StringType).as('ps), 'p2) input .write .format(dataSourceName) .mode(SaveMode.Overwrite) .partitionBy("ps", "p2") .saveAsTable("t") input .write .format(dataSourceName) .mode(SaveMode.Append) .partitionBy("ps", "p2") .saveAsTable("t") val realData = input.collect() withTempTable("t") { checkAnswer(sqlContext.table("t"), realData ++ realData) } } java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45) at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220) at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62) at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212) at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:903) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1846) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 07:44:01.344 ERROR org.apache.spark.executor.Executor: Exception in task 14.0 in stage 3.0 (TID 206) java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45) at 
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:220) at org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(generated.java:62) at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212) at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$17$$anonfun$apply$9.apply(DataSourceStrategy.scala:212) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
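The ClassCastException above matches the summary: the partition column `ps` is written as a string, but because its directory values look numeric (ps=1, ps=2, ...), partition discovery infers it as an integer instead of respecting the user-specified StringType. A minimal standalone sketch of that inference mismatch (assuming a local Spark 1.5-style SQLContext and a scratch directory; this is not the project's test harness):
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object PartitionInferenceSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SPARK-9735-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Scratch location for the sketch; any writable path would do.
    val path = java.nio.file.Files.createTempDirectory("spark9735").toString

    // Write with a *string* partition column whose values happen to look numeric.
    (1 to 3).map(i => (i, s"val_$i", i.toString))
      .toDF("a", "b", "ps")
      .write.mode("overwrite").partitionBy("ps").parquet(path)

    // On read, partition discovery sees directories like ps=1 and infers ps as
    // IntegerType, which no longer matches the StringType the writer used.
    sqlContext.read.parquet(path).printSchema()

    sc.stop()
  }
}
{code}
With the fix proposed here, the user-specified partition schema should win over the auto-inferred integer type.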
[jira] [Commented] (SPARK-4226) SparkSQL - Add support for subqueries in predicates
[ https://issues.apache.org/jira/browse/SPARK-4226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14958524#comment-14958524 ] Cheng Hao commented on SPARK-4226: -- [~nadenf] Actually I am working on it right now, and the first PR is ready, it will be great appreciated if you can try https://github.com/apache/spark/pull/9055 in your local testing, let me know if there any problem or bug you found. > SparkSQL - Add support for subqueries in predicates > --- > > Key: SPARK-4226 > URL: https://issues.apache.org/jira/browse/SPARK-4226 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 1.2.0 > Environment: Spark 1.2 snapshot >Reporter: Terry Siu > > I have a test table defined in Hive as follows: > {code:sql} > CREATE TABLE sparkbug ( > id INT, > event STRING > ) STORED AS PARQUET; > {code} > and insert some sample data with ids 1, 2, 3. > In a Spark shell, I then create a HiveContext and then execute the following > HQL to test out subquery predicates: > {code} > val hc = HiveContext(hc) > hc.hql("select customerid from sparkbug where customerid in (select > customerid from sparkbug where customerid in (2,3))") > {code} > I get the following error: > {noformat} > java.lang.RuntimeException: Unsupported language features in query: select > customerid from sparkbug where customerid in (select customerid from sparkbug > where customerid in (2,3)) > TOK_QUERY > TOK_FROM > TOK_TABREF > TOK_TABNAME > sparkbug > TOK_INSERT > TOK_DESTINATION > TOK_DIR > TOK_TMP_FILE > TOK_SELECT > TOK_SELEXPR > TOK_TABLE_OR_COL > customerid > TOK_WHERE > TOK_SUBQUERY_EXPR > TOK_SUBQUERY_OP > in > TOK_QUERY > TOK_FROM > TOK_TABREF > TOK_TABNAME > sparkbug > TOK_INSERT > TOK_DESTINATION > TOK_DIR > TOK_TMP_FILE > TOK_SELECT > TOK_SELEXPR > TOK_TABLE_OR_COL > customerid > TOK_WHERE > TOK_FUNCTION > in > TOK_TABLE_OR_COL > customerid > 2 > 3 > TOK_TABLE_OR_COL > customerid > scala.NotImplementedError: No parse rules for ASTNode type: 817, text: > TOK_SUBQUERY_EXPR : > TOK_SUBQUERY_EXPR > TOK_SUBQUERY_OP > in > TOK_QUERY > TOK_FROM > TOK_TABREF > TOK_TABNAME > sparkbug > TOK_INSERT > TOK_DESTINATION > TOK_DIR > TOK_TMP_FILE > TOK_SELECT > TOK_SELEXPR > TOK_TABLE_OR_COL > customerid > TOK_WHERE > TOK_FUNCTION > in > TOK_TABLE_OR_COL > customerid > 2 > 3 > TOK_TABLE_OR_COL > customerid > " + > > org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1098) > > at scala.sys.package$.error(package.scala:27) > at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:252) > at > org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50) > at > org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49) > at > scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) > {noformat} > [This > thread|http://apache-spark-user-list.1001560.n3.nabble.com/Subquery-in-having-clause-Spark-1-1-0-td17401.html] > also brings up lack of subquery support in SparkSQL. It would be nice to > have subquery predicate support in a near, future release (1.3, maybe?). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-11076) Decimal Support for Ceil/Floor
Cheng Hao created SPARK-11076: - Summary: Decimal Support for Ceil/Floor Key: SPARK-11076 URL: https://issues.apache.org/jira/browse/SPARK-11076 Project: Spark Issue Type: Improvement Components: SQL Reporter: Cheng Hao Currently, Ceil & Floor don't support decimal, but Hive does. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-11041) Add (NOT) IN / EXISTS support for predicates
[ https://issues.apache.org/jira/browse/SPARK-11041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao closed SPARK-11041. - Resolution: Duplicate > Add (NOT) IN / EXISTS support for predicates > > > Key: SPARK-11041 > URL: https://issues.apache.org/jira/browse/SPARK-11041 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Cheng Hao > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-11041) Add (NOT) IN / EXISTS support for predicates
Cheng Hao created SPARK-11041: - Summary: Add (NOT) IN / EXISTS support for predicates Key: SPARK-11041 URL: https://issues.apache.org/jira/browse/SPARK-11041 Project: Spark Issue Type: Improvement Components: SQL Reporter: Cheng Hao -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-10992) Partial Aggregation Support for Hive UDAF
Cheng Hao created SPARK-10992: - Summary: Partial Aggregation Support for Hive UDAF Key: SPARK-10992 URL: https://issues.apache.org/jira/browse/SPARK-10992 Project: Spark Issue Type: Improvement Components: SQL Reporter: Cheng Hao -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-10992) Partial Aggregation Support for Hive UDAF
[ https://issues.apache.org/jira/browse/SPARK-10992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-10992: -- Issue Type: Sub-task (was: Improvement) Parent: SPARK-4366 > Partial Aggregation Support for Hive UDAF > - > > Key: SPARK-10992 > URL: https://issues.apache.org/jira/browse/SPARK-10992 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Cheng Hao > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-10831) Spark SQL Configuration missing in the doc
Cheng Hao created SPARK-10831: - Summary: Spark SQL Configuration missing in the doc Key: SPARK-10831 URL: https://issues.apache.org/jira/browse/SPARK-10831 Project: Spark Issue Type: Documentation Components: SQL Reporter: Cheng Hao Several Spark SQL configuration properties are missing from the documentation, e.g. spark.sql.codegen, spark.sql.planner.sortMergeJoin, spark.sql.dialect, spark.sql.caseSensitive. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-10829) Scan DataSource with a predicate expression combining partition key and attributes doesn't work
Cheng Hao created SPARK-10829: - Summary: Scan DataSource with a predicate expression combining partition key and attributes doesn't work Key: SPARK-10829 URL: https://issues.apache.org/jira/browse/SPARK-10829 Project: Spark Issue Type: Bug Components: SQL Reporter: Cheng Hao Priority: Blocker This can be reproduced with the following code: {code} withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { withTempPath { dir => val path = s"${dir.getCanonicalPath}/part=1" (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path) // If the "part = 1" filter gets pushed down, this query will throw an exception since // "part" is not a valid column in the actual Parquet file checkAnswer( sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"), (2 to 3).map(i => Row(i, i.toString, 1))) } } {code} The expected result is: {code} 2, 1 3, 1 {code} But the actual result is: {code} 1, 1 2, 1 3, 1 {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
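The wrong answer comes from treating the whole filter as if it could prune partitions: the disjunct `part = 0 or a > 1` references both the partition column and a data column, so it cannot be evaluated from the directory layout alone. The following plain-Scala sketch (the expression ADT and all names are invented for illustration; this is not Spark's Catalyst code) shows the rule that only predicates referencing partition columns exclusively are safe to use for partition pruning, and everything else must stay with the data scan:
{code}
// Hypothetical mini expression tree, only for illustrating the pruning rule.
sealed trait Expr { def refs: Set[String] }
case class Col(name: String)                        extends Expr { def refs = Set(name) }
case class Lit(value: Any)                          extends Expr { def refs = Set.empty[String] }
case class Cmp(left: Expr, op: String, right: Expr) extends Expr { def refs = left.refs ++ right.refs }
case class And(l: Expr, r: Expr)                    extends Expr { def refs = l.refs ++ r.refs }
case class Or(l: Expr, r: Expr)                     extends Expr { def refs = l.refs ++ r.refs }

object PartitionPruningSketch {
  /** Split a filter into its top-level conjuncts. */
  def conjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => Seq(other)
  }

  /** A conjunct may prune partitions only if it references partition columns exclusively.
    * "part = 0 or a > 1" references both, so it has to stay with the data scan. */
  def splitForPruning(filter: Expr, partitionCols: Set[String]): (Seq[Expr], Seq[Expr]) =
    conjuncts(filter).partition(_.refs.subsetOf(partitionCols))

  def main(args: Array[String]): Unit = {
    // a > 0 and (part = 0 or a > 1), as in the reproduction above.
    val filter = And(Cmp(Col("a"), ">", Lit(0)),
                     Or(Cmp(Col("part"), "=", Lit(0)), Cmp(Col("a"), ">", Lit(1))))
    val (pruning, remaining) = splitForPruning(filter, Set("part"))
    println(s"partition-pruning predicates: $pruning")   // empty: nothing can prune safely here
    println(s"predicates kept on the scan:  $remaining") // both conjuncts
  }
}
{code}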
[jira] [Commented] (SPARK-10733) TungstenAggregation cannot acquire page after switching to sort-based
[ https://issues.apache.org/jira/browse/SPARK-10733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14904778#comment-14904778 ] Cheng Hao commented on SPARK-10733: --- [~jameszhouyi] Can you please patch the https://github.com/chenghao-intel/spark/commit/91af33397100802d6ba577a3f423bb47d5a761ea and try your workload? And be sure set the log level to `INFO`. [~andrewor14] [~yhuai] One possibility is Sort-Merge-Join eat out all of the memory, as Sort-Merge-Join will not free the memory until we finish iterating all join result, however, partial aggregation will actually accept the iterator the join result, which means possible no memory at all for aggregation. > TungstenAggregation cannot acquire page after switching to sort-based > - > > Key: SPARK-10733 > URL: https://issues.apache.org/jira/browse/SPARK-10733 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0 >Reporter: Andrew Or >Assignee: Andrew Or >Priority: Blocker > > This is uncovered after fixing SPARK-10474. Stack trace: > {code} > 15/09/21 12:51:46 WARN scheduler.TaskSetManager: Lost task 115.0 in stage > 152.0 (TID 1736, bb-node2): java.io.IOException: Unable to acquire 16777216 > bytes of memory > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:378) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:359) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:488) > at > org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:144) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:465) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) > at > org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by 
Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
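The memory interaction described in the comment above can be shown with a toy model (the budget, sizes, and names below are invented; this is not Spark's memory manager): the sort-merge join keeps its buffers until its output iterator is fully consumed, while the partial aggregation tries to reserve a page from the same per-task budget before it has consumed anything.
{code}
object MemoryPressureSketch {
  // Toy per-task memory budget (bytes); purely illustrative.
  var available: Long = 64L * 1024 * 1024

  def acquire(bytes: Long, who: String): Boolean =
    if (bytes <= available) { available -= bytes; println(s"$who acquired $bytes, left $available"); true }
    else { println(s"$who FAILED to acquire $bytes, only $available left"); false }

  def release(bytes: Long, who: String): Unit = {
    available += bytes; println(s"$who released $bytes, left $available")
  }

  def main(args: Array[String]): Unit = {
    // 1. The join buffers rows for the current key group and keeps that memory
    //    until its output iterator has been fully consumed.
    val joinBuffer = 60L * 1024 * 1024
    acquire(joinBuffer, "sort-merge join")

    // 2. The aggregation downstream wants a page *before* it starts draining the
    //    join iterator, but the join has not released anything yet.
    val aggPage = 16L * 1024 * 1024
    val ok = acquire(aggPage, "aggregation")          // fails: the join still holds its buffers

    // 3. Only once the join output is exhausted is its memory given back.
    release(joinBuffer, "sort-merge join")
    if (!ok) acquire(aggPage, "aggregation (retry)")  // now succeeds
  }
}
{code}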
[jira] [Comment Edited] (SPARK-10474) Aggregation failed with unable to acquire memory
[ https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14802912#comment-14802912 ] Cheng Hao edited comment on SPARK-10474 at 9/17/15 1:48 PM: The root reason for this failure, is the trigger condition from hash-based aggregation to sort-based aggregation in the `TungstenAggregationIterator`, current code logic is if no more memory to can be allocated, then turn to sort-based aggregation, however, since no memory left, the data spill will also failed in UnsafeExternalSorter.initializeWriting. I post a workaround solution PR for discussion. was (Author: chenghao): The root reason for this failure, is because of the `TungstenAggregationIterator.switchToSortBasedAggregation`, as it's eat out memory by HashAggregation, and then, we cannot allocate memory when turn the sort-based aggregation even in the spilling time. I post a workaround solution PR for discussion. > Aggregation failed with unable to acquire memory > > > Key: SPARK-10474 > URL: https://issues.apache.org/jira/browse/SPARK-10474 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0 >Reporter: Yi Zhou >Priority: Blocker > > In aggregation case, a Lost task happened with below error. > {code} > java.io.IOException: Could not acquire 65536 bytes of memory > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220) > at > org.apache.spark.sql.execution.UnsafeKVExternalSorter.(UnsafeKVExternalSorter.java:126) > at > org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) > at > org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {code} > Key SQL Query > {code:sql} > INSERT INTO TABLE test_table > SELECT > ss.ss_customer_sk AS cid, > count(CASE WHEN i.i_class_id=1 THEN 1 ELSE NULL END) AS id1, > count(CASE WHEN i.i_class_id=3 THEN 1 ELSE NULL END) AS id3, > count(CASE WHEN i.i_class_id=5 THEN 1 ELSE NULL END) AS id5, > count(CASE WHEN i.i_class_id=7 THEN 1 ELSE NULL END) AS id7, > count(CASE WHEN i.i_class_id=9 THEN 1 ELSE NULL END) AS id9, > count(CASE WHEN i.i_class_id=11 THEN 1 ELSE NULL END) AS id11, > count(CASE WHEN i.i_class_id=13 THEN 1 ELSE NULL END) AS id13, > count(CASE WHEN i.i_class_id=15 THEN 1 ELSE NULL END) AS id15, > count(CASE WHEN i.i_class_id=2 THEN 1 ELSE NULL END) AS id2, > count(CASE WHEN i.i_class_id=4 THEN 1 ELSE NULL END) AS id4, > count(CASE WHEN i.i_class_id=6 THEN 1 ELSE NULL END) AS id6, > count(CASE WHEN i.i_class_id=8 THEN 1 ELSE NULL END) AS id8, > count(CASE WHEN i.i_class_id=10 THEN 1
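Restated, the failure mode described in the edited comment above is: hash-based aggregation keeps acquiring pages until nothing is left, and only then switches to sort-based aggregation; at that point the external sorter needs its own buffer in initializeForWriting to start spilling, so the switch itself fails. A toy sketch of that control flow (budget and sizes invented; this is not the TungstenAggregationIterator code):
{code}
object FallbackSketch {
  var available: Long = 4L * 1024 * 1024          // toy task memory budget
  def tryAcquire(bytes: Long): Boolean =
    if (bytes <= available) { available -= bytes; true } else false

  def main(args: Array[String]): Unit = {
    val page = 1L * 1024 * 1024

    // Hash-based aggregation grows until no more pages can be acquired...
    var hashPages = 0
    while (tryAcquire(page)) hashPages += 1
    println(s"hash aggregation grew to $hashPages pages, memory exhausted")

    // ...and only then switches to sort-based aggregation. But the external
    // sorter needs a buffer of its own before it can spill, and nothing is left:
    val spillBuffer = 64L * 1024
    if (!tryAcquire(spillBuffer))
      println("switch to sort-based aggregation failed: cannot even allocate the spill buffer")
  }
}
{code}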
[jira] [Commented] (SPARK-10474) Aggregation failed with unable to acquire memory
[ https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14802912#comment-14802912 ] Cheng Hao commented on SPARK-10474: --- The root reason for this failure, is because of the `TungstenAggregationIterator.switchToSortBasedAggregation`, as it's eat out memory by HashAggregation, and then, we cannot allocate memory when turn the sort-based aggregation even in the spilling time. I post a workaround solution PR for discussion. > Aggregation failed with unable to acquire memory > > > Key: SPARK-10474 > URL: https://issues.apache.org/jira/browse/SPARK-10474 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0 >Reporter: Yi Zhou >Priority: Blocker > > In aggregation case, a Lost task happened with below error. > {code} > java.io.IOException: Could not acquire 65536 bytes of memory > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220) > at > org.apache.spark.sql.execution.UnsafeKVExternalSorter.(UnsafeKVExternalSorter.java:126) > at > org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) > at > org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {code} > Key SQL Query > {code:sql} > INSERT INTO TABLE test_table > SELECT > ss.ss_customer_sk AS cid, > count(CASE WHEN i.i_class_id=1 THEN 1 ELSE NULL END) AS id1, > count(CASE WHEN i.i_class_id=3 THEN 1 ELSE NULL END) AS id3, > count(CASE WHEN i.i_class_id=5 THEN 1 ELSE NULL END) AS id5, > count(CASE WHEN i.i_class_id=7 THEN 1 ELSE NULL END) AS id7, > count(CASE WHEN 
i.i_class_id=9 THEN 1 ELSE NULL END) AS id9, > count(CASE WHEN i.i_class_id=11 THEN 1 ELSE NULL END) AS id11, > count(CASE WHEN i.i_class_id=13 THEN 1 ELSE NULL END) AS id13, > count(CASE WHEN i.i_class_id=15 THEN 1 ELSE NULL END) AS id15, > count(CASE WHEN i.i_class_id=2 THEN 1 ELSE NULL END) AS id2, > count(CASE WHEN i.i_class_id=4 THEN 1 ELSE NULL END) AS id4, > count(CASE WHEN i.i_class_id=6 THEN 1 ELSE NULL END) AS id6, > count(CASE WHEN i.i_class_id=8 THEN 1 ELSE NULL END) AS id8, > count(CASE WHEN i.i_class_id=10 THEN 1 ELSE NULL END) AS id10, > count(CASE WHEN i.i_class_id=14 THEN 1 ELSE NULL END) AS id14, > count(CASE WHEN i.i_class_id=16 THEN 1 ELSE NULL END) AS id16 > FROM store_sales ss > INNER JOIN item i ON ss.ss_item_sk = i.i_item_sk > WHERE i.i_category IN ('Books') > AND ss.ss_customer_sk IS NOT NULL > GROUP BY ss.ss_customer_sk > HAVING count(ss.ss_item_sk) > 5 > {code} > Note: > the store_sales is a big fact table and item is a small dimension table. -- This message was sent by
[jira] [Commented] (SPARK-10606) Cube/Rollup/GrpSet doesn't create the correct plan when group by is on something other than an AttributeReference
[ https://issues.apache.org/jira/browse/SPARK-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14791499#comment-14791499 ] Cheng Hao commented on SPARK-10606: --- [~rhbutani] Which version are you using, actually I've fixed the bug at SPARK-8972, it should be included in 1.5. Can you try that with 1.5? > Cube/Rollup/GrpSet doesn't create the correct plan when group by is on > something other than an AttributeReference > - > > Key: SPARK-10606 > URL: https://issues.apache.org/jira/browse/SPARK-10606 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Harish Butani >Priority: Critical > > Consider the following table: t(a : String, b : String) and the query > {code} > select a, concat(b, '1'), count(*) > from t > group by a, concat(b, '1') with cube > {code} > The projections in the Expand operator are not setup correctly. The expand > logic in Analyzer:expand is comparing grouping expressions against > child.output. So {{concat(b, '1')}} is never mapped to a null Literal. > A simple fix is to add a Rule to introduce a Projection below the > Cube/Rollup/GrpSet operator that additionally projects the > groupingExpressions that are missing in the child. > Marking this as Critical, because you get wrong results. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
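Until a build with the SPARK-8972 fix is used, a possible manual workaround in the spirit of the fix described above is to project the computed grouping expression first, so that every grouping expression seen by CUBE is a plain column. Sketch only, assuming a HiveContext bound to `sqlContext` and the table `t` from the description:
{code}
// Hypothetical workaround: materialize concat(b, '1') as a plain column before
// applying CUBE, so the Expand operator only sees simple attributes.
val cubed = sqlContext.sql(
  """SELECT a, b1, count(*)
    |FROM (SELECT a, concat(b, '1') AS b1 FROM t) tmp
    |GROUP BY a, b1 WITH CUBE""".stripMargin)
cubed.show()
{code}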
[jira] [Commented] (SPARK-4226) SparkSQL - Add support for subqueries in predicates
[ https://issues.apache.org/jira/browse/SPARK-4226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14746642#comment-14746642 ] Cheng Hao commented on SPARK-4226: -- Thank you [~brooks], you're right! I meant it will makes more complicated in the implementation, e.g. to resolved and split the conjunction for the condition, that's also what I was trying to avoid in my PR by using the anti-join. > SparkSQL - Add support for subqueries in predicates > --- > > Key: SPARK-4226 > URL: https://issues.apache.org/jira/browse/SPARK-4226 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 1.2.0 > Environment: Spark 1.2 snapshot >Reporter: Terry Siu > > I have a test table defined in Hive as follows: > {code:sql} > CREATE TABLE sparkbug ( > id INT, > event STRING > ) STORED AS PARQUET; > {code} > and insert some sample data with ids 1, 2, 3. > In a Spark shell, I then create a HiveContext and then execute the following > HQL to test out subquery predicates: > {code} > val hc = HiveContext(hc) > hc.hql("select customerid from sparkbug where customerid in (select > customerid from sparkbug where customerid in (2,3))") > {code} > I get the following error: > {noformat} > java.lang.RuntimeException: Unsupported language features in query: select > customerid from sparkbug where customerid in (select customerid from sparkbug > where customerid in (2,3)) > TOK_QUERY > TOK_FROM > TOK_TABREF > TOK_TABNAME > sparkbug > TOK_INSERT > TOK_DESTINATION > TOK_DIR > TOK_TMP_FILE > TOK_SELECT > TOK_SELEXPR > TOK_TABLE_OR_COL > customerid > TOK_WHERE > TOK_SUBQUERY_EXPR > TOK_SUBQUERY_OP > in > TOK_QUERY > TOK_FROM > TOK_TABREF > TOK_TABNAME > sparkbug > TOK_INSERT > TOK_DESTINATION > TOK_DIR > TOK_TMP_FILE > TOK_SELECT > TOK_SELEXPR > TOK_TABLE_OR_COL > customerid > TOK_WHERE > TOK_FUNCTION > in > TOK_TABLE_OR_COL > customerid > 2 > 3 > TOK_TABLE_OR_COL > customerid > scala.NotImplementedError: No parse rules for ASTNode type: 817, text: > TOK_SUBQUERY_EXPR : > TOK_SUBQUERY_EXPR > TOK_SUBQUERY_OP > in > TOK_QUERY > TOK_FROM > TOK_TABREF > TOK_TABNAME > sparkbug > TOK_INSERT > TOK_DESTINATION > TOK_DIR > TOK_TMP_FILE > TOK_SELECT > TOK_SELEXPR > TOK_TABLE_OR_COL > customerid > TOK_WHERE > TOK_FUNCTION > in > TOK_TABLE_OR_COL > customerid > 2 > 3 > TOK_TABLE_OR_COL > customerid > " + > > org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1098) > > at scala.sys.package$.error(package.scala:27) > at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:252) > at > org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50) > at > org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49) > at > scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) > {noformat} > [This > thread|http://apache-spark-user-list.1001560.n3.nabble.com/Subquery-in-having-clause-Spark-1-1-0-td17401.html] > also brings up lack of subquery support in SparkSQL. It would be nice to > have subquery predicate support in a near, future release (1.3, maybe?). -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-4226) SparkSQL - Add support for subqueries in predicates
[ https://issues.apache.org/jira/browse/SPARK-4226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745467#comment-14745467 ] Cheng Hao commented on SPARK-4226: -- [~marmbrus] [~yhuai] After investigating a little bit, I think using anti-join is much more efficient than rewriting the NOT IN / NOT EXISTS with left outer join followed by null filtering. As the anti-join will return negative once it's found the first matched from the second relation, however the left outer join will go thru every match pairs and then do filtering. Besides, for the NOT EXISTS clause, without the anti-join, seems more complicated in implementation. For example: {code} mysql> select * from d1; +--+--+ | a| b| +--+--+ |2 |2 | |8 | 10 | +--+--+ 2 rows in set (0.00 sec) mysql> select * from d2; +--+--+ | a| b| +--+--+ |1 |1 | |8 | NULL | |0 |0 | +--+--+ 3 rows in set (0.00 sec) mysql> select * from d1 where not exists (select b from d2 where d1.a=d2.a); +--+--+ | a| b| +--+--+ |2 |2 | +--+--+ 1 row in set (0.00 sec) // If we rewrite the above query in left outer join, the filter condition cannot simply be the subquery project list. mysql> select d1.a, d1.b from d1 left join d2 on d1.a=d2.a where d2.b is null; +--+--+ | a| b| +--+--+ |8 | 10 | |2 |2 | +--+--+ 2 rows in set (0.00 sec) // get difference result with NOT EXISTS. {code} If you feel that make sense, I can reopen my PR and do the rebasing. > SparkSQL - Add support for subqueries in predicates > --- > > Key: SPARK-4226 > URL: https://issues.apache.org/jira/browse/SPARK-4226 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 1.2.0 > Environment: Spark 1.2 snapshot >Reporter: Terry Siu > > I have a test table defined in Hive as follows: > {code:sql} > CREATE TABLE sparkbug ( > id INT, > event STRING > ) STORED AS PARQUET; > {code} > and insert some sample data with ids 1, 2, 3. 
> In a Spark shell, I then create a HiveContext and then execute the following > HQL to test out subquery predicates: > {code} > val hc = HiveContext(hc) > hc.hql("select customerid from sparkbug where customerid in (select > customerid from sparkbug where customerid in (2,3))") > {code} > I get the following error: > {noformat} > java.lang.RuntimeException: Unsupported language features in query: select > customerid from sparkbug where customerid in (select customerid from sparkbug > where customerid in (2,3)) > TOK_QUERY > TOK_FROM > TOK_TABREF > TOK_TABNAME > sparkbug > TOK_INSERT > TOK_DESTINATION > TOK_DIR > TOK_TMP_FILE > TOK_SELECT > TOK_SELEXPR > TOK_TABLE_OR_COL > customerid > TOK_WHERE > TOK_SUBQUERY_EXPR > TOK_SUBQUERY_OP > in > TOK_QUERY > TOK_FROM > TOK_TABREF > TOK_TABNAME > sparkbug > TOK_INSERT > TOK_DESTINATION > TOK_DIR > TOK_TMP_FILE > TOK_SELECT > TOK_SELEXPR > TOK_TABLE_OR_COL > customerid > TOK_WHERE > TOK_FUNCTION > in > TOK_TABLE_OR_COL > customerid > 2 > 3 > TOK_TABLE_OR_COL > customerid > scala.NotImplementedError: No parse rules for ASTNode type: 817, text: > TOK_SUBQUERY_EXPR : > TOK_SUBQUERY_EXPR > TOK_SUBQUERY_OP > in > TOK_QUERY > TOK_FROM > TOK_TABREF > TOK_TABNAME > sparkbug > TOK_INSERT > TOK_DESTINATION > TOK_DIR > TOK_TMP_FILE > TOK_SELECT > TOK_SELEXPR > TOK_TABLE_OR_COL > customerid > TOK_WHERE > TOK_FUNCTION > in > TOK_TABLE_OR_COL > customerid > 2 > 3 > TOK_TABLE_OR_COL > customerid > " + > > org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1098) > > at scala.sys.package$.error(package.scala:27) > at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:252) > at > org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50) > at > org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49) > at > scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) > {noformat} > [This > thread|http://apache-spark-user-list.1001560.n3.nabble.com/Subquery-in-having-clause-Spark-1-1-0-td17401.html] > also brings
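The same difference shows up with DataFrames: a left outer join only emulates NOT EXISTS if the null test is on the join key (i.e. anti-join semantics), not on an arbitrary column of the right side, because that column can legitimately be NULL on a matched row. A sketch against the Spark 1.x API, with the data copied from the MySQL session above:
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object NotExistsRewriteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SPARK-4226-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val d1 = Seq((2, Some(2)), (8, Some(10))).toDF("a", "b")
    val d2 = Seq((1, Some(1)), (8, None: Option[Int]), (0, Some(0))).toDF("a", "b")

    // Naive rewrite: filtering on an arbitrary right-side column keeps (8, 10),
    // because the matched d2 row simply has b = NULL.
    d1.join(d2, d1("a") === d2("a"), "left_outer")
      .filter(d2("b").isNull)
      .select(d1("a"), d1("b"))
      .show()

    // NOT EXISTS semantics: test the join key instead (anti-join behaviour).
    d1.join(d2, d1("a") === d2("a"), "left_outer")
      .filter(d2("a").isNull)
      .select(d1("a"), d1("b"))
      .show()

    sc.stop()
  }
}
{code}
The first query keeps (8, 10), exactly like the left-outer-join rewrite in the MySQL session above, which is why an anti-join (or a null check on the join key) is needed to match NOT EXISTS.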
[jira] [Commented] (SPARK-10474) Aggregation failed with unable to acquire memory
[ https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745008#comment-14745008 ] Cheng Hao commented on SPARK-10474: --- But from the current implementation, we'd better not to throw exception if acquired memory(offheap) is not satisfied, maybe we'd better use fixed memory allocations for both data page and the pointer array, what do you think? > Aggregation failed with unable to acquire memory > > > Key: SPARK-10474 > URL: https://issues.apache.org/jira/browse/SPARK-10474 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0 >Reporter: Yi Zhou >Priority: Blocker > > In aggregation case, a Lost task happened with below error. > {code} > java.io.IOException: Could not acquire 65536 bytes of memory > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220) > at > org.apache.spark.sql.execution.UnsafeKVExternalSorter.(UnsafeKVExternalSorter.java:126) > at > org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) > at > org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {code} > Key SQL Query > {code:sql} > INSERT INTO TABLE test_table > SELECT > ss.ss_customer_sk AS cid, > count(CASE WHEN i.i_class_id=1 THEN 1 ELSE NULL END) AS id1, > count(CASE WHEN i.i_class_id=3 THEN 1 ELSE NULL END) AS id3, > count(CASE WHEN i.i_class_id=5 THEN 1 ELSE NULL END) AS id5, > count(CASE WHEN i.i_class_id=7 THEN 1 ELSE NULL END) AS id7, > count(CASE WHEN i.i_class_id=9 THEN 1 ELSE NULL END) AS id9, > count(CASE WHEN 
i.i_class_id=11 THEN 1 ELSE NULL END) AS id11, > count(CASE WHEN i.i_class_id=13 THEN 1 ELSE NULL END) AS id13, > count(CASE WHEN i.i_class_id=15 THEN 1 ELSE NULL END) AS id15, > count(CASE WHEN i.i_class_id=2 THEN 1 ELSE NULL END) AS id2, > count(CASE WHEN i.i_class_id=4 THEN 1 ELSE NULL END) AS id4, > count(CASE WHEN i.i_class_id=6 THEN 1 ELSE NULL END) AS id6, > count(CASE WHEN i.i_class_id=8 THEN 1 ELSE NULL END) AS id8, > count(CASE WHEN i.i_class_id=10 THEN 1 ELSE NULL END) AS id10, > count(CASE WHEN i.i_class_id=14 THEN 1 ELSE NULL END) AS id14, > count(CASE WHEN i.i_class_id=16 THEN 1 ELSE NULL END) AS id16 > FROM store_sales ss > INNER JOIN item i ON ss.ss_item_sk = i.i_item_sk > WHERE i.i_category IN ('Books') > AND ss.ss_customer_sk IS NOT NULL > GROUP BY ss.ss_customer_sk > HAVING count(ss.ss_item_sk) > 5 > {code} > Note: > the store_sales is a big fact table and item is a small dimension table. -- This message was sent by Atlassian JIRA (v6.3.4#6332) ---
[jira] [Commented] (SPARK-10474) Aggregation failed with unable to acquire memory
[ https://issues.apache.org/jira/browse/SPARK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14744969#comment-14744969 ] Cheng Hao commented on SPARK-10474: --- The root causes for the exception is the executor don't have enough memory for external sorting(UnsafeXXXSorter), The memory used for the sorting is MAX_JVM_HEAP * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction. So a workaround is to set a bigger memory for jvm, or the spark conf keys "spark.shuffle.memoryFraction"(0.2 by default) and "spark.shuffle.safetyFraction"(0.8 by default). > Aggregation failed with unable to acquire memory > > > Key: SPARK-10474 > URL: https://issues.apache.org/jira/browse/SPARK-10474 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0 >Reporter: Yi Zhou >Priority: Blocker > > In aggregation case, a Lost task happened with below error. > {code} > java.io.IOException: Could not acquire 65536 bytes of memory > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169) > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220) > at > org.apache.spark.sql.execution.UnsafeKVExternalSorter.(UnsafeKVExternalSorter.java:126) > at > org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119) > at > org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > {code} > Key SQL Query > {code:sql} > INSERT INTO TABLE test_table > SELECT > ss.ss_customer_sk AS cid, > count(CASE WHEN i.i_class_id=1 THEN 1 ELSE NULL END) AS id1, > count(CASE WHEN i.i_class_id=3 THEN 1 ELSE NULL END) AS id3, > count(CASE WHEN i.i_class_id=5 THEN 1 ELSE 
NULL END) AS id5, > count(CASE WHEN i.i_class_id=7 THEN 1 ELSE NULL END) AS id7, > count(CASE WHEN i.i_class_id=9 THEN 1 ELSE NULL END) AS id9, > count(CASE WHEN i.i_class_id=11 THEN 1 ELSE NULL END) AS id11, > count(CASE WHEN i.i_class_id=13 THEN 1 ELSE NULL END) AS id13, > count(CASE WHEN i.i_class_id=15 THEN 1 ELSE NULL END) AS id15, > count(CASE WHEN i.i_class_id=2 THEN 1 ELSE NULL END) AS id2, > count(CASE WHEN i.i_class_id=4 THEN 1 ELSE NULL END) AS id4, > count(CASE WHEN i.i_class_id=6 THEN 1 ELSE NULL END) AS id6, > count(CASE WHEN i.i_class_id=8 THEN 1 ELSE NULL END) AS id8, > count(CASE WHEN i.i_class_id=10 THEN 1 ELSE NULL END) AS id10, > count(CASE WHEN i.i_class_id=14 THEN 1 ELSE NULL END) AS id14, > count(CASE WHEN i.i_class_id=16 THEN 1 ELSE NULL END) AS id16 > FROM store_sales ss > INNER JOIN item i ON ss.ss_item_sk = i.i_item_sk > WHERE i.i_category IN ('Books') > AND ss.ss_customer_sk IS NOT NULL > GROUP BY ss.ss_customer_sk > HAVING count(ss.ss_item_sk) > 5 > {code} > Note: > the s
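A sketch of the workaround described above, as it would look in application code applied before the SparkContext is created (the concrete fraction values below are placeholders, not recommendations):
{code}
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of the suggested workaround: enlarge the shuffle/sort memory share
// before the SparkContext is created. The values are placeholders to tune.
val conf = new SparkConf()
  .setAppName("agg-memory-workaround")
  .set("spark.shuffle.memoryFraction", "0.4")   // default 0.2
  .set("spark.shuffle.safetyFraction", "0.9")   // default 0.8
val sc = new SparkContext(conf)
// Increasing the executor JVM heap (e.g. --executor-memory at submit time) helps as well.
{code}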
[jira] [Issue Comment Deleted] (SPARK-10466) UnsafeRow exception in Sort-Based Shuffle with data spill
[ https://issues.apache.org/jira/browse/SPARK-10466?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-10466: -- Comment: was deleted (was: [~naliazheli] It's an irrelevant issue, you'd better to subscribe the spark mail list and then ask question in English. See(http://spark.apache.org/community.html)) > UnsafeRow exception in Sort-Based Shuffle with data spill > -- > > Key: SPARK-10466 > URL: https://issues.apache.org/jira/browse/SPARK-10466 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Cheng Hao >Assignee: Cheng Hao >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > In sort-based shuffle, if we have data spill, it will cause assert exception, > the follow code can reproduce that > {code} > withSparkConf(("spark.shuffle.sort.bypassMergeThreshold", "2")) { > withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "0")) { > withTempTable("mytemp") { > sparkContext.parallelize(1 to 1000, 3).map(i => (i, > i)).toDF("key", "value").registerTempTable("mytemp") > sql("select key, value as v1 from mytemp where key > > 1").registerTempTable("l") > sql("select key, value as v2 from mytemp where key > > 3").registerTempTable("r") > val df3 = sql("select v1, v2 from l left join r on l.key=r.key") > df3.count() > } > } > } > {code} > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:165) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75) > at > org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) > at java.lang.Thread.run(Thread.java:722) > 17:32:06.172 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in > stage 1.0 (TID 4, localhost): java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:165) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75) > at > org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) > at > 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) > at > org.apache.spa
[jira] [Commented] (SPARK-10466) UnsafeRow exception in Sort-Based Shuffle with data spill
[ https://issues.apache.org/jira/browse/SPARK-10466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14744967#comment-14744967 ] Cheng Hao commented on SPARK-10466: --- [~naliazheli] It's an irrelevant issue, you'd better to subscribe the spark mail list and then ask question in English. See(http://spark.apache.org/community.html) > UnsafeRow exception in Sort-Based Shuffle with data spill > -- > > Key: SPARK-10466 > URL: https://issues.apache.org/jira/browse/SPARK-10466 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Cheng Hao >Assignee: Cheng Hao >Priority: Blocker > Fix For: 1.6.0, 1.5.1 > > > In sort-based shuffle, if we have data spill, it will cause assert exception, > the follow code can reproduce that > {code} > withSparkConf(("spark.shuffle.sort.bypassMergeThreshold", "2")) { > withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "0")) { > withTempTable("mytemp") { > sparkContext.parallelize(1 to 1000, 3).map(i => (i, > i)).toDF("key", "value").registerTempTable("mytemp") > sql("select key, value as v1 from mytemp where key > > 1").registerTempTable("l") > sql("select key, value as v2 from mytemp where key > > 3").registerTempTable("r") > val df3 = sql("select v1, v2 from l left join r on l.key=r.key") > df3.count() > } > } > } > {code} > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:165) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75) > at > org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) > at java.lang.Thread.run(Thread.java:722) > 17:32:06.172 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in > stage 1.0 (TID 4, localhost): java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:165) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75) > at > org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) > at > 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) >
[jira] [Commented] (SPARK-10466) UnsafeRow exception in Sort-Based Shuffle with data spill
[ https://issues.apache.org/jira/browse/SPARK-10466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14736016#comment-14736016 ] Cheng Hao commented on SPARK-10466: --- Sorry, [~davies], I found the spark conf doens't take effect when applying an existed SparkContext instance, hence it passed the unit test. Actually it will fail if you only run the test. Anyway, I've updated the unit test code in the PR, which will create an new SparkContext instance with the specified Confs. > UnsafeRow exception in Sort-Based Shuffle with data spill > -- > > Key: SPARK-10466 > URL: https://issues.apache.org/jira/browse/SPARK-10466 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Cheng Hao >Priority: Blocker > > In sort-based shuffle, if we have data spill, it will cause assert exception, > the follow code can reproduce that > {code} > withSparkConf(("spark.shuffle.sort.bypassMergeThreshold", "2")) { > withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "0")) { > withTempTable("mytemp") { > sparkContext.parallelize(1 to 1000, 3).map(i => (i, > i)).toDF("key", "value").registerTempTable("mytemp") > sql("select key, value as v1 from mytemp where key > > 1").registerTempTable("l") > sql("select key, value as v2 from mytemp where key > > 3").registerTempTable("r") > val df3 = sql("select v1, v2 from l left join r on l.key=r.key") > df3.count() > } > } > } > {code} > {code} > java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:165) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75) > at > org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) > at java.lang.Thread.run(Thread.java:722) > 17:32:06.172 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in > stage 1.0 (TID 4, localhost): java.lang.AssertionError: assertion failed > at scala.Predef$.assert(Predef.scala:165) > at > org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75) > at > 
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687) > at > org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683) > at > org.ap
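For illustration, here is a minimal sketch (not the actual test code from the PR) of the fix described in the comment above: instead of reusing an already-running SparkContext, the test builds its own context so the shuffle and SQL confs actually take effect. The structure and names here are assumptions.

{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Confs applied to an existing SparkContext are ignored, so start a fresh one
// with the settings the test depends on.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("unsafe-row-spill-repro")
  .set("spark.shuffle.sort.bypassMergeThreshold", "2")       // force the sort-based spill path

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0")  // force a shuffled join

import sqlContext.implicits._
sc.parallelize(1 to 1000, 3).map(i => (i, i)).toDF("key", "value").registerTempTable("mytemp")
sqlContext.sql("select key, value as v1 from mytemp where key > 1").registerTempTable("l")
sqlContext.sql("select key, value as v2 from mytemp where key > 3").registerTempTable("r")
sqlContext.sql("select v1, v2 from l left join r on l.key = r.key").count()

sc.stop()
{code}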
[jira] [Commented] (SPARK-10484) [Spark SQL] Come across lost task(timeout) or GC OOM error when two tables do cross join
[ https://issues.apache.org/jira/browse/SPARK-10484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14734395#comment-14734395 ] Cheng Hao commented on SPARK-10484: --- In the cartesian product implementation there are two levels of nested loops; exchanging the order of the join tables reduces the number of outer-loop iterations (the smaller table) but increases the number of inner-loop iterations (the bigger table), so this is really a manual optimization of the SQL query. I created a PR that optimizes the cartesian join by using a broadcast join instead. > [Spark SQL] Come across lost task(timeout) or GC OOM error when two tables > do cross join > - > > Key: SPARK-10484 > URL: https://issues.apache.org/jira/browse/SPARK-10484 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0 >Reporter: Yi Zhou >Priority: Critical > > Found that it lost task or GC OOM when below cross join happen. The left big > table is ~1.2G in size and the right small table is ~2.2K. > Key SQL > {code:sql} > SELECT > CONCAT(s_store_sk,"_", s_store_name ) AS store_ID, > pr_review_date, > pr_review_content > FROM product_reviews pr, temp_stores_with_regression stores_with_regression > WHERE locate(lower(stores_with_regression.s_store_name), > lower(pr.pr_review_content), 1) >= 1 ; > {code} > Physical Plan > {code:sql} > TungstenProject [concat(cast(s_store_sk#456L as string),_,s_store_name#457) > AS store_ID#446,pr_review_date#449,pr_review_content#455] > Filter (locate(lower(s_store_name#457),lower(pr_review_content#455),1) >= 1) > CartesianProduct >HiveTableScan [pr_review_date#449,pr_review_content#455], > (MetastoreRelation bigbench, product_reviews, Some(pr)) >HiveTableScan [s_store_sk#456L,s_store_name#457], (MetastoreRelation > bigbench, temp_stores_with_regression, Some(stores_with_regression)) > Code Generation: true > {code} > We also found a strange behavior that exchanging the two table in 'From' > clause can pass. > Key SQL > {code:sql} > SELECT > CONCAT(s_store_sk,"_", s_store_name ) AS store_ID, > pr_review_date, > pr_review_content > FROM temp_stores_with_regression stores_with_regression, product_reviews pr > WHERE locate(lower(stores_with_regression.s_store_name), > lower(pr.pr_review_content), 1) >= 1 ; > {code} > Physical Plan > {code:sql} > TungstenProject [concat(cast(s_store_sk#448L as string),_,s_store_name#449) > AS store_ID#446,pr_review_date#451,pr_review_content#457] > Filter (locate(lower(s_store_name#449),lower(pr_review_content#457),1) >= 1) > CartesianProduct >HiveTableScan [s_store_sk#448L,s_store_name#449], (MetastoreRelation > bigbench, temp_stores_with_regression, Some(stores_with_regression)) >HiveTableScan [pr_review_date#451,pr_review_content#457], > (MetastoreRelation bigbench, product_reviews, Some(pr)) > Code Generation: true > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
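To make the broadcast idea concrete, here is a rough RDD-level sketch (an illustration only, not the actual PR): collect the ~2.2K-row small table, broadcast it, and evaluate the non-equi predicate locally on every partition of the large table, so no CartesianProduct operator is needed. The RDD names and case classes are hypothetical; the predicate mirrors the query in the report.

{code}
// Hypothetical sketch: smallStoreRdd (~2.2K rows) and largeReviewRdd (~1.2G) are assumed inputs.
case class Store(sk: Long, name: String)
case class Review(date: String, content: String)

val bStores = sc.broadcast(smallStoreRdd.collect())   // small side fits in memory

val joined = largeReviewRdd.mapPartitions { reviews =>
  val stores = bStores.value
  reviews.flatMap { r =>
    // Same condition as the SQL: locate(lower(s_store_name), lower(pr_review_content), 1) >= 1
    stores.iterator
      .filter(s => r.content.toLowerCase.contains(s.name.toLowerCase))
      .map(s => (s"${s.sk}_${s.name}", r.date, r.content))
  }
}
{code}

Either join order then costs one pass over the big table, which is why the manual table swap in the report stops mattering.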
[jira] [Created] (SPARK-10466) UnsafeRow exception in Sort-Based Shuffle with data spill
Cheng Hao created SPARK-10466: - Summary: UnsafeRow exception in Sort-Based Shuffle with data spill Key: SPARK-10466 URL: https://issues.apache.org/jira/browse/SPARK-10466 Project: Spark Issue Type: Bug Components: SQL Reporter: Cheng Hao Priority: Blocker In sort-based shuffle, if we have data spill, it will cause assert exception, the follow code can reproduce that {code} withSparkConf(("spark.shuffle.sort.bypassMergeThreshold", "2")) { withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "0")) { withTempTable("mytemp") { sparkContext.parallelize(1 to 1000, 3).map(i => (i, i)).toDF("key", "value").registerTempTable("mytemp") sql("select key, value as v1 from mytemp where key > 1").registerTempTable("l") sql("select key, value as v2 from mytemp where key > 3").registerTempTable("r") val df3 = sql("select v1, v2 from l left join r on l.key=r.key") df3.count() } } } {code} {code} java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:165) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75) at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) 17:32:06.172 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 4, localhost): java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:165) at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75) at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687) at 
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) {code}
[jira] [Created] (SPARK-10327) Cache Table is not working while subquery has alias in its project list
Cheng Hao created SPARK-10327: - Summary: Cache Table is not working while subquery has alias in its project list Key: SPARK-10327 URL: https://issues.apache.org/jira/browse/SPARK-10327 Project: Spark Issue Type: Bug Components: SQL Reporter: Cheng Hao Code to reproduce that: {code} import org.apache.spark.sql.hive.execution.HiveTableScan sql("select key, value, key + 1 from src").registerTempTable("abc") cacheTable("abc") val sparkPlan = sql( """select a.key, b.key, c.key from |abc a join abc b on a.key=b.key |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3) // failed assert(sparkPlan.collect { case e: HiveTableScan => e }.size === 0) // failed {code} The query plan like: {code} == Parsed Logical Plan == 'Project [unresolvedalias('a.key),unresolvedalias('b.key),unresolvedalias('c.key)] 'Join Inner, Some(('a.key = 'c.key)) 'Join Inner, Some(('a.key = 'b.key)) 'UnresolvedRelation [abc], Some(a) 'UnresolvedRelation [abc], Some(b) 'UnresolvedRelation [abc], Some(c) == Analyzed Logical Plan == key: int, key: int, key: int Project [key#14,key#61,key#66] Join Inner, Some((key#14 = key#66)) Join Inner, Some((key#14 = key#61)) Subquery a Subquery abc Project [key#14,value#15,(key#14 + 1) AS _c2#16] MetastoreRelation default, src, None Subquery b Subquery abc Project [key#61,value#62,(key#61 + 1) AS _c2#58] MetastoreRelation default, src, None Subquery c Subquery abc Project [key#66,value#67,(key#66 + 1) AS _c2#63] MetastoreRelation default, src, None == Optimized Logical Plan == Project [key#14,key#61,key#66] Join Inner, Some((key#14 = key#66)) Project [key#14,key#61] Join Inner, Some((key#14 = key#61)) Project [key#14] InMemoryRelation [key#14,value#15,_c2#16], true, 1, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc) Project [key#61] MetastoreRelation default, src, None Project [key#66] MetastoreRelation default, src, None == Physical Plan == TungstenProject [key#14,key#61,key#66] BroadcastHashJoin [key#14], [key#66], BuildRight TungstenProject [key#14,key#61] BroadcastHashJoin [key#14], [key#61], BuildRight ConvertToUnsafe InMemoryColumnarTableScan [key#14], (InMemoryRelation [key#14,value#15,_c2#16], true, 1, StorageLevel(true, true, false, true, 1), (Project [key#14,value#15,(key#14 + 1) AS _c2#16]), Some(abc)) ConvertToUnsafe HiveTableScan [key#61], (MetastoreRelation default, src, None) ConvertToUnsafe HiveTableScan [key#66], (MetastoreRelation default, src, None) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-10270) Add/Replace some Java friendly DataFrame API
Cheng Hao created SPARK-10270: - Summary: Add/Replace some Java friendly DataFrame API Key: SPARK-10270 URL: https://issues.apache.org/jira/browse/SPARK-10270 Project: Spark Issue Type: Improvement Components: SQL Reporter: Cheng Hao Currently in DataFrame, we have APIs like: {code} def join(right: DataFrame, usingColumns: Seq[String]): DataFrame def dropDuplicates(colNames: Seq[String]): DataFrame def dropDuplicates(colNames: Array[String]): DataFrame {code} Those APIs are not so friendly to Java programmers; change them to: {code} def join(right: DataFrame, usingColumns: String*): DataFrame def dropDuplicates(colNames: String*): DataFrame {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10215) Div of Decimal returns null
[ https://issues.apache.org/jira/browse/SPARK-10215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710719#comment-14710719 ] Cheng Hao commented on SPARK-10215: --- Yes, that's a blocker issue for our customer, I will try to fix that by the end of today. > Div of Decimal returns null > --- > > Key: SPARK-10215 > URL: https://issues.apache.org/jira/browse/SPARK-10215 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Cheng Hao >Priority: Blocker > > {code} > val d = Decimal(1.12321) > val df = Seq((d, 1)).toDF("a", "b") > df.selectExpr("b * a / b").collect() => Array(Row(null)) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-10215) Div of Decimal returns null
Cheng Hao created SPARK-10215: - Summary: Div of Decimal returns null Key: SPARK-10215 URL: https://issues.apache.org/jira/browse/SPARK-10215 Project: Spark Issue Type: Bug Components: SQL Reporter: Cheng Hao Priority: Critical {code} val d = Decimal(1.12321) val df = Seq((d, 1)).toDF("a", "b") df.selectExpr("b * a / b").collect() => Array(Row(null)) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-10134) Improve the performance of Binary Comparison
[ https://issues.apache.org/jira/browse/SPARK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-10134: -- Priority: Minor (was: Major) > Improve the performance of Binary Comparison > > > Key: SPARK-10134 > URL: https://issues.apache.org/jira/browse/SPARK-10134 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Cheng Hao >Priority: Minor > > Currently, comparing binary data byte by byte is quite slow; use the Guava > utility to improve the performance, which takes 8 bytes at a time in the > comparison. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-10134) Improve the performance of Binary Comparison
[ https://issues.apache.org/jira/browse/SPARK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-10134: -- Fix Version/s: (was: 1.6.0) > Improve the performance of Binary Comparison > > > Key: SPARK-10134 > URL: https://issues.apache.org/jira/browse/SPARK-10134 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Cheng Hao >Priority: Minor > > Currently, comparing binary data byte by byte is quite slow; use the Guava > utility to improve the performance, which takes 8 bytes at a time in the > comparison. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10134) Improve the performance of Binary Comparison
[ https://issues.apache.org/jira/browse/SPARK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14708766#comment-14708766 ] Cheng Hao commented on SPARK-10134: --- We can improve that by enabling comparison 8 bytes at a time on a 64-bit OS. https://bugs.openjdk.java.net/browse/JDK-8033148 > Improve the performance of Binary Comparison > > > Key: SPARK-10134 > URL: https://issues.apache.org/jira/browse/SPARK-10134 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Cheng Hao > > Currently, comparing binary data byte by byte is quite slow; use the Guava > utility to improve the performance, which takes 8 bytes at a time in the > comparison. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
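As a small illustration of the 8-bytes-at-a-time trick (a sketch, not the Spark patch itself): Guava's UnsignedBytes.lexicographicalComparator() picks an Unsafe-based implementation on 64-bit JVMs that compares one long (8 bytes) per iteration and only falls back to byte-by-byte for the tail.

{code}
import com.google.common.primitives.UnsignedBytes

// Lexicographic, unsigned comparison of byte arrays, 8 bytes per step where the platform allows it.
val cmp: java.util.Comparator[Array[Byte]] = UnsignedBytes.lexicographicalComparator()

val a = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9)
val b = Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 10)
assert(cmp.compare(a, b) < 0)   // decided on the tail byte after the first 8 match
{code}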
[jira] [Commented] (SPARK-10130) type coercion for IF should have children resolved first
[ https://issues.apache.org/jira/browse/SPARK-10130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704513#comment-14704513 ] Cheng Hao commented on SPARK-10130: --- Can you change the fix version to "1.5"? I think lots of people suffer from this issue. > type coercion for IF should have children resolved first > > > Key: SPARK-10130 > URL: https://issues.apache.org/jira/browse/SPARK-10130 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Adrian Wang > > SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp; -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-10134) Improve the performance of Binary Comparison
Cheng Hao created SPARK-10134: - Summary: Improve the performance of Binary Comparison Key: SPARK-10134 URL: https://issues.apache.org/jira/browse/SPARK-10134 Project: Spark Issue Type: Improvement Components: SQL Reporter: Cheng Hao Fix For: 1.6.0 Currently, comparing binary data byte by byte is quite slow; use the Guava utility to improve the performance, which takes 8 bytes at a time in the comparison. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-9357) Remove JoinedRow
[ https://issues.apache.org/jira/browse/SPARK-9357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704311#comment-14704311 ] Cheng Hao edited comment on SPARK-9357 at 8/20/15 5:29 AM: --- JoinedRow does increase the overhead by adding layer of indirection, however, it is a trade-off, as copying the 2 non-continue pieces of memory together also causes performance overhead, particularly the case I listed above, only a few records really need by the downstream operators(writing to files) after the filtering. The n-ary JoinedRow will be really helpful in case like the sequential joins. For example: {code} SELECT* FROM a join b on a.key=b.key join c on a.key=c.key and a.col1>b.col1 and b.col2b.col1 and b.col2b.col1 and b.col2b.col1 and b.col2 Remove JoinedRow > > > Key: SPARK-9357 > URL: https://issues.apache.org/jira/browse/SPARK-9357 > Project: Spark > Issue Type: Umbrella > Components: SQL >Reporter: Reynold Xin > > JoinedRow was introduced to join two rows together, in aggregation (join key > and value), joins (left, right), window functions, etc. > It aims to reduce the amount of data copied, but incurs branches when the row > is actually read. Given all the fields will be read almost all the time > (otherwise they get pruned out by the optimizer), branch predictor cannot do > anything about those branches. > I think a better way is just to remove this thing, and materializes the row > data directly. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-9357) Remove JoinedRow
[ https://issues.apache.org/jira/browse/SPARK-9357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704311#comment-14704311 ] Cheng Hao edited comment on SPARK-9357 at 8/20/15 5:28 AM: --- JoinedRow does increase the overhead by adding layer of indirection, however, it is a trade-off, as copying the 2 non-continue pieces of memory together is also causes performance issue, particularly the case I listed above, only a few records really need by the downstream operators(writing to files) after the filtering. The n-ary JoinedRow will be really helpful in case like the sequential joins. For example: {code} SELECT* FROM a join b on a.key=b.key join c on a.key=c.key and a.col1>b.col1 and b.col2b.col1 and b.col2b.col1 and b.col2b.col1 and b.col2 Remove JoinedRow > > > Key: SPARK-9357 > URL: https://issues.apache.org/jira/browse/SPARK-9357 > Project: Spark > Issue Type: Umbrella > Components: SQL >Reporter: Reynold Xin > > JoinedRow was introduced to join two rows together, in aggregation (join key > and value), joins (left, right), window functions, etc. > It aims to reduce the amount of data copied, but incurs branches when the row > is actually read. Given all the fields will be read almost all the time > (otherwise they get pruned out by the optimizer), branch predictor cannot do > anything about those branches. > I think a better way is just to remove this thing, and materializes the row > data directly. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9357) Remove JoinedRow
[ https://issues.apache.org/jira/browse/SPARK-9357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704311#comment-14704311 ] Cheng Hao commented on SPARK-9357: -- JoinedRow does increase the overhead by adding layer of indirection, however, it is a trade-off, as copying the 2 non-continue pieces of memory together is also causes performance issue, particularly the case I listed above, only a few records really need by the downstream operators(writing to files) after the filtering. The n-ary JoinedRow will be really helpful in case like the sequential joins. For example: {code} SELECT* FROM a join b on a.key=b.key join c on a.key=c.key and a.col1>b.col1 and b.col2b.col1 and b.col2 Remove JoinedRow > > > Key: SPARK-9357 > URL: https://issues.apache.org/jira/browse/SPARK-9357 > Project: Spark > Issue Type: Umbrella > Components: SQL >Reporter: Reynold Xin > > JoinedRow was introduced to join two rows together, in aggregation (join key > and value), joins (left, right), window functions, etc. > It aims to reduce the amount of data copied, but incurs branches when the row > is actually read. Given all the fields will be read almost all the time > (otherwise they get pruned out by the optimizer), branch predictor cannot do > anything about those branches. > I think a better way is just to remove this thing, and materializes the row > data directly. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9357) Remove JoinedRow
[ https://issues.apache.org/jira/browse/SPARK-9357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14702603#comment-14702603 ] Cheng Hao commented on SPARK-9357: -- JoinedRow is probably quite efficient for cases like: {code} CREATE TABLE a AS SELECT * FROM t1 JOIN t2 on t1.key=t2.key and t1.col1 < t2.col1; {code} If t1 and t2 are large tables with lots of columns, most of the records will be filtered out by t1.col1 < t2.col1. Maybe we can create an n-ary JoinedRow instead of the binary JoinedRow, any thoughts? > Remove JoinedRow > > > Key: SPARK-9357 > URL: https://issues.apache.org/jira/browse/SPARK-9357 > Project: Spark > Issue Type: Umbrella > Components: SQL >Reporter: Reynold Xin > > JoinedRow was introduced to join two rows together, in aggregation (join key > and value), joins (left, right), window functions, etc. > It aims to reduce the amount of data copied, but incurs branches when the row > is actually read. Given all the fields will be read almost all the time > (otherwise they get pruned out by the optimizer), branch predictor cannot do > anything about those branches. > I think a better way is just to remove this thing, and materializes the row > data directly. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
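To make the proposal a bit more concrete, here is a very rough sketch of an n-ary joined row, written against a simplified row interface rather than Spark's actual InternalRow (null handling, typed getters, etc. are omitted); it only shows how a field ordinal can be routed to the owning child row without copying the rows together.

{code}
// Hypothetical sketch; not Spark's InternalRow API.
trait SimpleRow {
  def numFields: Int
  def get(ordinal: Int): Any
}

class NaryJoinedRow(rows: Array[SimpleRow]) extends SimpleRow {
  // Cumulative field counts: ordinal -> (owning row, local ordinal) is a short scan,
  // instead of materializing all child rows into one buffer.
  private val offsets: Array[Int] = rows.scanLeft(0)(_ + _.numFields)

  override def numFields: Int = offsets.last

  override def get(ordinal: Int): Any = {
    var i = 0
    while (ordinal >= offsets(i + 1)) i += 1   // find the child that owns this ordinal
    rows(i).get(ordinal - offsets(i))
  }
}
{code}

The binary JoinedRow is the n = 2 case; chaining joins nests those wrappers, which is where the extra branching comes from.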
[jira] [Commented] (SPARK-7218) Create a real iterator with open/close for Spark SQL
[ https://issues.apache.org/jira/browse/SPARK-7218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14702584#comment-14702584 ] Cheng Hao commented on SPARK-7218: -- Can you give some BKM for this task? > Create a real iterator with open/close for Spark SQL > > > Key: SPARK-7218 > URL: https://issues.apache.org/jira/browse/SPARK-7218 > Project: Spark > Issue Type: New Feature > Components: SQL >Reporter: Reynold Xin > -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-10044) AnalysisException in resolving reference for sorting with aggregation
Cheng Hao created SPARK-10044: - Summary: AnalysisException in resolving reference for sorting with aggregation Key: SPARK-10044 URL: https://issues.apache.org/jira/browse/SPARK-10044 Project: Spark Issue Type: Bug Components: SQL Reporter: Cheng Hao Unit test as: {code} withTempTable("mytable") { sqlContext.sparkContext.parallelize(1 to 10).map(i => (i, i.toString)) .toDF("key", "value") .registerTempTable("mytable") checkAnswer(sql( """select max(value) from mytable group by key % 2 |order by max(concat(value,",", key)), min(substr(value, 0, 4)) |""".stripMargin), Row("8") :: Row("9") :: Nil) } {code} Exception like: {code} cannot resolve '_aggOrdering' given input columns _c0, _aggOrdering, _aggOrdering; org.apache.spark.sql.AnalysisException: cannot resolve '_aggOrdering' given input columns _c0, _aggOrdering, _aggOrdering; at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:56) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:53) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:293) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:292) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:290) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:290) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8240) string function: concat
[ https://issues.apache.org/jira/browse/SPARK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696471#comment-14696471 ] Cheng Hao commented on SPARK-8240: -- It's probably very difficult to define the function when mixing column types and string types. > string function: concat > --- > > Key: SPARK-8240 > URL: https://issues.apache.org/jira/browse/SPARK-8240 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Assignee: Reynold Xin > Fix For: 1.5.0 > > > concat(string|binary A, string|binary B...): string / binary > Returns the string or bytes resulting from concatenating the strings or bytes > passed in as parameters in order. For example, concat('foo', 'bar') results > in 'foobar'. Note that this function can take any number of input strings. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8240) string function: concat
[ https://issues.apache.org/jira/browse/SPARK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696469#comment-14696469 ] Cheng Hao commented on SPARK-8240: -- It works for me like: {code} sql("select concat('a', ',', key, ',', value) from src").collect() {code} or {code} df.select(concat($"a", lit(","), $"b")), df.select(concat($"a", lit(","),$"b", lit(","),$"c")) {code} > string function: concat > --- > > Key: SPARK-8240 > URL: https://issues.apache.org/jira/browse/SPARK-8240 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Assignee: Reynold Xin > Fix For: 1.5.0 > > > concat(string|binary A, string|binary B...): string / binary > Returns the string or bytes resulting from concatenating the strings or bytes > passed in as parameters in order. For example, concat('foo', 'bar') results > in 'foobar'. Note that this function can take any number of input strings. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9879) OOM in LIMIT clause with large number
[ https://issues.apache.org/jira/browse/SPARK-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696338#comment-14696338 ] Cheng Hao commented on SPARK-9879: -- I created a new physical operator called LargeLimit, which takes effect when the limit in the LIMIT clause is equal to or greater than SqlConf.LIMIT_ROWS. LargeLimit triggers execution of the child RDD and persists its result, and we then iterate over the persisted data twice. The first iteration gets the number of records in each partition, so we can compute how many records to take from each partition to satisfy the total number of records we need; in the second iteration, we simply take that many records from each partition. The main advantages of this approach: - No single-node shuffle is required, in fact no data shuffle at all, and the result data stays distributed. - It keeps the same output partitioning as its child. > OOM in LIMIT clause with large number > - > > Key: SPARK-9879 > URL: https://issues.apache.org/jira/browse/SPARK-9879 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Cheng Hao > > {code} > create table spark.tablsetest as select * from dpa_ord_bill_tf order by > member_id limit 2000; > {code} > > {code} > spark-sql --driver-memory 48g --executor-memory 24g --driver-java-options > -XX:PermSize=1024M -XX:MaxPermSize=2048M > Error logs > 15/07/27 10:22:43 ERROR ActorSystemImpl: Uncaught fatal error from thread > [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem > [sparkDriver] > java.lang.OutOfMemoryError: Requested array size exceeds VM limit > at java.util.Arrays.copyOf(Arrays.java:2271) > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) > at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) > at > java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852) > at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708) > at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134) > at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) > at com.esotericsoftware.kryo.io.Output.close(Output.java:165) > at > org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162) > at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139) > at > org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > at > org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) > at > org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) > at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168) > at > org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) > at > org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231) > 15/07/27 10:22:43 ERROR ErrorMonitor: Uncaught fatal error from thread > [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem > [sparkDriver] > java.lang.OutOfMemoryError: Requested array size exceeds VM limit > at java.util.Arrays.copyOf(Arrays.java:2271) > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) > at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140
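A minimal RDD-level sketch of the two-pass strategy described in the comment above (illustrative only; the real operator would work on SparkPlan/InternalRow and read the threshold from SqlConf): pass one counts the records per partition, pass two takes a per-partition quota from the persisted data, so nothing is ever pulled to a single node.

{code}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.reflect.ClassTag

// Hypothetical sketch of the "LargeLimit" idea on a plain RDD.
def largeLimit[T: ClassTag](child: RDD[T], limit: Long): RDD[T] = {
  val persisted = child.persist(StorageLevel.MEMORY_AND_DISK)

  // Pass 1: number of records in each partition (only one Long per partition hits the driver).
  val counts: Array[Long] =
    persisted.mapPartitionsWithIndex((i, it) => Iterator((i, it.size.toLong)))
             .collect().sortBy(_._1).map(_._2)

  // Fill the quota partition by partition until the limit is reached.
  val quota = new Array[Long](counts.length)
  var remaining = limit
  for (i <- counts.indices) {
    quota(i) = math.min(counts(i), remaining)
    remaining -= quota(i)
  }

  // Pass 2: take exactly quota(i) records from partition i; no shuffle, result stays distributed.
  persisted.mapPartitionsWithIndex((i, it) => it.take(quota(i).toInt))
}
{code}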
[jira] [Updated] (SPARK-9879) OOM in LIMIT clause with large number
[ https://issues.apache.org/jira/browse/SPARK-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-9879: - Summary: OOM in LIMIT clause with large number (was: OOM in CTAS with LIMIT) > OOM in LIMIT clause with large number > - > > Key: SPARK-9879 > URL: https://issues.apache.org/jira/browse/SPARK-9879 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Cheng Hao > > {code} > create table spark.tablsetest as select * from dpa_ord_bill_tf order by > member_id limit 2000; > {code} > > {code} > spark-sql --driver-memory 48g --executor-memory 24g --driver-java-options > -XX:PermSize=1024M -XX:MaxPermSize=2048M > Error logs > 15/07/27 10:22:43 ERROR ActorSystemImpl: Uncaught fatal error from thread > [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem > [sparkDriver] > java.lang.OutOfMemoryError: Requested array size exceeds VM limit > at java.util.Arrays.copyOf(Arrays.java:2271) > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) > at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) > at > java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852) > at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708) > at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134) > at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) > at com.esotericsoftware.kryo.io.Output.close(Output.java:165) > at > org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162) > at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139) > at > org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > at > org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) > at > org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) > at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168) > at > org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) > at > org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231) > 15/07/27 
10:22:43 ERROR ErrorMonitor: Uncaught fatal error from thread > [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem > [sparkDriver] > java.lang.OutOfMemoryError: Requested array size exceeds VM limit > at java.util.Arrays.copyOf(Arrays.java:2271) > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) > at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) > at > java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852) > at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708) > at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134) > at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) > at com.esotericsoftware.kryo.io.Output.close(Output.java:165) > at > org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162) > at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139) > at > org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65) > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) > at > org.apache.spark.rdd.ParallelCollectionPartitio
[jira] [Created] (SPARK-9879) OOM in CTAS with LIMIT
Cheng Hao created SPARK-9879: Summary: OOM in CTAS with LIMIT Key: SPARK-9879 URL: https://issues.apache.org/jira/browse/SPARK-9879 Project: Spark Issue Type: Bug Components: SQL Reporter: Cheng Hao [code] create table spark.tablsetest as select * from dpa_ord_bill_tf order by member_id limit 2000; [code] [code] spark-sql --driver-memory 48g --executor-memory 24g --driver-java-options -XX:PermSize=1024M -XX:MaxPermSize=2048M unfortunately,the driver exists following bugs: 15/07/27 10:22:43 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852) at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708) at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134) at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) at com.esotericsoftware.kryo.io.Output.close(Output.java:165) at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162) at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231) 15/07/27 10:22:43 ERROR ErrorMonitor: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at 
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852) at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708) at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134) at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) at com.esotericsoftware.kryo.io.Output.close(Output.java:165) at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162) at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObj
[jira] [Updated] (SPARK-9879) OOM in CTAS with LIMIT
[ https://issues.apache.org/jira/browse/SPARK-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-9879: - Description: {code} create table spark.tablsetest as select * from dpa_ord_bill_tf order by member_id limit 2000; {code} {code} spark-sql --driver-memory 48g --executor-memory 24g --driver-java-options -XX:PermSize=1024M -XX:MaxPermSize=2048M Error logs 15/07/27 10:22:43 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852) at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708) at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134) at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) at com.esotericsoftware.kryo.io.Output.close(Output.java:165) at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162) at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231) 15/07/27 10:22:43 ERROR ErrorMonitor: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852) at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708) at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134) at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) at com.esotericsoftware.kryo.io.Output.close(Output.java:165) at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162) at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject
[jira] [Updated] (SPARK-9879) OOM in CTAS with LIMIT
[ https://issues.apache.org/jira/browse/SPARK-9879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-9879: - Description: {code} create table spark.tablsetest as select * from dpa_ord_bill_tf order by member_id limit 2000; {code} {code} spark-sql --driver-memory 48g --executor-memory 24g --driver-java-options -XX:PermSize=1024M -XX:MaxPermSize=2048M unfortunately,the driver exists following bugs: 15/07/27 10:22:43 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852) at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708) at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134) at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) at com.esotericsoftware.kryo.io.Output.close(Output.java:165) at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162) at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231) 15/07/27 10:22:43 ERROR ErrorMonitor: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-20]shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: Requested array size exceeds VM limit at java.util.Arrays.copyOf(Arrays.java:2271) at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113) at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140) at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852) at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708) at org.apache.spark.util.Utils$$anon$2.write(Utils.scala:134) at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) at com.esotericsoftware.kryo.io.Output.close(Output.java:165) at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:162) at org.apache.spark.util.Utils$.serializeViaNestedStream(Utils.scala:139) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:65) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) at java.io.Ob
[jira] [Created] (SPARK-9735) Auto infer partition schema of HadoopFsRelation should respect the user-specified one
Cheng Hao created SPARK-9735: Summary: Auto infer partition schema of HadoopFsRelation should should respected the user specified one Key: SPARK-9735 URL: https://issues.apache.org/jira/browse/SPARK-9735 Project: Spark Issue Type: Bug Components: SQL Reporter: Cheng Hao This code is copied from the hadoopFsRelationSuite.scala {code} partitionedTestDF = (for { i <- 1 to 3 p2 <- Seq("foo", "bar") } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2") withTempPath { file => val input = partitionedTestDF.select('a, 'b, 'p1.cast(StringType).as('ps), 'p2) input .write .format(dataSourceName) .mode(SaveMode.Overwrite) .partitionBy("ps", "p2") .saveAsTable("t") withTempTable("t") { checkAnswer(sqlContext.table("t"), input.collect()) } } 11.521 ERROR org.apache.spark.executor.Executor: Exception in task 2.0 in stage 2.0 (TID 130) java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.spark.unsafe.types.UTF8String at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45) at org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.getUTF8String(SpecificMutableRow.scala:195) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toScalaImpl(CatalystTypeConverters.scala:297) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toScalaImpl(CatalystTypeConverters.scala:289) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toScala(CatalystTypeConverters.scala:110) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toScala(CatalystTypeConverters.scala:278) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toScala(CatalystTypeConverters.scala:245) at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$2.apply(CatalystTypeConverters.scala:406) at org.apache.spark.sql.execution.SparkPlan$$anonfun$3$$anonfun$apply$2.apply(SparkPlan.scala:194) at org.apache.spark.sql.execution.SparkPlan$$anonfun$3$$anonfun$apply$2.apply(SparkPlan.scala:194) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:905) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:905) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1836) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1836) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:88) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-9689) Cache doesn't refresh for HadoopFsRelation based table
[ https://issues.apache.org/jira/browse/SPARK-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661359#comment-14661359 ] Cheng Hao commented on SPARK-9689: -- After investigation, the root cause of the failure is in CacheManager.recache(): it assumes the Spark plan is immutable, even after the files under the table path have changed. And for `InsertIntoHadoopFsRelation`, we don't even call `CacheManager.recache()`. > Cache doesn't refresh for HadoopFsRelation based table > -- > > Key: SPARK-9689 > URL: https://issues.apache.org/jira/browse/SPARK-9689 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Cheng Hao > > {code:title=example|borderStyle=solid} > // create a HadoopFsRelation based table > sql(s""" > |CREATE TEMPORARY TABLE jsonTable (a int, b string) > |USING org.apache.spark.sql.json.DefaultSource > |OPTIONS ( > | path '${path.toString}' > |)""".stripMargin) > > // give the value from table jt > sql( > s""" > |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt > """.stripMargin) > // cache the HadoopFsRelation Table > sqlContext.cacheTable("jsonTable") > > // update the HadoopFsRelation Table > sql( > s""" > |INSERT OVERWRITE TABLE jsonTable SELECT a * 2, b FROM jt > """.stripMargin) > // Even this will fail > sql("SELECT a, b FROM jsonTable").collect() > // This will fail, as the cache doesn't refresh > checkAnswer( > sql("SELECT a, b FROM jsonTable"), > sql("SELECT a * 2, b FROM jt").collect()) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
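Until the recache path is fixed, one manual workaround (a sketch, assuming rebuilding the cache from scratch is acceptable) is to drop and re-create the cache entry after the underlying files change:
{code}
// After the table's files are overwritten, the cached plan is stale; rebuild it.
sql("INSERT OVERWRITE TABLE jsonTable SELECT a * 2, b FROM jt")
sqlContext.uncacheTable("jsonTable")   // drop the stale cache entry
sqlContext.cacheTable("jsonTable")     // cache again against the new files
sql("SELECT a, b FROM jsonTable").collect()
{code}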
[jira] [Updated] (SPARK-9689) Cache doesn't refresh for HadoopFsRelation based table
[ https://issues.apache.org/jira/browse/SPARK-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-9689: - Description: {code:title=example|borderStyle=solid} // create a HadoopFsRelation based table sql(s""" |CREATE TEMPORARY TABLE jsonTable (a int, b string) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( | path '${path.toString}' |)""".stripMargin) // give the value from table jt sql( s""" |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt """.stripMargin) // cache the HadoopFsRelation Table sqlContext.cacheTable("jsonTable") // update the HadoopFsRelation Table sql( s""" |INSERT OVERWRITE TABLE jsonTable SELECT a * 2, b FROM jt """.stripMargin) // Even this will fail sql("SELECT a, b FROM jsonTable").collect() // This will fail, as the cache doesn't refresh checkAnswer( sql("SELECT a, b FROM jsonTable"), sql("SELECT a * 2, b FROM jt").collect()) {code} was: {code:title=example|borderStyle=solid} // create a HadoopFsRelation based table sql(s""" |CREATE TEMPORARY TABLE jsonTable (a int, b string) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( | path '${path.toString}' |)""".stripMargin) // give the value from table jt sql( s""" |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt """.stripMargin) // cache the HadoopFsRelation Table sqlContext.cacheTable("jsonTable") // update the HadoopFsRelation Table sql( s""" |INSERT OVERWRITE TABLE jsonTable SELECT a * 2, b FROM jt """.stripMargin) // This will fail, as the cache doesn't refresh checkAnswer( sql("SELECT a, b FROM jsonTable"), sql("SELECT a * 2, b FROM jt").collect()) {code} > Cache doesn't refresh for HadoopFsRelation based table > -- > > Key: SPARK-9689 > URL: https://issues.apache.org/jira/browse/SPARK-9689 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Cheng Hao > > {code:title=example|borderStyle=solid} > // create a HadoopFsRelation based table > sql(s""" > |CREATE TEMPORARY TABLE jsonTable (a int, b string) > |USING org.apache.spark.sql.json.DefaultSource > |OPTIONS ( > | path '${path.toString}' > |)""".stripMargin) > > // give the value from table jt > sql( > s""" > |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt > """.stripMargin) > // cache the HadoopFsRelation Table > sqlContext.cacheTable("jsonTable") > > // update the HadoopFsRelation Table > sql( > s""" > |INSERT OVERWRITE TABLE jsonTable SELECT a * 2, b FROM jt > """.stripMargin) > // Even this will fail > sql("SELECT a, b FROM jsonTable").collect() > // This will fail, as the cache doesn't refresh > checkAnswer( > sql("SELECT a, b FROM jsonTable"), > sql("SELECT a * 2, b FROM jt").collect()) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-9689) Cache doesn't refresh for HadoopFsRelation based table
Cheng Hao created SPARK-9689: Summary: Cache doesn't refresh for HadoopFsRelation based table Key: SPARK-9689 URL: https://issues.apache.org/jira/browse/SPARK-9689 Project: Spark Issue Type: Bug Components: SQL Reporter: Cheng Hao {code:title=example|borderStyle=solid} // create a HadoopFsRelation based table sql(s""" |CREATE TEMPORARY TABLE jsonTable (a int, b string) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( | path '${path.toString}' |)""".stripMargin) // give the value from table jt sql( s""" |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt """.stripMargin) // cache the HadoopFsRelation Table sqlContext.cacheTable("jsonTable") // update the HadoopFsRelation Table sql( s""" |INSERT OVERWRITE TABLE jsonTable SELECT a * 2, b FROM jt """.stripMargin) // This will fail, as the cache doesn't refresh checkAnswer( sql("SELECT a, b FROM jsonTable"), sql("SELECT a * 2, b FROM jt").collect()) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7119) ScriptTransform doesn't consider the output data type
[ https://issues.apache.org/jira/browse/SPARK-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14652892#comment-14652892 ] Cheng Hao commented on SPARK-7119: -- [~marmbrus] This is actually a bug fix, and it has blocked the BigBench testing for quite a long time. Since the code is ready (to be reviewed), can we add it back to the 1.5 target list? > ScriptTransform doesn't consider the output data type > - > > Key: SPARK-7119 > URL: https://issues.apache.org/jira/browse/SPARK-7119 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.3.0, 1.3.1, 1.4.0 >Reporter: Cheng Hao >Priority: Critical > > {code:sql} > from (from src select transform(key, value) using 'cat' as (thing1 int, > thing2 string)) t select thing1 + 2; > {code} > {noformat} > 15/04/24 00:58:55 ERROR CliDriver: org.apache.spark.SparkException: Job > aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent > failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): > java.lang.ClassCastException: org.apache.spark.sql.types.UTF8String cannot be > cast to java.lang.Integer > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) > at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:57) > at > org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:127) > at > org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118) > at > org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68) > at > org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) > at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1618) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1618) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) > at org.apache.spark.scheduler.Task.run(Task.scala:64) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) > at java.lang.Thread.run(Thread.java:722) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
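A possible interim workaround (a sketch, not the fix under review): since the script's stdout currently comes back as strings regardless of the declared output types, declare the transform output as strings and cast explicitly in the outer query:
{code}
// Declare strings, which matches what the script actually emits, then cast manually.
sqlContext.sql("""
  FROM (FROM src SELECT TRANSFORM(key, value) USING 'cat' AS (thing1 string, thing2 string)) t
  SELECT CAST(thing1 AS INT) + 2
""")
{code}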
[jira] [Created] (SPARK-9381) Migrate JSON data source to the new partitioning data source
Cheng Hao created SPARK-9381: Summary: Migrate JSON data source to the new partitioning data source Key: SPARK-9381 URL: https://issues.apache.org/jira/browse/SPARK-9381 Project: Spark Issue Type: New Feature Components: SQL Reporter: Cheng Hao -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
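For context, a sketch of what moving JSON onto the partitioning-aware data source API enables, using the standard DataFrameWriter/DataFrameReader surface; the table contents and path below are made up for illustration:
{code}
import sqlContext.implicits._

val events = sc.parallelize(Seq((2015, 7, "a"), (2015, 8, "b")))
  .toDF("year", "month", "payload")

// Write JSON laid out as year=/month= directories, then read it back with the
// partition columns discovered from the directory structure.
events.write.format("json").partitionBy("year", "month").save("/tmp/json_events")
val reloaded = sqlContext.read.format("json").load("/tmp/json_events")
{code}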
[jira] [Commented] (SPARK-9374) [Spark SQL] Throw out error of "AnalysisException: nondeterministic expressions are only allowed in Project or Filter" during the Spark SQL parse phase
[ https://issues.apache.org/jira/browse/SPARK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642706#comment-14642706 ] Cheng Hao commented on SPARK-9374: -- [~cloud_fan]] Can you also take look at this failure? Thx! > [Spark SQL] Throw out erorr of "AnalysisException: nondeterministic > expressions are only allowed in Project or Filter" during the spark sql parse > phase > --- > > Key: SPARK-9374 > URL: https://issues.apache.org/jira/browse/SPARK-9374 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0 >Reporter: Yi Zhou >Priority: Blocker > > #Spark SQL Query > INSERT INTO TABLE TEST_QUERY_0_result > SELECT w_state, i_item_id, > SUM( > CASE WHEN (unix_timestamp(d_date,'-MM-dd') < > unix_timestamp('2001-03-16','-MM-dd')) > THEN ws_sales_price - COALESCE(wr_refunded_cash,0) > ELSE 0.0 END > ) AS sales_before, > SUM( > CASE WHEN (unix_timestamp(d_date,'-MM-dd') >= > unix_timestamp('2001-03-16','-MM-dd')) > THEN ws_sales_price - coalesce(wr_refunded_cash,0) > ELSE 0.0 END > ) AS sales_after > FROM ( > SELECT * > FROM web_sales ws > LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number > AND ws.ws_item_sk = wr.wr_item_sk) > ) a1 > JOIN item i ON a1.ws_item_sk = i.i_item_sk > JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk > JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk > AND unix_timestamp(d.d_date, '-MM-dd') >= unix_timestamp('2001-03-16', > '-MM-dd') - 30*24*60*60 --subtract 30 days in seconds > AND unix_timestamp(d.d_date, '-MM-dd') <= unix_timestamp('2001-03-16', > '-MM-dd') + 30*24*60*60 --add 30 days in seconds > GROUP BY w_state,i_item_id > CLUSTER BY w_state,i_item_id > Error Message## > org.apache.spark.sql.AnalysisException: nondeterministic expressions are only > allowed in Project or Filter, found: > (((ws_sold_date_sk = d_date_sk) && > (HiveGenericUDF#org.apache.hadoop.hive.ql.udf.generic.GenericUDFUnixTimeStamp(d_date,-MM-dd) > >= > (HiveGenericUDF#org.apache.hadoop.hive.ql.udf.generic.GenericUDFUnixTimeStamp(2001-03-16,-MM-dd) > - CAST30 * 24) * 60) * 60), LongType && > (HiveGenericUDF#org.apache.hadoop.hive.ql.udf.generic.GenericUDFUnixTimeStamp(d_date,-MM-dd) > <= > (HiveGenericUDF#org.apache.hadoop.hive.ql.udf.generic.GenericUDFUnixTimeStamp(2001-03-16,-MM-dd) > + CAST30 * 24) * 60) * 60), LongType > in operator Join Inner, Somews_sold_date_sk#289L = d_date_sk#383L) && > (HiveGenericUDF#org.apache.hadoop.hive.ql.udf.generic.GenericUDFUnixTimeStamp(d_date#385,-MM-dd) > >= > (HiveGenericUDF#org.apache.hadoop.hive.ql.udf.generic.GenericUDFUnixTimeStamp(2001-03-16,-MM-dd) > - CAST30 * 24) * 60) * 60), LongType && > (HiveGenericUDF#org.apache.hadoop.hive.ql.udf.generic.GenericUDFUnixTimeStamp(d_date#385,-MM-dd) > <= > (HiveGenericUDF#org.apache.hadoop.hive.ql.udf.generic.GenericUDFUnixTimeStamp(2001-03-16,-MM-dd) > + CAST30 * 24) * 60) * 60), LongType) > ; > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37) > at > org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:43) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:148) > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:49) > at > org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:103) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102) > at > 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:102) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:102) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:102) > at scala.collection.immutable.List.foreach(List.scala:318) > at
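One rewrite that should sidestep the error (a sketch with toy tables, not the BigBench schema, and assuming a HiveContext or Spark 1.5+'s built-in unix_timestamp): for an inner join, the date-window predicates can be moved from the ON clause into a WHERE clause, which places the unix_timestamp calls in a Filter rather than in the Join condition the analyzer rejects:
{code}
import sqlContext.implicits._

Seq((1, "2001-03-10"), (2, "2001-06-01")).toDF("k", "d").registerTempTable("t1")
Seq((1, 10.0), (2, 20.0)).toDF("k", "v").registerTempTable("t2")

// Same semantics as putting the range predicate in the ON clause of an inner
// join, but the nondeterministic-flagged expression now sits in a Filter.
sqlContext.sql("""
  SELECT t1.k, t2.v
  FROM t1 JOIN t2 ON t1.k = t2.k
  WHERE unix_timestamp(t1.d, 'yyyy-MM-dd') >=
        unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60
""").show()
{code}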
[jira] [Commented] (SPARK-9239) HiveUDAF support for AggregateFunction2
[ https://issues.apache.org/jira/browse/SPARK-9239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638276#comment-14638276 ] Cheng Hao commented on SPARK-9239: -- [~yhuai] are you working on this now? Or I can take it. > HiveUDAF support for AggregateFunction2 > --- > > Key: SPARK-9239 > URL: https://issues.apache.org/jira/browse/SPARK-9239 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Yin Huai >Priority: Blocker > > We need to build a wrapper for Hive UDAFs on top of AggregateFunction2. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8230) complex function: size
[ https://issues.apache.org/jira/browse/SPARK-8230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14629107#comment-14629107 ] Cheng Hao commented on SPARK-8230: -- [~pedrorodriguez], actually [~TarekAuel] set a good example at https://github.com/apache/spark/pull/7214/files, so you can start by getting familiar with his PR first, and I would like to review your code once it's ready. > complex function: size > -- > > Key: SPARK-8230 > URL: https://issues.apache.org/jira/browse/SPARK-8230 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Assignee: Cheng Hao > > size(Map): int > size(Array): int > return the number of elements in the map or array. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
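For anyone picking this up, a standalone sketch of the intended semantics in plain Scala (not a Catalyst expression), assuming Hive-compatible behavior where size of a NULL argument is -1:
{code}
def sqlSize(value: Any): Int = value match {
  case null         => -1        // Hive convention for NULL input (assumed here)
  case m: Map[_, _] => m.size    // number of key/value pairs
  case s: Seq[_]    => s.size    // number of array elements
  case a: Array[_]  => a.length
  case _            => throw new IllegalArgumentException("size() expects an array or a map")
}

assert(sqlSize(Map(1 -> "a", 2 -> "b")) == 2)
assert(sqlSize(Seq(1, 2, 3)) == 3)
{code}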
[jira] [Commented] (SPARK-8956) Rollup produces incorrect result when group by contains expressions
[ https://issues.apache.org/jira/browse/SPARK-8956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14624121#comment-14624121 ] Cheng Hao commented on SPARK-8956: -- Sorry, I didn't notice this jira issue when I created this issue SPARK-8972. > Rollup produces incorrect result when group by contains expressions > --- > > Key: SPARK-8956 > URL: https://issues.apache.org/jira/browse/SPARK-8956 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.4.0 >Reporter: Yana Kadiyska > > Rollup produces incorrect results when group clause contains an expression > {code}case class KeyValue(key: Int, value: String) > val df = sc.parallelize(1 to 50).map(i=>KeyValue(i, i.toString)).toDF > df.registerTempTable("foo") > sqlContext.sql(“select count(*) as cnt, key % 100 as key,GROUPING__ID from > foo group by key%100 with rollup”).show(100) > {code} > As a workaround, this works correctly: > {code} > val df1=df.withColumn("newkey",df("key")%100) > df1.registerTempTable("foo1") > sqlContext.sql("select count(*) as cnt, newkey as key,GROUPING__ID as grp > from foo1 group by newkey with rollup").show(100) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-8972) Incorrect result for rollup
[ https://issues.apache.org/jira/browse/SPARK-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-8972: - Description: {code:java} import sqlContext.implicits._ case class KeyValue(key: Int, value: String) val df = sc.parallelize(1 to 5).map(i=>KeyValue(i, i.toString)).toDF df.registerTempTable("foo") sqlContext.sql("select count(*) as cnt, key % 100,GROUPING__ID from foo group by key%100 with rollup").show(100) // output +---+---++ |cnt|_c1|GROUPING__ID| +---+---++ | 1| 4| 0| | 1| 4| 1| | 1| 5| 0| | 1| 5| 1| | 1| 1| 0| | 1| 1| 1| | 1| 2| 0| | 1| 2| 1| | 1| 3| 0| | 1| 3| 1| +---+---++ {code} After checking the code, it seems we don't support complex expressions (not just simple column names) as GROUP BY keys for rollup, nor for cube. And it will not even report an error if we have a complex expression in the rollup keys, hence we get the very confusing result in the example above. was: {code:java} import sqlContext.implicits._ case class KeyValue(key: Int, value: String) val df = sc.parallelize(1 to 5).map(i=>KeyValue(i, i.toString)).toDF df.registerTempTable("foo") sqlContext.sql("select count(*) as cnt, key % 100,GROUPING__ID from foo group by key%100 with rollup").show(100) // output +---+---++ |cnt|_c1|GROUPING__ID| +---+---++ | 1| 4| 0| | 1| 4| 1| | 1| 5| 0| | 1| 5| 1| | 1| 1| 0| | 1| 1| 1| | 1| 2| 0| | 1| 2| 1| | 1| 3| 0| | 1| 3| 1| +---+---++ {code} > Incorrect result for rollup > --- > > Key: SPARK-8972 > URL: https://issues.apache.org/jira/browse/SPARK-8972 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Cheng Hao >Priority: Critical > > {code:java} > import sqlContext.implicits._ > case class KeyValue(key: Int, value: String) > val df = sc.parallelize(1 to 5).map(i=>KeyValue(i, i.toString)).toDF > df.registerTempTable("foo") > sqlContext.sql("select count(*) as cnt, key % 100,GROUPING__ID from foo group > by key%100 with rollup").show(100) > // output > +---+---++ > |cnt|_c1|GROUPING__ID| > +---+---++ > | 1| 4| 0| > | 1| 4| 1| > | 1| 5| 0| > | 1| 5| 1| > | 1| 1| 0| > | 1| 1| 1| > | 1| 2| 0| > | 1| 2| 1| > | 1| 3| 0| > | 1| 3| 1| > +---+---++ > {code} > After checking the code, it seems we don't support complex expressions (not > just simple column names) as GROUP BY keys for rollup, nor for cube. And it > will not even report an error if we have a complex expression in the rollup > keys, hence we get the very confusing result in the example above. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
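Until complex grouping expressions are supported (or rejected with a clear error), the same workaround posted on SPARK-8956 applies: materialize the expression as a real column first, so the rollup only sees simple column names.
{code}
// Workaround sketch, mirroring the one on SPARK-8956.
val df1 = df.withColumn("newkey", df("key") % 100)
df1.registerTempTable("foo1")
sqlContext.sql(
  "select count(*) as cnt, newkey as key, GROUPING__ID as grp from foo1 group by newkey with rollup"
).show(100)
{code}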
[jira] [Updated] (SPARK-8972) Incorrect result for rollup
[ https://issues.apache.org/jira/browse/SPARK-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-8972: - Summary: Incorrect result for rollup (was: Wrong result for rollup) > Incorrect result for rollup > --- > > Key: SPARK-8972 > URL: https://issues.apache.org/jira/browse/SPARK-8972 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Cheng Hao >Priority: Critical > > {code:java} > import sqlContext.implicits._ > case class KeyValue(key: Int, value: String) > val df = sc.parallelize(1 to 5).map(i=>KeyValue(i, i.toString)).toDF > df.registerTempTable("foo") > sqlContext.sql("select count(*) as cnt, key % 100,GROUPING__ID from foo group > by key%100 with rollup").show(100) > // output > +---+---++ > |cnt|_c1|GROUPING__ID| > +---+---++ > | 1| 4| 0| > | 1| 4| 1| > | 1| 5| 0| > | 1| 5| 1| > | 1| 1| 0| > | 1| 1| 1| > | 1| 2| 0| > | 1| 2| 1| > | 1| 3| 0| > | 1| 3| 1| > +---+---++ > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-8972) Wrong result for rollup
Cheng Hao created SPARK-8972: Summary: Wrong result for rollup Key: SPARK-8972 URL: https://issues.apache.org/jira/browse/SPARK-8972 Project: Spark Issue Type: Bug Components: SQL Reporter: Cheng Hao Priority: Critical {code:java} import sqlContext.implicits._ case class KeyValue(key: Int, value: String) val df = sc.parallelize(1 to 5).map(i=>KeyValue(i, i.toString)).toDF df.registerTempTable("foo") sqlContext.sql("select count(*) as cnt, key % 100,GROUPING__ID from foo group by key%100 with rollup").show(100) // output +---+---++ |cnt|_c1|GROUPING__ID| +---+---++ | 1| 4| 0| | 1| 4| 1| | 1| 5| 0| | 1| 5| 1| | 1| 1| 0| | 1| 1| 1| | 1| 2| 0| | 1| 2| 1| | 1| 3| 0| | 1| 3| 1| +---+---++ {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-7119) ScriptTransform doesn't consider the output data type
[ https://issues.apache.org/jira/browse/SPARK-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-7119: - Priority: Blocker (was: Major) > ScriptTransform doesn't consider the output data type > - > > Key: SPARK-7119 > URL: https://issues.apache.org/jira/browse/SPARK-7119 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.3.0, 1.3.1, 1.4.0 >Reporter: Cheng Hao >Priority: Blocker > > {code:sql} > from (from src select transform(key, value) using 'cat' as (thing1 int, > thing2 string)) t select thing1 + 2; > {code} > {noformat} > 15/04/24 00:58:55 ERROR CliDriver: org.apache.spark.SparkException: Job > aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent > failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): > java.lang.ClassCastException: org.apache.spark.sql.types.UTF8String cannot be > cast to java.lang.Integer > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) > at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:57) > at > org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:127) > at > org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118) > at > org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68) > at > org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) > at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1618) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1618) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) > at org.apache.spark.scheduler.Task.run(Task.scala:64) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) > at java.lang.Thread.run(Thread.java:722) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-7119) ScriptTransform doesn't consider the output data type
[ https://issues.apache.org/jira/browse/SPARK-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-7119: - Target Version/s: 1.5.0 Fix Version/s: (was: 1.5.0) > ScriptTransform doesn't consider the output data type > - > > Key: SPARK-7119 > URL: https://issues.apache.org/jira/browse/SPARK-7119 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.3.0, 1.3.1, 1.4.0 >Reporter: Cheng Hao > > {code:sql} > from (from src select transform(key, value) using 'cat' as (thing1 int, > thing2 string)) t select thing1 + 2; > {code} > {noformat} > 15/04/24 00:58:55 ERROR CliDriver: org.apache.spark.SparkException: Job > aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent > failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): > java.lang.ClassCastException: org.apache.spark.sql.types.UTF8String cannot be > cast to java.lang.Integer > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) > at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:57) > at > org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:127) > at > org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118) > at > org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68) > at > org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) > at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1618) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1618) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) > at org.apache.spark.scheduler.Task.run(Task.scala:64) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) > at java.lang.Thread.run(Thread.java:722) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-7119) ScriptTransform doesn't consider the output data type
[ https://issues.apache.org/jira/browse/SPARK-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-7119: - Fix Version/s: 1.5.0 > ScriptTransform doesn't consider the output data type > - > > Key: SPARK-7119 > URL: https://issues.apache.org/jira/browse/SPARK-7119 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.3.0, 1.3.1, 1.4.0 >Reporter: Cheng Hao > Fix For: 1.5.0 > > > {code:sql} > from (from src select transform(key, value) using 'cat' as (thing1 int, > thing2 string)) t select thing1 + 2; > {code} > {noformat} > 15/04/24 00:58:55 ERROR CliDriver: org.apache.spark.SparkException: Job > aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent > failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): > java.lang.ClassCastException: org.apache.spark.sql.types.UTF8String cannot be > cast to java.lang.Integer > at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106) > at scala.math.Numeric$IntIsIntegral$.plus(Numeric.scala:57) > at > org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:127) > at > org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118) > at > org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68) > at > org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) > at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) > at scala.collection.AbstractIterator.to(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) > at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) > at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1618) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1618) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) > at org.apache.spark.scheduler.Task.run(Task.scala:64) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) > at java.lang.Thread.run(Thread.java:722) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-8864) Date/time function and data type design
[ https://issues.apache.org/jira/browse/SPARK-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cheng Hao updated SPARK-8864: - Comment: was deleted (was: Thanks for explanation. The design looks good to me now.) > Date/time function and data type design > --- > > Key: SPARK-8864 > URL: https://issues.apache.org/jira/browse/SPARK-8864 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Assignee: Reynold Xin > Fix For: 1.5.0 > > Attachments: SparkSQLdatetimeudfs (1).pdf > > > Please see the attached design doc. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8864) Date/time function and data type design
[ https://issues.apache.org/jira/browse/SPARK-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618201#comment-14618201 ] Cheng Hao commented on SPARK-8864: -- Thanks for explanation. The design looks good to me now. > Date/time function and data type design > --- > > Key: SPARK-8864 > URL: https://issues.apache.org/jira/browse/SPARK-8864 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Assignee: Reynold Xin > Fix For: 1.5.0 > > Attachments: SparkSQLdatetimeudfs (1).pdf > > > Please see the attached design doc. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8864) Date/time function and data type design
[ https://issues.apache.org/jira/browse/SPARK-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617846#comment-14617846 ] Cheng Hao commented on SPARK-8864: -- Long = 2^63 = 9.2E18; the timestamp is in us, and its max value is 2*10000*365*24*60*60*1E7 = 6.3E18, so LONG will be enough for timestamp, right? And probably 8 bytes is enough for the internal representation of `interval`: let's say the first 18 bits are for months, and the latter 46 bits are for us. Month (18 bits): max value = 10000 * 12 = 120,000, still less than 2^18 (262,144) (the highest bit will always be 0). us (46 bits): max value is 31 * 24 * 3600 * 1E7 = 2.67E13, still less than 2^46 (7.0E13). Let me know if I made a mistake in the calculation. > Date/time function and data type design > --- > > Key: SPARK-8864 > URL: https://issues.apache.org/jira/browse/SPARK-8864 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Reynold Xin >Assignee: Reynold Xin > Fix For: 1.5.0 > > Attachments: SparkSQLdatetimeudfs (1).pdf > > > Please see the attached design doc. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
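A standalone sketch of the 18/46-bit split described above, just to make the arithmetic concrete (an illustration of the proposal, not necessarily the representation Spark ends up using):
{code}
object IntervalPacking {
  val UsBits = 46
  val UsMask = (1L << UsBits) - 1          // low 46 bits hold microseconds

  def pack(months: Int, micros: Long): Long = {
    require(months >= 0 && months < (1 << 18), "months must fit in 18 bits")
    require(micros >= 0 && micros <= UsMask, "microseconds must fit in 46 bits")
    (months.toLong << UsBits) | micros     // high 18 bits hold months
  }

  def unpackMonths(packed: Long): Int = (packed >>> UsBits).toInt
  def unpackMicros(packed: Long): Long = packed & UsMask
}

// 10000 years of months and a full month of microseconds both fit:
val packed = IntervalPacking.pack(120000, 31L * 24 * 3600 * 1000000)
assert(IntervalPacking.unpackMonths(packed) == 120000)
{code}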
[jira] [Created] (SPARK-8883) Remove the class OverrideFunctionRegistry
Cheng Hao created SPARK-8883: Summary: Remove the class OverrideFunctionRegistry Key: SPARK-8883 URL: https://issues.apache.org/jira/browse/SPARK-8883 Project: Spark Issue Type: Improvement Components: SQL Reporter: Cheng Hao Priority: Minor The class `OverrideFunctionRegistry` is redundant, since `HiveFunctionRegistry` has its own way of delegating to the underlying registry. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
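A standalone sketch of the delegation this refers to, with illustrative classes rather than Spark's actual registry API: if the Hive registry already falls back to an underlying registry on a miss, a separate override wrapper adds nothing.
{code}
trait FunctionRegistry {
  def lookup(name: String): Option[Seq[Any] => Any]
}

class SimpleRegistry(functions: Map[String, Seq[Any] => Any]) extends FunctionRegistry {
  def lookup(name: String): Option[Seq[Any] => Any] = functions.get(name)
}

// A registry that consults its own functions first and then delegates on a miss;
// wrapping it in yet another "override" layer would be redundant.
class DelegatingRegistry(own: Map[String, Seq[Any] => Any], underlying: FunctionRegistry)
    extends FunctionRegistry {
  def lookup(name: String): Option[Seq[Any] => Any] =
    own.get(name).orElse(underlying.lookup(name))
}
{code}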
[jira] [Created] (SPARK-8867) Show the UDF usage for user.
Cheng Hao created SPARK-8867: Summary: Show the UDF usage for user. Key: SPARK-8867 URL: https://issues.apache.org/jira/browse/SPARK-8867 Project: Spark Issue Type: Task Components: SQL Reporter: Cheng Hao As Hive does, we need to provide a feature for users to show the usage of a UDF. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
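For reference, the user-facing shape of this in Hive is DESCRIBE FUNCTION [EXTENDED]; something along these lines is presumably what Spark SQL would expose, though the exact syntax here is an assumption:
{code}
// Hypothetical usage once implemented; modeled on Hive's DESCRIBE FUNCTION command.
sqlContext.sql("DESCRIBE FUNCTION EXTENDED upper").collect().foreach(println)
{code}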
[jira] [Commented] (SPARK-8159) Improve SQL/DataFrame expression coverage
[ https://issues.apache.org/jira/browse/SPARK-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14612728#comment-14612728 ] Cheng Hao commented on SPARK-8159: -- Would it be possible to add support for all of the expressions in a SINGLE PR for the Python API and another SINGLE PR for R, after we finish all of the expressions? At least we can save Jenkins resources compared to adding them one by one. > Improve SQL/DataFrame expression coverage > - > > Key: SPARK-8159 > URL: https://issues.apache.org/jira/browse/SPARK-8159 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Reynold Xin > > This is an umbrella ticket to track new expressions we are adding to > SQL/DataFrame. > For each new expression, we should: > 1. Add a new Expression implementation in > org.apache.spark.sql.catalyst.expressions > 2. If applicable, implement the code generated version (by implementing > genCode). > 3. Add comprehensive unit tests (for all the data types the expressions > support). > 4. If applicable, add a new function for DataFrame in > org.apache.spark.sql.functions, and python/pyspark/sql/functions.py for > Python. > For date/time functions, put them in expressions/datetime.scala, and create a > DateTimeFunctionSuite.scala for testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-8791) Make a better hashcode for InternalRow
Cheng Hao created SPARK-8791: Summary: Make a better hashcode for InternalRow Key: SPARK-8791 URL: https://issues.apache.org/jira/browse/SPARK-8791 Project: Spark Issue Type: Improvement Components: SQL Reporter: Cheng Hao Priority: Minor Currently, InternalRow doesn't handle complex data types well when computing the hashCode. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
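A standalone sketch of the direction in plain Scala (not InternalRow's actual fields): recurse into complex values instead of relying on the containers' default hashCode, so arrays, maps, and structs with equal contents hash equally.
{code}
def deepHash(value: Any): Int = value match {
  case null         => 0
  case a: Array[_]  => a.foldLeft(17)((h, v) => 31 * h + deepHash(v))
  case s: Seq[_]    => s.foldLeft(17)((h, v) => 31 * h + deepHash(v))
  case m: Map[_, _] => m.foldLeft(17) { case (h, (k, v)) => h + (deepHash(k) ^ deepHash(v)) } // order-insensitive
  case other        => other.hashCode()
}

// Two arrays with the same contents now hash the same, unlike Array's default hashCode.
assert(deepHash(Array(1, 2, 3)) == deepHash(Array(1, 2, 3)))
{code}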
[jira] [Commented] (SPARK-8653) Add constraint for Children expression for data type
[ https://issues.apache.org/jira/browse/SPARK-8653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14609629#comment-14609629 ] Cheng Hao commented on SPARK-8653: -- What do you think [~rxin]? > Add constraint for Children expression for data type > > > Key: SPARK-8653 > URL: https://issues.apache.org/jira/browse/SPARK-8653 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Cheng Hao >Assignee: Reynold Xin > > Currently, we have trait in Expression like `ExpectsInputTypes` and also the > `checkInputDataTypes`, but can not convert the children expressions > automatically, except we write the new rules in the `HiveTypeCoercion`. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8653) Add constraint for Children expression for data type
[ https://issues.apache.org/jira/browse/SPARK-8653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14609627#comment-14609627 ] Cheng Hao commented on SPARK-8653: -- Yes, I agree that we cannot make a clear cut, as people can code the expression in their favorite way. I am wondering if we can merge my PR first; at least it does the data type casting correctly, so we can finish the Hive expression rewriting job ASAP. And we can refactor the code whenever we have a better idea to simplify it. > Add constraint for Children expression for data type > > > Key: SPARK-8653 > URL: https://issues.apache.org/jira/browse/SPARK-8653 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Cheng Hao >Assignee: Reynold Xin > > Currently, we have trait in Expression like `ExpectsInputTypes` and also the > `checkInputDataTypes`, but can not convert the children expressions > automatically, except we write the new rules in the `HiveTypeCoercion`. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8653) Add constraint for Children expression for data type
[ https://issues.apache.org/jira/browse/SPARK-8653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14609609#comment-14609609 ] Cheng Hao commented on SPARK-8653: -- For most of the mathematical expressions, we can derive a generic rule for the implicit data type casting, but the rules for the others seem arbitrary. For example, there are already lots of custom rules for `Coalesce` and the conditional expressions (CaseWhen, In, If, etc.), and we probably don't want to keep growing the rules in `HiveTypeCoercion.scala`. So it's probably a good idea to expose the data type casting to the expression itself, and UDAFs and UDTFs will also benefit from that. Additionally, if we want to make `Expression` an API for extension in the future, it will be a blocking issue if the data type casting is only allowed to be done in `HiveTypeCoercion.scala`. > Add constraint for Children expression for data type > > > Key: SPARK-8653 > URL: https://issues.apache.org/jira/browse/SPARK-8653 > Project: Spark > Issue Type: Sub-task > Components: SQL >Reporter: Cheng Hao >Assignee: Reynold Xin > > Currently, we have trait in Expression like `ExpectsInputTypes` and also the > `checkInputDataTypes`, but can not convert the children expressions > automatically, except we write the new rules in the `HiveTypeCoercion`. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
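To make the proposal concrete, a toy sketch (not Spark's real Expression or HiveTypeCoercion classes) of an expression declaring its expected input types and a single generic pass inserting the casts, instead of one bespoke rule per expression:
{code}
sealed trait DataType
case object TInt extends DataType
case object TString extends DataType

case class Expr(name: String,
                dataType: DataType,
                children: Seq[Expr] = Nil,
                expectedInputTypes: Option[Seq[DataType]] = None)

def cast(e: Expr, to: DataType): Expr = Expr(s"cast(${e.name})", to, Seq(e))

// One generic coercion pass: no per-expression knowledge beyond expectedInputTypes.
def coerce(e: Expr): Expr = e.expectedInputTypes match {
  case Some(expected) =>
    val fixed = e.children.zip(expected).map {
      case (child, t) if child.dataType != t => cast(child, t)
      case (child, _)                        => child
    }
    e.copy(children = fixed)
  case None => e
}

val add = Expr("add", TInt,
  children = Seq(Expr("a", TInt), Expr("b", TString)),
  expectedInputTypes = Some(Seq(TInt, TInt)))
assert(coerce(add).children.map(_.dataType) == Seq(TInt, TInt))
{code}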