[jira] [Created] (SPARK-23306) Race condition in TaskMemoryManager
Zhan Zhang created SPARK-23306:
--------------------------------

Summary: Race condition in TaskMemoryManager
Key: SPARK-23306
URL: https://issues.apache.org/jira/browse/SPARK-23306
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.2.1
Reporter: Zhan Zhang

There is a race condition in TaskMemoryManager which may cause OOM. Memory released by one task may be taken by another task, because there is a gap between releaseMemory and acquireMemory (e.g., in UnifiedMemoryManager). This causes OOM if the current task is the only one that can spill. It can happen with BytesToBytesMap, as it spills only the required bytes.
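The gap can be sketched in a few lines. This is a hypothetical, simplified model (the names free, tryAcquire, and spillThenAcquire are illustrative, not Spark's actual TaskMemoryManager API); it only shows why release-then-acquire as two separate critical sections lets another task take the freed bytes:

{code}
// Minimal sketch (hypothetical names) of the race described above:
// releaseMemory and the follow-up acquireMemory are not one atomic step,
// so another task can grab the bytes freed by the spill in between.
object RaceSketch {
  private val lock = new Object
  private var free: Long = 0L

  def release(bytes: Long): Unit = lock.synchronized { free += bytes }

  def tryAcquire(bytes: Long): Boolean = lock.synchronized {
    if (free >= bytes) { free -= bytes; true } else false
  }

  // Task A spills its own map to satisfy its own request...
  def spillThenAcquire(spilled: Long, needed: Long): Boolean = {
    release(spilled)    // step 1: memory returned to the pool
    // <-- Task B can call tryAcquire(spilled) right here
    tryAcquire(needed)  // step 2: may now fail => OOM if A was the only
                        // consumer left that could spill
  }
}
{code}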
[jira] [Commented] (SPARK-21505) A dynamic join operator to improve the join reliability
[ https://issues.apache.org/jira/browse/SPARK-21505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236365#comment-16236365 ]

Zhan Zhang commented on SPARK-21505:

Any comments on this feature? Do you think the design is OK? If so, we are going to submit a PR.

> A dynamic join operator to improve the join reliability
> ---
>
> Key: SPARK-21505
> URL: https://issues.apache.org/jira/browse/SPARK-21505
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 2.2.0, 2.3.0, 3.0.0
> Reporter: Lin
> Priority: Major
> Labels: features
>
> As we know, hash join is more efficient than sort-merge join. But today hash join is not widely used because it may fail with an OutOfMemory (OOM) error due to limited memory, data skew, statistics mis-estimation, and so on. For example, if we apply shuffled hash join to an unevenly distributed dataset, some partitions might be so large that we cannot build a hash table for that particular partition, causing an OOM error. When OOM happens, current Spark throws an exception, resulting in job failure. On the other hand, if sort-merge join is used, there will be shuffle, sorting, and extra spill, degrading the join. Considering the efficiency of hash join, we propose a fallback mechanism that dynamically uses hash join or sort-merge join at runtime, at the task level, to provide a more reliable join operation.
> This new dynamic join operator internally implements the logic of HashJoin, Iterator Reconstruct, Sort, and MergeJoin. The process is as follows:
> HashJoin: We start by building a hash table on one side of the join partitions. If the hash table is built successfully, it behaves the same as the current ShuffledHashJoin operator.
> Sort: If we fail to build the hash table due to the large partition size, we do SortMergeJoin only on this partition, but we first need to rebuild the partition's iterator. When OOM happens, a hash table covering part of this partition has been built successfully (e.g., the first 4000 rows), and the iterator of this partition now points to the 4001st row. We reuse this hash table to reconstruct an iterator over the first 4000 rows and concatenate it with the remaining rows of the partition so that we can rebuild the partition completely. On this rebuilt partition, we apply sorting on the key values.
> MergeJoin: After getting two sorted iterators, we perform a regular merge join over them and emit the records to downstream operators.
> Iterator Reconstruct: The BytesToBytesMap has to be spilled to disk to release memory for other operators, such as Sort and Join. In addition, it has to be converted to an iterator so that it can be concatenated with the remaining items of the original iterator that was used to build the hash table.
> Metadata Population: Necessary metadata, such as sorting keys and join type, has to be populated so that it can be used by the potential Sort and MergeJoin operators.
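A hedged sketch of the proposed task-level fallback, using plain Scala collections in place of Spark's internal row and map types. All names are illustrative, and budget stands in for the memory limit whose exhaustion would raise the OOM:

{code}
type Row = (Int, String) // (join key, payload)

// Simplified stand-in for a real merge join over two sorted sides.
def sortMergeJoin(l: Seq[Row], r: Seq[Row]): Seq[(Row, Row)] = {
  val byKey = r.groupBy(_._1)
  l.sortBy(_._1).flatMap(lv => byKey.getOrElse(lv._1, Nil).map(rv => (lv, rv)))
}

def dynamicJoin(build: Iterator[Row], stream: Seq[Row], budget: Int): Seq[(Row, Row)] = {
  val table = scala.collection.mutable.HashMap.empty[Int, List[Row]]
  var rows = 0
  var overflowed = false
  while (build.hasNext && !overflowed) {
    if (rows >= budget) overflowed = true // stand-in for OOM on insert
    else { val r = build.next(); table(r._1) = r :: table.getOrElse(r._1, Nil); rows += 1 }
  }
  if (!overflowed) {
    // HashJoin path: the table covers the whole build side.
    stream.flatMap(s => table.getOrElse(s._1, Nil).map(b => (b, s)))
  } else {
    // Iterator Reconstruct + Sort + MergeJoin path: rows already absorbed
    // by the partial table are replayed and concatenated with the unread
    // remainder of the build iterator, then both sides are sorted and merged.
    val rebuilt = (table.valuesIterator.flatten ++ build).toSeq
    sortMergeJoin(rebuilt, stream)
  }
}
{code}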
[jira] [Commented] (SPARK-21492) Memory leak in SortMergeJoin
[ https://issues.apache.org/jira/browse/SPARK-21492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095425#comment-16095425 ]

Zhan Zhang commented on SPARK-21492:

Root cause: in SortMergeJoin (inner/leftOuter/rightOuter), one side's sorted iterator may not be exhausted; that chunk of memory thus cannot be released, causing a memory leak and performance degradation.

> Memory leak in SortMergeJoin
>
> Key: SPARK-21492
> URL: https://issues.apache.org/jira/browse/SPARK-21492
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Zhan Zhang
>
> In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak caused by the Sort. The memory is not released until the task ends, and cannot be used by other operators, causing a performance drop or OOM.
[jira] [Created] (SPARK-21492) Memory leak in SortMergeJoin
Zhan Zhang created SPARK-21492:
--------------------------------

Summary: Memory leak in SortMergeJoin
Key: SPARK-21492
URL: https://issues.apache.org/jira/browse/SPARK-21492
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.0.0
Reporter: Zhan Zhang

In SortMergeJoin, if the iterator is not exhausted, there will be a memory leak caused by the Sort. The memory is not released until the task ends, and cannot be used by other operators, causing a performance drop or OOM.
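One way to picture the eager-release idea: wrap each sorted side so its buffers are freed as soon as the merge is done with that side, instead of waiting for the task-completion callback. A minimal sketch, with illustrative names rather than Spark's actual classes:

{code}
// Wraps a sorted-side iterator with an explicit release hook.
final class ClosableIterator[A](underlying: Iterator[A], onClose: () => Unit)
    extends Iterator[A] {
  private var closed = false
  def close(): Unit = if (!closed) { closed = true; onClose() }
  def hasNext: Boolean = {
    val more = !closed && underlying.hasNext
    if (!more) close() // exhausted => release sort buffers immediately
    more
  }
  def next(): A = underlying.next()
}

// Usage idea: in an inner join, once the streamed side runs out, the
// buffered side will never be consumed further, so the join can call
// close() on it directly instead of holding its memory until task end.
{code}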
[jira] [Issue Comment Deleted] (SPARK-20215) ReuseExchange is broken in SparkSQL
[ https://issues.apache.org/jira/browse/SPARK-20215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhan Zhang updated SPARK-20215:
---
Comment: was deleted

(was: Seems to be fixed in SPARK-20229)

> ReuseExchange is broken in SparkSQL
> --
>
> Key: SPARK-20215
> URL: https://issues.apache.org/jira/browse/SPARK-20215
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: Zhan Zhang
> Priority: Minor
>
> Currently, if we have a query like A join B union A join C with the same join key, table A will be scanned multiple times in SQL. This is because the MetastoreRelation is not shared by the two joins and the ExprIds differ, so canonicalization in Expression cannot unify them; the two Exchanges are then not compatible and cannot be reused.
[jira] [Commented] (SPARK-20215) ReuseExchange is broken in SparkSQL
[ https://issues.apache.org/jira/browse/SPARK-20215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15968734#comment-15968734 ]

Zhan Zhang commented on SPARK-20215:

Seems to be fixed in SPARK-20229

> ReuseExchange is broken in SparkSQL
> --
>
> Key: SPARK-20215
> URL: https://issues.apache.org/jira/browse/SPARK-20215
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: Zhan Zhang
> Priority: Minor
>
> Currently, if we have a query like A join B union A join C with the same join key, table A will be scanned multiple times in SQL. This is because the MetastoreRelation is not shared by the two joins and the ExprIds differ, so canonicalization in Expression cannot unify them; the two Exchanges are then not compatible and cannot be reused.
[jira] [Created] (SPARK-20215) ReuseExchange is broken in SparkSQL
Zhan Zhang created SPARK-20215:
--------------------------------

Summary: ReuseExchange is broken in SparkSQL
Key: SPARK-20215
URL: https://issues.apache.org/jira/browse/SPARK-20215
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.1.0
Reporter: Zhan Zhang
Priority: Minor

Currently, if we have a query like A join B union A join C with the same join key, table A will be scanned multiple times in SQL. This is because the MetastoreRelation is not shared by the two joins and the ExprIds differ, so canonicalization in Expression cannot unify them; the two Exchanges are then not compatible and cannot be reused.
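A toy model of why the reuse check fails and what canonicalization has to do about it (these are not Spark's Expression classes, just an illustration of positional ExprId renumbering):

{code}
case class Attr(name: String, exprId: Long)
case class Scan(table: String, output: Seq[Attr])

// Replace per-instance ids with positional ids before comparing plans,
// which is the essence of what expression canonicalization must achieve.
def canonicalize(s: Scan): Scan =
  s.copy(output = s.output.zipWithIndex.map { case (a, i) => a.copy(exprId = i.toLong) })

// Two scans of the same table A get fresh ExprIds each time:
val scan1 = Scan("A", Seq(Attr("k", 101), Attr("v", 102)))
val scan2 = Scan("A", Seq(Attr("k", 205), Attr("v", 206)))
assert(scan1 != scan2)                              // ReuseExchange check fails
assert(canonicalize(scan1) == canonicalize(scan2))  // unified after renumbering
{code}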
[jira] [Comment Edited] (SPARK-20006) Separate threshold for broadcast and shuffled hash join
[ https://issues.apache.org/jira/browse/SPARK-20006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931054#comment-15931054 ]

Zhan Zhang edited comment on SPARK-20006 at 3/18/17 4:42 AM:
-

The default ShuffledHashJoin threshold can fall back to the broadcast one. A separate configuration gives us opportunities to optimize the join dramatically.

It would be great if the CBO could automatically find the best strategy, but probably I am missing something: currently the CBO does not collect the right statistics, especially for partitioned tables. I have opened a JIRA for that issue as well. https://issues.apache.org/jira/browse/SPARK-19890

was (Author: zhzhan):
The default ShuffledHashJoin threshold can fall back to the broadcast one. A separate configuration gives us opportunities to optimize the join dramatically.

It would be great if the CBO could automatically find the best strategy, but probably I am missing something: currently the CBO does not collect the right statistics, especially for partitioned tables. https://issues.apache.org/jira/browse/SPARK-19890

> Separate threshold for broadcast and shuffled hash join
> ---
>
> Key: SPARK-20006
> URL: https://issues.apache.org/jira/browse/SPARK-20006
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: Zhan Zhang
> Priority: Minor
>
> Currently both canBroadcast and canBuildLocalHashMap use the same configuration: AUTO_BROADCASTJOIN_THRESHOLD.
> But the memory model may be different. For broadcast, the hash map is always built on the heap. For shuffled hash join, the hash map may be built on heap (longHash) or off heap (other maps, if off-heap memory is enabled). Sharing one setting makes the configuration hard to tune (how to allocate on-heap/off-heap memory). Propose to use different configurations. Please comment on whether this is reasonable.
[jira] [Commented] (SPARK-20006) Separate threshold for broadcast and shuffled hash join
[ https://issues.apache.org/jira/browse/SPARK-20006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931054#comment-15931054 ]

Zhan Zhang commented on SPARK-20006:

The default ShuffledHashJoin threshold can fall back to the broadcast one. A separate configuration gives us opportunities to optimize the join dramatically.

It would be great if the CBO could automatically find the best strategy, but probably I am missing something: currently the CBO does not collect the right statistics, especially for partitioned tables. https://issues.apache.org/jira/browse/SPARK-19890

> Separate threshold for broadcast and shuffled hash join
> ---
>
> Key: SPARK-20006
> URL: https://issues.apache.org/jira/browse/SPARK-20006
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: Zhan Zhang
> Priority: Minor
>
> Currently both canBroadcast and canBuildLocalHashMap use the same configuration: AUTO_BROADCASTJOIN_THRESHOLD.
> But the memory model may be different. For broadcast, the hash map is always built on the heap. For shuffled hash join, the hash map may be built on heap (longHash) or off heap (other maps, if off-heap memory is enabled). Sharing one setting makes the configuration hard to tune (how to allocate on-heap/off-heap memory). Propose to use different configurations. Please comment on whether this is reasonable.
[jira] [Updated] (SPARK-20006) Separate threshold for broadcast and shuffled hash join
[ https://issues.apache.org/jira/browse/SPARK-20006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhan Zhang updated SPARK-20006:
---
Description:
Currently both canBroadcast and canBuildLocalHashMap use the same configuration: AUTO_BROADCASTJOIN_THRESHOLD.
But the memory model may be different. For broadcast, the hash map is always built on the heap. For shuffled hash join, the hash map may be built on heap (longHash) or off heap (other maps, if off-heap memory is enabled). Sharing one setting makes the configuration hard to tune (how to allocate on-heap/off-heap memory). Propose to use different configurations. Please comment on whether this is reasonable.

was:
Currently both canBroadcast and canBuildLocalHashMap use the same configuration: AUTO_BROADCASTJOIN_THRESHOLD.
But the memory model may be different. For broadcast, the hash map is always built on the heap. For shuffled hash join, the hash map may be built on heap (longHash) or off heap (other maps, if off-heap memory is enabled). Sharing one setting makes the configuration hard to tune (how to allocate on-heap/off-heap memory). Propose to use different configurations.

> Separate threshold for broadcast and shuffled hash join
> ---
>
> Key: SPARK-20006
> URL: https://issues.apache.org/jira/browse/SPARK-20006
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: Zhan Zhang
> Priority: Minor
>
> Currently both canBroadcast and canBuildLocalHashMap use the same configuration: AUTO_BROADCASTJOIN_THRESHOLD.
> But the memory model may be different. For broadcast, the hash map is always built on the heap. For shuffled hash join, the hash map may be built on heap (longHash) or off heap (other maps, if off-heap memory is enabled). Sharing one setting makes the configuration hard to tune (how to allocate on-heap/off-heap memory). Propose to use different configurations. Please comment on whether this is reasonable.
[jira] [Created] (SPARK-20006) Separate threshold for broadcast and shuffled hash join
Zhan Zhang created SPARK-20006:
--------------------------------

Summary: Separate threshold for broadcast and shuffled hash join
Key: SPARK-20006
URL: https://issues.apache.org/jira/browse/SPARK-20006
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.1.0
Reporter: Zhan Zhang
Priority: Minor

Currently both canBroadcast and canBuildLocalHashMap use the same configuration: AUTO_BROADCASTJOIN_THRESHOLD.
But the memory model may be different. For broadcast, the hash map is always built on the heap. For shuffled hash join, the hash map may be built on heap (longHash) or off heap (other maps, if off-heap memory is enabled). Sharing one setting makes the configuration hard to tune (how to allocate on-heap/off-heap memory). Propose to use different configurations.
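What the split could look like from the user side. Note that only spark.sql.autoBroadcastJoinThreshold is a real Spark setting; the second key below is hypothetical, illustrating the proposal:

{code}
val spark = org.apache.spark.sql.SparkSession.builder()
  .master("local[*]")
  // real setting: caps the on-heap map broadcast to every executor
  .config("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)
  // hypothetical setting: caps the per-partition local map, which may
  // live off heap and so tolerate a larger bound
  .config("spark.sql.shuffledHashJoinThreshold", 64L * 1024 * 1024)
  .getOrCreate()
{code}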
[jira] [Created] (SPARK-19908) Direct buffer memory OOM should not cause stage retries.
Zhan Zhang created SPARK-19908:
--------------------------------

Summary: Direct buffer memory OOM should not cause stage retries.
Key: SPARK-19908
URL: https://issues.apache.org/jira/browse/SPARK-19908
Project: Spark
Issue Type: Bug
Components: Shuffle
Affects Versions: 2.1.0
Reporter: Zhan Zhang
Priority: Minor

Currently, if there is a java.lang.OutOfMemoryError: Direct buffer memory, the exception will be changed to FetchFailedException, causing stage retries.

org.apache.spark.shuffle.FetchFailedException: Direct buffer memory
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:40)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:731)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:692)
    at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:854)
    at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:887)
    at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:278)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:658)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:645)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:228)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:212)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
    at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
    at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at
[jira] [Updated] (SPARK-19890) Make MetastoreRelation statistics estimation more accurate
[ https://issues.apache.org/jira/browse/SPARK-19890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhan Zhang updated SPARK-19890:
---
Description:
Currently the MetastoreRelation statistics are retrieved in the analyze phase, and the size is computed at table scope. But for a partitioned table these statistics are not useful, as the table size may be 100x+ larger than the input partition size. As a result, the join optimization techniques are not applicable.

It would be great if we could postpone the statistics to the optimization phase, after partition information is available but before the physical plan generation phase, so that JoinSelection can choose a better join method (broadcast, shuffled hash join, or sort-merge join).

Although the MetastoreRelation is not associated with partitions, we can get the partition info for the table through PhysicalOperation. Multiple plans can use the same MetastoreRelation, but the estimate is still much better than the table size. This way, retrieving statistics is straightforward. Another possible way is to have another data structure associating the metastore relation and partitions with the plan, to get the most accurate estimate.

was:
Currently the MetastoreRelation statistics are retrieved in the analyze phase, and the size is computed at table scope. But for a partitioned table these statistics are not useful, as the table size may be 100x+ larger than the input partition size. As a result, the join optimization techniques are not applicable.

It would be great if we could postpone the statistics to the optimization phase, after partition information is available but before the physical plan generation phase, so that JoinSelection can choose a better join method (broadcast, shuffled hash join, or sort-merge join).

Although the MetastoreRelation is not associated with partitions, we can get the partition info for the table through PhysicalOperation. Although multiple plans can use the same MetastoreRelation, the estimate is still much better than the table size. This way, retrieving statistics is straightforward. Another possible way is to have another data structure associating the metastore relation and partitions with the plan, to get the most accurate estimate.

> Make MetastoreRelation statistics estimation more accurate
>
> Key: SPARK-19890
> URL: https://issues.apache.org/jira/browse/SPARK-19890
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: Zhan Zhang
> Priority: Minor
>
> Currently the MetastoreRelation statistics are retrieved in the analyze phase, and the size is computed at table scope. But for a partitioned table these statistics are not useful, as the table size may be 100x+ larger than the input partition size. As a result, the join optimization techniques are not applicable.
> It would be great if we could postpone the statistics to the optimization phase, after partition information is available but before the physical plan generation phase, so that JoinSelection can choose a better join method (broadcast, shuffled hash join, or sort-merge join).
> Although the MetastoreRelation is not associated with partitions, we can get the partition info for the table through PhysicalOperation. Multiple plans can use the same MetastoreRelation, but the estimate is still much better than the table size. This way, retrieving statistics is straightforward. Another possible way is to have another data structure associating the metastore relation and partitions with the plan, to get the most accurate estimate.
[jira] [Created] (SPARK-19890) Make MetastoreRelation statistics estimation more accurate
Zhan Zhang created SPARK-19890:
--------------------------------

Summary: Make MetastoreRelation statistics estimation more accurate
Key: SPARK-19890
URL: https://issues.apache.org/jira/browse/SPARK-19890
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.1.0
Reporter: Zhan Zhang
Priority: Minor

Currently the MetastoreRelation statistics are retrieved in the analyze phase, and the size is computed at table scope. But for a partitioned table these statistics are not useful, as the table size may be 100x+ larger than the input partition size. As a result, the join optimization techniques are not applicable.

It would be great if we could postpone the statistics to the optimization phase, after partition information is available but before the physical plan generation phase, so that JoinSelection can choose a better join method (broadcast, shuffled hash join, or sort-merge join).

Although the MetastoreRelation is not associated with partitions, we can get the partition info for the table through PhysicalOperation. Although multiple plans can use the same MetastoreRelation, the estimate is still much better than the table size. This way, retrieving statistics is straightforward. Another possible way is to have another data structure associating the metastore relation and partitions with the plan, to get the most accurate estimate.
[jira] [Commented] (SPARK-19839) Fix memory leak in BytesToBytesMap
[ https://issues.apache.org/jira/browse/SPARK-19839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15897975#comment-15897975 ]

Zhan Zhang commented on SPARK-19839:

When BytesToBytesMap spills, its longArray should be released. Otherwise, it may not be released until the task completes. This array may take a significant amount of memory, which then cannot be used by later operators, such as UnsafeShuffleExternalSorter, resulting in more frequent spills in the sorter. This patch releases the array, as the destructive iterator will not use it anymore.

> Fix memory leak in BytesToBytesMap
> --
>
> Key: SPARK-19839
> URL: https://issues.apache.org/jira/browse/SPARK-19839
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0
> Reporter: Zhan Zhang
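The shape of the fix, as a hedged sketch (simplified, not the real BytesToBytesMap internals): once a destructive spill has written the map's records out, the pointer array serves no further reads and can be freed right away instead of at task end.

{code}
// Toy model: `longArray` stands in for the map's pointer/prefix array.
class SpillableMap(var longArray: Array[Long]) {
  def spill(writeOut: Array[Long] => Unit): Long = {
    writeOut(longArray)                 // records flushed to disk
    val freed = longArray.length.toLong * 8
    longArray = null                    // drop the reference now, not at task end
    freed                               // bytes handed back to the memory manager
  }
}
{code}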
[jira] [Created] (SPARK-19839) Fix memory leak in BytesToBytesMap
Zhan Zhang created SPARK-19839:
--------------------------------

Summary: Fix memory leak in BytesToBytesMap
Key: SPARK-19839
URL: https://issues.apache.org/jira/browse/SPARK-19839
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.1.0
Reporter: Zhan Zhang
[jira] [Commented] (SPARK-19815) Not orderable should be applied to right key instead of left key
[ https://issues.apache.org/jira/browse/SPARK-19815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895522#comment-15895522 ]

Zhan Zhang commented on SPARK-19815:

Thinking about the logic again: on the surface it may be correct, since in a join the left and right keys should be of the same type. Please close this JIRA.

> Not orderable should be applied to right key instead of left key
>
> Key: SPARK-19815
> URL: https://issues.apache.org/jira/browse/SPARK-19815
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: Zhan Zhang
> Priority: Minor
>
> When generating ShuffledHashJoinExec, the orderable condition should be applied to the right key instead of the left key.
[jira] [Updated] (SPARK-19815) Not orderable should be applied to right key instead of left key
[ https://issues.apache.org/jira/browse/SPARK-19815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-19815: --- Summary: Not orderable should be applied to right key instead of left key (was: Not order able should be applied to right key instead of left key) > Not orderable should be applied to right key instead of left key > > > Key: SPARK-19815 > URL: https://issues.apache.org/jira/browse/SPARK-19815 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 >Reporter: Zhan Zhang >Priority: Minor > > When generating ShuffledHashJoinExec, the orderable condition should be > applied to right key instead of left key.
[jira] [Created] (SPARK-19815) Not order able should be applied to right key instead of left key
Zhan Zhang created SPARK-19815:
--------------------------------

Summary: Not order able should be applied to right key instead of left key
Key: SPARK-19815
URL: https://issues.apache.org/jira/browse/SPARK-19815
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.1.0
Reporter: Zhan Zhang
Priority: Minor

When generating ShuffledHashJoinExec, the orderable condition should be applied to the right key instead of the left key.
[jira] [Commented] (SPARK-19354) Killed tasks are getting marked as FAILED
[ https://issues.apache.org/jira/browse/SPARK-19354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15862588#comment-15862588 ]

Zhan Zhang commented on SPARK-19354:

This fix is actually critical. In production, we found that this behavior can cause job retries and failures, especially when speculation is enabled. /cc [~rxin]

Specifically we observe that when the sorter spills to disk while the task is being killed, an InterruptedException is thrown, followed by an OOM, which triggers the uncaught-exception handler in the executor and eventually shuts the executor down. It happens a lot with speculative tasks, with healthy tasks in the same executor marked as failed as well. Retries will happen. Even worse, such retries may fail again for the same reason, eventually causing job failure.

17/02/11 15:39:38 ERROR TaskMemoryManager: error while calling spill() on org.apache.spark.shuffle.sort.ShuffleExternalSorter@714b17b0
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:269)
    at org.apache.spark.storage.DiskBlockObjectWriter.commitAndGet(DiskBlockObjectWriter.scala:178)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:186)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.spill(ShuffleExternalSorter.java:254)
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:171)
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:245)
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:359)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:382)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:241)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:162)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:278)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

java.lang.OutOfMemoryError: error while calling spill() on org.apache.spark.shuffle.sort.ShuffleExternalSorter@714b17b0 : null
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:180)
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:245)
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:359)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:382)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:241)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:162)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:278)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

> Killed tasks are getting marked as FAILED
> -
>
> Key: SPARK-19354
> URL: https://issues.apache.org/jira/browse/SPARK-19354
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler, Spark Core
> Reporter: Devaraj K
> Priority: Minor
>
> When we enable speculation, we can see there are multiple attempts running for the same task when the first attempt's progress is slow. If any of the task attempts succeeds, the other attempts will be killed; during the kill, those attempts are getting marked as failed due to the below error. We need to handle this error and mark the attempt as KILLED instead of FAILED.
> ||93 ||214
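The proposed handling can be summarized in a small sketch (illustrative, not the actual Executor code path): when the task has already been killed, errors raised by the interrupted spill should be classified as KILLED rather than as a fatal FAILED/OOM, so healthy tasks on the same executor are not taken down with it.

{code}
def classifyFailure(taskKilled: Boolean, t: Throwable): String = t match {
  case _: java.nio.channels.ClosedByInterruptException if taskKilled =>
    "KILLED"  // the interrupt came from the kill, not from a real fault
  case _: OutOfMemoryError if taskKilled =>
    "KILLED"  // the spill failed only because the kill closed the channel
  case _: OutOfMemoryError =>
    "FAILED"  // genuine OOM: fatal, executor should shut down
  case _ =>
    "FAILED"
}
{code}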
[jira] [Commented] (SPARK-13450) SortMergeJoin will OOM when join rows have lot of same keys
[ https://issues.apache.org/jira/browse/SPARK-13450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15812550#comment-15812550 ]

Zhan Zhang commented on SPARK-13450:

ExternalAppendOnlyMap estimates the size of the data saved. In SortMergeJoin, I think we can leverage UnsafeExternalSorter to get more accurate and controllable behavior.

> SortMergeJoin will OOM when join rows have lot of same keys
> ---
>
> Key: SPARK-13450
> URL: https://issues.apache.org/jira/browse/SPARK-13450
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0, 2.0.2, 2.1.0
> Reporter: Hong Shen
> Attachments: heap-dump-analysis.png
>
> When I run a SQL query with a join, a task throws java.lang.OutOfMemoryError and the query fails. I have set spark.executor.memory to 4096m.
> SortMergeJoin uses an ArrayBuffer[InternalRow] to store bufferedMatches; if the joined rows have a lot of identical keys, it will throw OutOfMemoryError.
> {code}
> /** Buffered rows from the buffered side of the join. This is empty if there are no matches. */
> private[this] val bufferedMatches: ArrayBuffer[InternalRow] = new ArrayBuffer[InternalRow]
> {code}
> Here is the stack trace:
> {code}
> org.xerial.snappy.SnappyNative.arrayCopy(Native Method)
> org.xerial.snappy.Snappy.arrayCopy(Snappy.java:84)
> org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:190)
> org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:163)
> java.io.DataInputStream.readFully(DataInputStream.java:195)
> java.io.DataInputStream.readLong(DataInputStream.java:416)
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:71)
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$2.loadNext(UnsafeSorterSpillMerger.java:79)
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:136)
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:123)
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:84)
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoin.scala:300)
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.bufferMatchingRows(SortMergeJoin.scala:329)
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextInnerJoinRows(SortMergeJoin.scala:229)
> org.apache.spark.sql.execution.joins.SortMergeJoin$$anonfun$doExecute$1$$anon$1.advanceNext(SortMergeJoin.scala:105)
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:741)
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:741)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:337)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:301)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:337)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:301)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> org.apache.spark.scheduler.Task.run(Task.scala:89)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:215)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:744)
> {code}
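The suggestion amounts to replacing the unbounded ArrayBuffer[InternalRow] of matches with a buffer that spills past a threshold. A toy sketch of that contract (illustrative types; the real fix would back this with an unsafe, disk-spilling sorter):

{code}
final class SpillableBuffer[A](inMemoryLimit: Int, spill: Seq[A] => Unit) {
  private val mem = scala.collection.mutable.ArrayBuffer.empty[A]
  private var spilledBatches = 0

  def append(a: A): Unit = {
    mem += a
    if (mem.size >= inMemoryLimit) {  // bound the heap footprint per key group
      spill(mem.toSeq)                // hand the batch to disk
      mem.clear()
      spilledBatches += 1
    }
  }
}
{code}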
[jira] [Commented] (SPARK-18637) Stateful UDF should be considered as nondeterministic
[ https://issues.apache.org/jira/browse/SPARK-18637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15706961#comment-15706961 ]

Zhan Zhang commented on SPARK-18637:

[~hvanhovell] It is an annotation.

/**
 * UDFType annotations are used to describe properties of a UDF. This gives
 * important information to the optimizer.
 * If the UDF is not deterministic, or if it is stateful, it is necessary to
 * annotate it as such for correctness.
 */
@Public
@Evolving
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface UDFType {
  /**
   * Certain optimizations should not be applied if UDF is not deterministic.
   * Deterministic UDF returns same result each time it is invoked with a
   * particular input. This determinism just needs to hold within the context of
   * a query.
   *
   * @return true if the UDF is deterministic
   */
  boolean deterministic() default true;

  /**
   * If a UDF stores state based on the sequence of records it has processed, it
   * is stateful. A stateful UDF cannot be used in certain expressions such as
   * case statement and certain optimizations such as AND/OR short circuiting
   * don't apply for such UDFs, as they need to be invoked for each record.
   * row_sequence is an example of stateful UDF. A stateful UDF is considered to
   * be non-deterministic, irrespective of what deterministic() returns.
   *
   * @return true
   */
  boolean stateful() default false;

  /**
   * A UDF is considered distinctLike if the UDF can be evaluated on just the
   * distinct values of a column. Examples include min and max UDFs. This
   * information is used by metadata-only optimizer.
   *
   * @return true if UDF is distinctLike
   */
  boolean distinctLike() default false;

  /**
   * Using in analytical functions to specify that UDF implies an ordering
   *
   * @return true if the function implies order
   */
  boolean impliesOrder() default false;
}

> Stateful UDF should be considered as nondeterministic
> -
>
> Key: SPARK-18637
> URL: https://issues.apache.org/jira/browse/SPARK-18637
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Reporter: Zhan Zhang
>
> If the annotation UDFType of a UDF is stateful, it should be considered non-deterministic. Otherwise, the catalyst may optimize the plan and return the wrong result.
[jira] [Comment Edited] (SPARK-18637) Stateful UDF should be considered as nondeterministic
[ https://issues.apache.org/jira/browse/SPARK-18637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15706961#comment-15706961 ]

Zhan Zhang edited comment on SPARK-18637 at 11/29/16 11:52 PM:
---

[~hvanhovell] It is an annotation.

/**
 * UDFType annotations are used to describe properties of a UDF. This gives
 * important information to the optimizer.
 * If the UDF is not deterministic, or if it is stateful, it is necessary to
 * annotate it as such for correctness.
 */

was (Author: zhzhan):
[~hvanhovell] It is an annotation.

/**
 * UDFType annotations are used to describe properties of a UDF. This gives
 * important information to the optimizer.
 * If the UDF is not deterministic, or if it is stateful, it is necessary to
 * annotate it as such for correctness.
 */
@Public
@Evolving
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface UDFType {
  /**
   * Certain optimizations should not be applied if UDF is not deterministic.
   * Deterministic UDF returns same result each time it is invoked with a
   * particular input. This determinism just needs to hold within the context of
   * a query.
   *
   * @return true if the UDF is deterministic
   */
  boolean deterministic() default true;

  /**
   * If a UDF stores state based on the sequence of records it has processed, it
   * is stateful. A stateful UDF cannot be used in certain expressions such as
   * case statement and certain optimizations such as AND/OR short circuiting
   * don't apply for such UDFs, as they need to be invoked for each record.
   * row_sequence is an example of stateful UDF. A stateful UDF is considered to
   * be non-deterministic, irrespective of what deterministic() returns.
   *
   * @return true
   */
  boolean stateful() default false;

  /**
   * A UDF is considered distinctLike if the UDF can be evaluated on just the
   * distinct values of a column. Examples include min and max UDFs. This
   * information is used by metadata-only optimizer.
   *
   * @return true if UDF is distinctLike
   */
  boolean distinctLike() default false;

  /**
   * Using in analytical functions to specify that UDF implies an ordering
   *
   * @return true if the function implies order
   */
  boolean impliesOrder() default false;
}

> Stateful UDF should be considered as nondeterministic
> -
>
> Key: SPARK-18637
> URL: https://issues.apache.org/jira/browse/SPARK-18637
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Reporter: Zhan Zhang
>
> If the annotation UDFType of a UDF is stateful, it should be considered non-deterministic. Otherwise, the catalyst may optimize the plan and return the wrong result.
[jira] [Updated] (SPARK-18637) Stateful UDF should be considered as nondeterministic
[ https://issues.apache.org/jira/browse/SPARK-18637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhan Zhang updated SPARK-18637:
---
Component/s: SQL

> Stateful UDF should be considered as nondeterministic
> -
>
> Key: SPARK-18637
> URL: https://issues.apache.org/jira/browse/SPARK-18637
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Reporter: Zhan Zhang
>
> If the annotation UDFType of a UDF is stateful, it should be considered non-deterministic. Otherwise, the catalyst may optimize the plan and return the wrong result.
[jira] [Commented] (SPARK-18637) Stateful UDF should be considered as nondeterministic
[ https://issues.apache.org/jira/browse/SPARK-18637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15706905#comment-15706905 ]

Zhan Zhang commented on SPARK-18637:

Here are the comments from UDFType:

/**
 * If a UDF stores state based on the sequence of records it has processed, it
 * is stateful. A stateful UDF cannot be used in certain expressions such as
 * case statement and certain optimizations such as AND/OR short circuiting
 * don't apply for such UDFs, as they need to be invoked for each record.
 * row_sequence is an example of stateful UDF. A stateful UDF is considered to
 * be non-deterministic, irrespective of what deterministic() returns.
 *
 * @return true
 */
boolean stateful() default false;

> Stateful UDF should be considered as nondeterministic
> -
>
> Key: SPARK-18637
> URL: https://issues.apache.org/jira/browse/SPARK-18637
> Project: Spark
> Issue Type: Bug
> Reporter: Zhan Zhang
>
> If the annotation UDFType of a UDF is stateful, it should be considered non-deterministic. Otherwise, the catalyst may optimize the plan and return the wrong result.
[jira] [Created] (SPARK-18637) Stateful UDF should be considered as nondeterministic
Zhan Zhang created SPARK-18637:
--------------------------------

Summary: Stateful UDF should be considered as nondeterministic
Key: SPARK-18637
URL: https://issues.apache.org/jira/browse/SPARK-18637
Project: Spark
Issue Type: Bug
Reporter: Zhan Zhang

If the annotation UDFType of a UDF is stateful, it should be considered non-deterministic. Otherwise, the catalyst may optimize the plan and return the wrong result.
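The check itself is small. A hedged sketch: the UDFType lookup mirrors Hive's annotation (org.apache.hadoop.hive.ql.udf.UDFType has runtime retention, so reflection works), while wiring the result into Spark's HiveUDF expressions is simplified away here:

{code}
import org.apache.hadoop.hive.ql.udf.UDFType

def isDeterministicUdf(udfClass: Class[_]): Boolean = {
  val ann = udfClass.getAnnotation(classOf[UDFType])
  // Unannotated UDFs default to deterministic; a stateful UDF is
  // non-deterministic regardless of what deterministic() says.
  ann == null || (ann.deterministic() && !ann.stateful())
}
{code}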
[jira] [Commented] (SPARK-18550) Make the queue capacity of LiveListenerBus configurable.
[ https://issues.apache.org/jira/browse/SPARK-18550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688361#comment-15688361 ]

Zhan Zhang commented on SPARK-18550:

I was not aware it has been fixed already. Please help to close it.

> Make the queue capacity of LiveListenerBus configurable.
>
> Key: SPARK-18550
> URL: https://issues.apache.org/jira/browse/SPARK-18550
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Reporter: Zhan Zhang
> Priority: Minor
>
> We hit issues where the driver listener bus cannot keep up with the rate of incoming events. The current capacity is fixed at 1000. This value should be configurable per job; otherwise, when events are dropped, the UI is totally useless.
> Bus: Dropping SparkListenerEvent because no remaining room in event queue.
[jira] [Created] (SPARK-18550) Make the queue capacity of LiveListenerBus configurable.
Zhan Zhang created SPARK-18550:
--------------------------------

Summary: Make the queue capacity of LiveListenerBus configurable.
Key: SPARK-18550
URL: https://issues.apache.org/jira/browse/SPARK-18550
Project: Spark
Issue Type: Bug
Components: Spark Core
Reporter: Zhan Zhang
Priority: Minor

We hit issues where the driver listener bus cannot keep up with the rate of incoming events. The current capacity is fixed at 1000. This value should be configurable per job; otherwise, when events are dropped, the UI is totally useless.

Bus: Dropping SparkListenerEvent because no remaining room in event queue.
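For reference (this is the fix the previous comment alludes to): the capacity did become configurable in Spark 2.x via a SparkConf key; newer releases renamed it to spark.scheduler.listenerBus.eventqueue.capacity. A minimal usage sketch:

{code}
val conf = new org.apache.spark.SparkConf()
  // raise the listener-bus queue bound so bursty event streams are not dropped
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000")
{code}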
[jira] [Commented] (SPARK-17637) Packed scheduling for Spark tasks across executors
[ https://issues.apache.org/jira/browse/SPARK-17637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687474#comment-15687474 ]

Zhan Zhang commented on SPARK-17637:

[~hvanhovell] Thanks. PR is updated with conflicts resolved.

> Packed scheduling for Spark tasks across executors
> --
>
> Key: SPARK-17637
> URL: https://issues.apache.org/jira/browse/SPARK-17637
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler
> Reporter: Zhan Zhang
> Assignee: Zhan Zhang
> Priority: Minor
>
> Currently the Spark scheduler implements round-robin scheduling of tasks to executors, which is great as it distributes the load evenly across the cluster, but it leads to significant resource waste in some cases, especially when dynamic allocation is enabled.
[jira] [Commented] (SPARK-17637) Packed scheduling for Spark tasks across executors
[ https://issues.apache.org/jira/browse/SPARK-17637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15516851#comment-15516851 ]

Zhan Zhang commented on SPARK-17637:

[~jerryshao] The idea is straightforward. Instead of doing round robin across the executors with available cores, the new scheduling will try to allocate tasks to the executors with the fewest available cores. As a result, executors with more free resources may not get new tasks allocated. With dynamic allocation enabled, these executors may be released so that other jobs can get their required resources from the underlying resource manager.

It is not specifically bound to dynamic allocation, but dynamic allocation is an easy way to understand the gains of the new scheduler. In addition, in the patch (soon to be sent out) there is also another scheduler that does exactly the opposite: it allocates tasks to the executors with the most available cores, in order to balance the workload across all executors.

> Packed scheduling for Spark tasks across executors
> --
>
> Key: SPARK-17637
> URL: https://issues.apache.org/jira/browse/SPARK-17637
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler
> Reporter: Zhan Zhang
> Priority: Minor
>
> Currently the Spark scheduler implements round-robin scheduling of tasks to executors, which is great as it distributes the load evenly across the cluster, but it leads to significant resource waste in some cases, especially when dynamic allocation is enabled.
[jira] [Commented] (SPARK-17637) Packed scheduling for Spark tasks across executors
[ https://issues.apache.org/jira/browse/SPARK-17637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15515247#comment-15515247 ]

Zhan Zhang commented on SPARK-17637:

cc [~rxin] A quick prototype shows that for a tested pipeline, the job can save around 45% of reserved CPU and memory when dynamic allocation is enabled.

> Packed scheduling for Spark tasks across executors
> --
>
> Key: SPARK-17637
> URL: https://issues.apache.org/jira/browse/SPARK-17637
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler
> Reporter: Zhan Zhang
> Priority: Minor
>
> Currently the Spark scheduler implements round-robin scheduling of tasks to executors, which is great as it distributes the load evenly across the cluster, but it leads to significant resource waste in some cases, especially when dynamic allocation is enabled.
[jira] [Commented] (SPARK-17637) Packed scheduling for Spark tasks across executors
[ https://issues.apache.org/jira/browse/SPARK-17637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15514105#comment-15514105 ]

Zhan Zhang commented on SPARK-17637:

The plan is to introduce a new configuration so that different scheduling algorithms can be used for task scheduling.

> Packed scheduling for Spark tasks across executors
> --
>
> Key: SPARK-17637
> URL: https://issues.apache.org/jira/browse/SPARK-17637
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler
> Reporter: Zhan Zhang
> Priority: Minor
>
> Currently the Spark scheduler implements round-robin scheduling of tasks to executors, which is great as it distributes the load evenly across the cluster, but it leads to significant resource waste in some cases, especially when dynamic allocation is enabled.
[jira] [Created] (SPARK-17637) Packed scheduling for Spark tasks across executors
Zhan Zhang created SPARK-17637:
--------------------------------

Summary: Packed scheduling for Spark tasks across executors
Key: SPARK-17637
URL: https://issues.apache.org/jira/browse/SPARK-17637
Project: Spark
Issue Type: Improvement
Components: Scheduler
Reporter: Zhan Zhang
Priority: Minor

Currently the Spark scheduler implements round-robin scheduling of tasks to executors, which is great as it distributes the load evenly across the cluster, but it leads to significant resource waste in some cases, especially when dynamic allocation is enabled.
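A sketch of the two policies discussed in the comments above (names hypothetical, not Spark's TaskScheduler API): "packed" fills the busiest executors first so idle ones can drain and be released by dynamic allocation; "balanced" does the opposite and spreads load:

{code}
case class Offer(executorId: String, freeCores: Int)

def orderOffers(offers: Seq[Offer], policy: String): Seq[Offer] = policy match {
  case "packed"   => offers.sortBy(_.freeCores)   // fewest free cores first
  case "balanced" => offers.sortBy(-_.freeCores)  // most free cores first
  case _          => offers                       // default: as received
}

// Example: under "packed", exec-2 (1 free core) is offered work before
// exec-1 (7 free cores), letting exec-1 idle out and be reclaimed.
val ordered = orderOffers(Seq(Offer("exec-1", 7), Offer("exec-2", 1)), "packed")
{code}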
[jira] [Created] (SPARK-17526) Display the executor log links with the job failure message on Spark UI and Console
Zhan Zhang created SPARK-17526:
--------------------------------

Summary: Display the executor log links with the job failure message on Spark UI and Console
Key: SPARK-17526
URL: https://issues.apache.org/jira/browse/SPARK-17526
Project: Spark
Issue Type: Improvement
Components: Web UI
Reporter: Zhan Zhang
Priority: Minor

Display the executor log links with the job failure message on the Spark UI and console:

"Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, HostName): java.lang.Exception: foo"

To make this failure message more helpful, we should include a link to the log of the executor on which the task failed, in both the driver log and the web UI.
[jira] [Updated] (SPARK-15848) Spark unable to read partitioned table in avro format and column name in upper case
[ https://issues.apache.org/jira/browse/SPARK-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-15848: --- Affects Version/s: 1.6.1 > Spark unable to read partitioned table in avro format and column name in > upper case > --- > > Key: SPARK-15848 > URL: https://issues.apache.org/jira/browse/SPARK-15848 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Zhan Zhang > > If external partitioned Hive tables created in Avro format. > Spark is returning "null" values if columns names are in Uppercase in the > Avro schema. > The same tables return proper data when queried in the Hive client.
[jira] [Commented] (SPARK-15848) Spark unable to read partitioned table in avro format and column name in upper case
[ https://issues.apache.org/jira/browse/SPARK-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323195#comment-15323195 ]

Zhan Zhang commented on SPARK-15848:

cat > file1.csv<
cat > file2.csv<

scala> val tbl = sqlContext.table("default.avro_table_uppercase");
scala> tbl.show
+----------+----------+-----+----+
|student_id|subject_id|marks|year|
+----------+----------+-----+----+
|      null|      null|  100|2000|
|      null|      null|   20|2000|
|      null|      null|  160|2000|
|      null|      null|  963|2000|
|      null|      null|  142|2000|
|      null|      null|  430|2000|
|      null|      null|   91|2002|
|      null|      null|   28|2002|
|      null|      null|   16|2002|
|      null|      null|   96|2002|
|      null|      null|   14|2002|
|      null|      null|   43|2002|
+----------+----------+-----+----+

> Spark unable to read partitioned table in avro format and column name in upper case
> ---
>
> Key: SPARK-15848
> URL: https://issues.apache.org/jira/browse/SPARK-15848
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Reporter: Zhan Zhang
>
> For external partitioned Hive tables created in Avro format,
> Spark is returning "null" values if column names are in uppercase in the
> Avro schema.
> The same tables return proper data when queried in the Hive client.
[jira] [Created] (SPARK-15848) Spark unable to read partitioned table in avro format and column name in upper case
Zhan Zhang created SPARK-15848: -- Summary: Spark unable to read partitioned table in avro format and column name in upper case Key: SPARK-15848 URL: https://issues.apache.org/jira/browse/SPARK-15848 Project: Spark Issue Type: Bug Components: SQL Reporter: Zhan Zhang For external partitioned Hive tables created in Avro format, Spark returns "null" values if column names are in upper case in the Avro schema. The same tables return proper data when queried in the Hive client. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15441) dataset outer join seems to return incorrect result
[ https://issues.apache.org/jira/browse/SPARK-15441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297881#comment-15297881 ] Zhan Zhang commented on SPARK-15441: Currently new GenericInternalRow(right.output.length) is used as the nullRow, but it cannot distinguish between the row itself being null and all of its columns being null. We could probably add a special nullRow to represent that the InternalRow itself is null, so that the Encoder can identify whether the object itself is null or not.

> dataset outer join seems to return incorrect result
> ---
>
> Key: SPARK-15441
> URL: https://issues.apache.org/jira/browse/SPARK-15441
> Project: Spark
> Issue Type: Bug
> Components: SQL
>Reporter: Reynold Xin
>Assignee: Wenchen Fan
>Priority: Critical
>
> See notebook
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6122906529858466/2836020637783173/5382278320999420/latest.html
> {code}
> import org.apache.spark.sql.functions
> val left = List(("a", 1), ("a", 2), ("b", 3), ("c", 4)).toDS()
> val right = List(("a", "x"), ("b", "y"), ("d", "z")).toDS()
> // The last row _1 should be null, rather than (null, -1)
> left.toDF("k", "v").as[(String, Int)].alias("left")
>   .joinWith(right.toDF("k", "u").as[(String, String)].alias("right"),
>     functions.col("left.k") === functions.col("right.k"), "right_outer")
>   .show()
> {code}
> The returned result currently is
> {code}
> +---------+-----+
> |       _1|   _2|
> +---------+-----+
> |    (a,2)|(a,x)|
> |    (a,1)|(a,x)|
> |    (b,3)|(b,y)|
> |(null,-1)|(d,z)|
> +---------+-----+
> {code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
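To illustrate the sentinel idea, a tiny self-contained sketch (the names are hypothetical, GenericInternalRow is heavily simplified here, and this is not Spark's actual internal class):
{code}
// Hypothetical sketch of the proposal: a dedicated sentinel object for "the
// row itself is null", distinct from "a real row whose columns are all null".
class GenericInternalRow(val numFields: Int) {
  def isNullAt(i: Int): Boolean = true // simplified: treat all columns as null
}

// A single shared marker instance: an identity comparison tells the decoder
// that the joined side produced no row at all.
object NullRow extends GenericInternalRow(0)

def decode(row: GenericInternalRow): Option[String] =
  if (row eq NullRow) None // outer join: no matching row, decode to null object
  else Some(s"row with ${row.numFields} all-null columns") // a real row

// decode(NullRow)                   => None
// decode(new GenericInternalRow(2)) => Some("row with 2 all-null columns")
{code}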
[jira] [Commented] (SPARK-7009) Build assembly JAR via ant to avoid zip64 problems
[ https://issues.apache.org/jira/browse/SPARK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15125205#comment-15125205 ] Zhan Zhang commented on SPARK-7009: --- Yes, this one is obsolete. > Build assembly JAR via ant to avoid zip64 problems > -- > > Key: SPARK-7009 > URL: https://issues.apache.org/jira/browse/SPARK-7009 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 1.3.0 > Environment: Java 7+ >Reporter: Steve Loughran > Attachments: check_spark_python.sh > > Original Estimate: 2h > Remaining Estimate: 2h > > SPARK-1911 shows the problem that JDK7+ is using zip64 to build large JARs; a > format incompatible with Java and pyspark. > Provided the total number of .class files+resources is <64K, ant can be used > to make the final JAR instead, perhaps by unzipping the maven-generated JAR > then rezipping it with zip64=never, before publishing the artifact via maven. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11075) Spark SQL Thrift Server authentication issue on kerberized yarn cluster
[ https://issues.apache.org/jira/browse/SPARK-11075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15113560#comment-15113560 ] Zhan Zhang commented on SPARK-11075: Duplicate of SPARK-5159? > Spark SQL Thrift Server authentication issue on kerberized yarn cluster > > > Key: SPARK-11075 > URL: https://issues.apache.org/jira/browse/SPARK-11075 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.4.1, 1.5.0, 1.5.1 > Environment: hive-1.2.1 > hadoop-2.6.0 configured with kerberos >Reporter: Xiaoyu Wang > > Using a proxy user to connect to the thrift server via beeline gives a permission > exception: > 1.Start the hive 1.2.1 metastore with user hive > {code} > $kinit -kt /tmp/hive.keytab hive/xxx > $nohup ./hive --service metastore 2>&1 >> ../logs/metastore.log & > {code} > 2.Start the spark thrift server with user hive > {code} > $kinit -kt /tmp/hive.keytab hive/xxx > $./start-thriftserver.sh --master yarn > {code} > 3.Connect to the thrift server with proxy user hive01 > {code} > $kinit hive01 > beeline command:!connect > jdbc:hive2://xxx:1/default;principal=hive/x...@hadoop.com;kerberosAuthType=kerberos;hive.server2.proxy.user=hive01 > {code} > 4.Create table and insert data > {code} > create table test(name string); > insert overwrite table test select * from sometable; > {code} > The insert SQL gets an exception: > {noformat} > Error: org.apache.hadoop.security.AccessControlException: Permission denied: > user=hive01, access=WRITE, > inode="/user/hive/warehouse/test/.hive-staging_hive_2015-10-10_09-17-15_972_3267668540808140587-2/-ext-1/_temporary/0/task_201510100917_0003_m_00":hive:hadoop:drwxr-xr-x > at > org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271) > at > org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257) > at > org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:238) > at > org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:182) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6512) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameToInternal(FSNamesystem.java:3805) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameToInt(FSNamesystem.java:3775) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3739) > at > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename(NameNodeRpcServer.java:754) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename(ClientNamenodeProtocolServerSideTranslatorPB.java:565) > at > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:415) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033) > (state=,code=0) > {noformat} > The table path on HDFS: > {noformat} > drwxrwxrwx - hive hadoop 0 2015-10-10 09:14 >
/user/hive/warehouse/test > drwxrwxrwx - hive01 hadoop 0 2015-10-10 09:17 > /user/hive/warehouse/test/.hive-staging_hive_2015-10-10_09-17-15_972_3267668540808140587-2 > drwxr-xr-x - hive01 hadoop 0 2015-10-10 09:17 > /user/hive/warehouse/test/.hive-staging_hive_2015-10-10_09-17-15_972_3267668540808140587-2/-ext-1 > drwxr-xr-x - hive01 hadoop 0 2015-10-10 09:17 > /user/hive/warehouse/test/.hive-staging_hive_2015-10-10_09-17-15_972_3267668540808140587-2/-ext-1/_temporary > drwxr-xr-x - hive01 hadoop 0 2015-10-10 09:17 > /user/hive/warehouse/test/.hive-staging_hive_2015-10-10_09-17-15_972_3267668540808140587-2/-ext-1/_temporary/0 > drwxr-xr-x - hive hadoop 0 2015-10-10 09:17 > /user/hive/warehouse/test/.hive-staging_hive_2015-10-10_09-17-15_972_3267668540808140587-2/-ext-1/_temporary/0/_temporary > drwxr-xr-x - hive hadoop 0 2015-10-10 09:17 >
[jira] [Commented] (SPARK-5159) Thrift server does not respect hive.server2.enable.doAs=true
[ https://issues.apache.org/jira/browse/SPARK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15102183#comment-15102183 ] Zhan Zhang commented on SPARK-5159: --- What happens if a user has valid access to a table, which is then saved in the catalog? Another user can then also access the table, as it is cached in the local hive catalog, even if the latter does not have access to the table, right? To make impersonation really work, all the information has to be tagged by user, right? > Thrift server does not respect hive.server2.enable.doAs=true > > > Key: SPARK-5159 > URL: https://issues.apache.org/jira/browse/SPARK-5159 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.2.0 >Reporter: Andrew Ray > Attachments: spark_thrift_server_log.txt > > > I'm currently testing the spark sql thrift server on a kerberos secured > cluster in YARN mode. Currently any user can access any table regardless of > HDFS permissions as all data is read as the hive user. In HiveServer2 the > property hive.server2.enable.doAs=true causes all access to be done as the > submitting user. We should do the same. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-5159) Thrift server does not respect hive.server2.enable.doAs=true
[ https://issues.apache.org/jira/browse/SPARK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15102183#comment-15102183 ] Zhan Zhang edited comment on SPARK-5159 at 1/15/16 5:50 PM: What happens if a user has valid access to a table, which is then saved in the catalog? Another user can then also access the table, as it is cached in the local hive catalog, even if the latter does not have access to the table meta data, right? To make impersonation work, all the information has to be tagged by user, right? was (Author: zzhan): What happens if a user has valid access to a table, which is then saved in the catalog? Another user can then also access the table, as it is cached in the local hive catalog, even if the latter does not have access to the table, right? To make impersonation really work, all the information has to be tagged by user, right? > Thrift server does not respect hive.server2.enable.doAs=true > > > Key: SPARK-5159 > URL: https://issues.apache.org/jira/browse/SPARK-5159 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.2.0 >Reporter: Andrew Ray > Attachments: spark_thrift_server_log.txt > > > I'm currently testing the spark sql thrift server on a kerberos secured > cluster in YARN mode. Currently any user can access any table regardless of > HDFS permissions as all data is read as the hive user. In HiveServer2 the > property hive.server2.enable.doAs=true causes all access to be done as the > submitting user. We should do the same. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5159) Thrift server does not respect hive.server2.enable.doAs=true
[ https://issues.apache.org/jira/browse/SPARK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15098734#comment-15098734 ] Zhan Zhang commented on SPARK-5159: --- This is definitely broken. But fixing it needs a complete design review first. For example, to enable impersonation (doAs) at runtime, how do we solve RDD sharing between different users? We can propagate the user to the executor, piggybacked on TaskDescription. But what happens if two users operate on two RDDs which share the same parent, a cache created by another user? Currently, RDD scope is the SparkContext, without any user information. It means that even if we do impersonation, it is meaningless, per my understanding. > Thrift server does not respect hive.server2.enable.doAs=true > > > Key: SPARK-5159 > URL: https://issues.apache.org/jira/browse/SPARK-5159 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.2.0 >Reporter: Andrew Ray > Attachments: spark_thrift_server_log.txt > > > I'm currently testing the spark sql thrift server on a kerberos secured > cluster in YARN mode. Currently any user can access any table regardless of > HDFS permissions as all data is read as the hive user. In HiveServer2 the > property hive.server2.enable.doAs=true causes all access to be done as the > submitting user. We should do the same. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
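For reference, the doAs mechanics themselves are straightforward with Hadoop's proxy-user API; the sketch below (a simplification, not a proposed patch) shows per-user execution, while the hard part discussed above, tagging RDDs, caches, and catalog entries by user, remains open:
{code}
// Sketch of per-user execution using Hadoop's proxy-user API (the standard
// doAs mechanism HiveServer2 relies on). This only demonstrates impersonation
// mechanics; it does not address RDD/cache sharing across users.
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

def runAs[T](user: String)(body: => T): T = {
  // The service principal (e.g. hive) impersonates the submitting user.
  val ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser)
  ugi.doAs(new PrivilegedExceptionAction[T] {
    override def run(): T = body // HDFS access in here is checked against `user`
  })
}

// Hypothetical usage:
// runAs("hive01") { sqlContext.sql("insert overwrite table test select * from sometable") }
{code}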
[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join
[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005682#comment-15005682 ] Zhan Zhang commented on SPARK-11704: [~maropu] You are right. I meant that fetching from the network is a big overhead. Feel free to work on it. > Optimize the Cartesian Join > --- > > Key: SPARK-11704 > URL: https://issues.apache.org/jira/browse/SPARK-11704 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Zhan Zhang > > Currently CartesianProduct relies on RDD.cartesian, in which the computation > is realized as follows > override def compute(split: Partition, context: TaskContext): Iterator[(T, > U)] = { > val currSplit = split.asInstanceOf[CartesianPartition] > for (x <- rdd1.iterator(currSplit.s1, context); > y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) > } > From the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times, > which is really heavy and may never finish if n is large, especially when > rdd2 is coming from a ShuffleRDD. > We should optimize CartesianProduct by caching rightResults. > The problem is that we don't have a cleanup hook to unpersist rightResults, > AFAIK. I think we should have a cleanup hook after query execution. > With the hook available, we can easily optimize such Cartesian joins. I > believe such a cleanup hook may also benefit other query optimizations. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11705) Eliminate unnecessary Cartesian Join
[ https://issues.apache.org/jira/browse/SPARK-11705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15004744#comment-15004744 ] Zhan Zhang commented on SPARK-11705: Simple reproduction steps:

import sqlContext.implicits._
case class SimpleRecord(key: Int, value: String)
def withDF(name: String) = {
  val df = sc.parallelize((0 until 10).map(x => SimpleRecord(x, s"record_$x"))).toDF()
  df.registerTempTable(name)
}
withDF("p")
withDF("s")
withDF("l")

val d = sqlContext.sql(s"select p.key, p.value, s.value, l.value from p, s, l where l.key = s.key and p.key = l.key")
d.queryExecution.sparkPlan

res15: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [key#0,value#1,value#3,value#5]
 SortMergeJoin [key#2,key#0], [key#4,key#4]
  CartesianProduct
   Scan PhysicalRDD[key#0,value#1]
   Scan PhysicalRDD[key#2,value#3]
  Scan PhysicalRDD[key#4,value#5]

val d1 = sqlContext.sql(s"select p.key, p.value, s.value, l.value from s, l, p where l.key = s.key and p.key = l.key")
d1.queryExecution.sparkPlan

res16: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [key#0,value#1,value#3,value#5]
 SortMergeJoin [key#4], [key#0]
  TungstenProject [key#4,value#5,value#3]
   SortMergeJoin [key#2], [key#4]
    Scan PhysicalRDD[key#2,value#3]
    Scan PhysicalRDD[key#4,value#5]
  Scan PhysicalRDD[key#0,value#1]

> Eliminate unnecessary Cartesian Join > > > Key: SPARK-11705 > URL: https://issues.apache.org/jira/browse/SPARK-11705 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Zhan Zhang > > When we have some queries similar to the following (I don't remember the exact > form): > select * from a, b, c, d where a.key1 = c.key1 and b.key2 = c.key2 and c.key3 > = d.key3 > There will be a cartesian join between a and b. But if we simply change > the table order, for example to a, c, b, d, such a cartesian join is > eliminated. > Without such manual tuning, the query will never finish if a and b are big. But > we should not rely on such manual optimization. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-11704) Optimize the Cartesian Join
[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005136#comment-15005136 ] Zhan Zhang edited comment on SPARK-11704 at 11/14/15 5:16 AM: -- I think we can add a register hook and a cleanup hook in the query context. Before the query is performed, the registered handlers are invoked (such as persist), and the cleanup hooks are invoked (e.g., unpersist) after the query is done. This way, CartesianProduct can cache the rightResult in the registered handler and unpersist it after the query, so we avoid the recomputation of RDD2. In my testing, because rdd2 is quite small, I actually reversed the cartesian join from cartesian(rdd1, rdd2) to cartesian(rdd2, rdd1). With that change the computation finishes quite fast, but the original form cannot finish. was (Author: zzhan): I think we can add a cleanup hook in SQLContext, and when the query is done, we invoke all the registered cleanup hooks. This way, CartesianProduct can cache the rightResult and register the cleanup handler (unpersist), so we avoid the recomputation of RDD2. In my testing, because rdd2 is quite small, I actually reversed the cartesian join from cartesian(rdd1, rdd2) to cartesian(rdd2, rdd1). With that change the computation finishes quite fast, but the original form cannot finish. > Optimize the Cartesian Join > --- > > Key: SPARK-11704 > URL: https://issues.apache.org/jira/browse/SPARK-11704 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Zhan Zhang > > Currently CartesianProduct relies on RDD.cartesian, in which the computation > is realized as follows > override def compute(split: Partition, context: TaskContext): Iterator[(T, > U)] = { > val currSplit = split.asInstanceOf[CartesianPartition] > for (x <- rdd1.iterator(currSplit.s1, context); > y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) > } > From the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times, > which is really heavy and may never finish if n is large, especially when > rdd2 is coming from a ShuffleRDD. > We should optimize CartesianProduct by caching rightResults. > The problem is that we don't have a cleanup hook to unpersist rightResults, > AFAIK. I think we should have a cleanup hook after query execution. > With the hook available, we can easily optimize such Cartesian joins. I > believe such a cleanup hook may also benefit other query optimizations. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join
[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005134#comment-15005134 ] Zhan Zhang commented on SPARK-11704: [~maropu] Maybe I misunderstand. If RDD2 is coming from a ShuffleRDD, each new iterator will try to fetch from the network because RDD2 is not cached. Is the ShuffleRDD cached automatically? > Optimize the Cartesian Join > --- > > Key: SPARK-11704 > URL: https://issues.apache.org/jira/browse/SPARK-11704 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Zhan Zhang > > Currently CartesianProduct relies on RDD.cartesian, in which the computation > is realized as follows > override def compute(split: Partition, context: TaskContext): Iterator[(T, > U)] = { > val currSplit = split.asInstanceOf[CartesianPartition] > for (x <- rdd1.iterator(currSplit.s1, context); > y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) > } > From the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times, > which is really heavy and may never finish if n is large, especially when > rdd2 is coming from a ShuffleRDD. > We should optimize CartesianProduct by caching rightResults. > The problem is that we don't have a cleanup hook to unpersist rightResults, > AFAIK. I think we should have a cleanup hook after query execution. > With the hook available, we can easily optimize such Cartesian joins. I > believe such a cleanup hook may also benefit other query optimizations. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join
[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005136#comment-15005136 ] Zhan Zhang commented on SPARK-11704: I think we can add a cleanup hook in SQLContext, and when the query is done, we invoke all the registered cleanup hooks. This way, CartesianProduct can cache the rightResult and register the cleanup handler (unpersist), so we avoid the recomputation of RDD2. In my testing, because rdd2 is quite small, I actually reversed the cartesian join from cartesian(rdd1, rdd2) to cartesian(rdd2, rdd1). With that change the computation finishes quite fast, but the original form cannot finish. > Optimize the Cartesian Join > --- > > Key: SPARK-11704 > URL: https://issues.apache.org/jira/browse/SPARK-11704 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Zhan Zhang > > Currently CartesianProduct relies on RDD.cartesian, in which the computation > is realized as follows > override def compute(split: Partition, context: TaskContext): Iterator[(T, > U)] = { > val currSplit = split.asInstanceOf[CartesianPartition] > for (x <- rdd1.iterator(currSplit.s1, context); > y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) > } > From the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times, > which is really heavy and may never finish if n is large, especially when > rdd2 is coming from a ShuffleRDD. > We should optimize CartesianProduct by caching rightResults. > The problem is that we don't have a cleanup hook to unpersist rightResults, > AFAIK. I think we should have a cleanup hook after query execution. > With the hook available, we can easily optimize such Cartesian joins. I > believe such a cleanup hook may also benefit other query optimizations. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-11704) Optimize the Cartesian Join
Zhan Zhang created SPARK-11704: -- Summary: Optimize the Cartesian Join Key: SPARK-11704 URL: https://issues.apache.org/jira/browse/SPARK-11704 Project: Spark Issue Type: Bug Components: SQL Reporter: Zhan Zhang Currently CartesianProduct relies on RDD.cartesian, in which the computation is realized as follows

override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
  val currSplit = split.asInstanceOf[CartesianPartition]
  for (x <- rdd1.iterator(currSplit.s1, context);
       y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}

From the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times, which is really heavy and may never finish if n is large, especially when rdd2 is coming from a ShuffleRDD. We should optimize CartesianProduct by caching rightResults. The problem is that we don't have a cleanup hook to unpersist rightResults, AFAIK. I think we should have a cleanup hook after query execution. With the hook available, we can easily optimize such Cartesian joins. I believe such a cleanup hook may also benefit other query optimizations. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
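The caching idea can be sketched today with public RDD APIs; a cleanup hook would simply automate the final unpersist (this is an illustrative sketch, not the proposed patch):
{code}
// Manual sketch of the proposed optimization, using only public RDD APIs:
// persist the right side so each left partition does not re-trigger a shuffle
// fetch or recomputation, then unpersist once the result is materialized.
import org.apache.spark.storage.StorageLevel

val rdd1 = sc.parallelize(1 to 1000000)
val rdd2 = sc.parallelize(1 to 100).map(i => (i, i * i)) // stands in for a shuffle output

rdd2.persist(StorageLevel.MEMORY_AND_DISK) // "register" step: cache the right side
val joined = rdd1.cartesian(rdd2)
val n = joined.count()                     // materialize the result
rdd2.unpersist()                           // "cleanup" step: release the cache
{code}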
[jira] [Updated] (SPARK-11704) Optimize the Cartesian Join
[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-11704: --- Issue Type: Improvement (was: Bug) > Optimize the Cartesian Join > --- > > Key: SPARK-11704 > URL: https://issues.apache.org/jira/browse/SPARK-11704 > Project: Spark > Issue Type: Improvement > Components: SQL >Reporter: Zhan Zhang > > Currently CartesianProduct relies on RDD.cartesian, in which the computation > is realized as follows > override def compute(split: Partition, context: TaskContext): Iterator[(T, > U)] = { > val currSplit = split.asInstanceOf[CartesianPartition] > for (x <- rdd1.iterator(currSplit.s1, context); > y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) > } > From the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times, > which is really heavy and may never finish if n is large, especially when > rdd2 is coming from a ShuffleRDD. > We should optimize CartesianProduct by caching rightResults. > The problem is that we don't have a cleanup hook to unpersist rightResults, > AFAIK. I think we should have a cleanup hook after query execution. > With the hook available, we can easily optimize such Cartesian joins. I > believe such a cleanup hook may also benefit other query optimizations. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-11705) Eliminate unnecessary Cartesian Join
Zhan Zhang created SPARK-11705: -- Summary: Eliminate unnecessary Cartesian Join Key: SPARK-11705 URL: https://issues.apache.org/jira/browse/SPARK-11705 Project: Spark Issue Type: Improvement Components: SQL Reporter: Zhan Zhang When we have some queries similar to the following (I don't remember the exact form): select * from a, b, c, d where a.key1 = c.key1 and b.key2 = c.key2 and c.key3 = d.key3 there will be a cartesian join between a and b. But if we simply change the table order, for example to a, c, b, d, such a cartesian join is eliminated. Without such manual tuning, the query will never finish if a and b are big. But we should not rely on such manual optimization. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11562) Provide user an option to init SQLContext or HiveContext in spark shell
[ https://issues.apache.org/jira/browse/SPARK-11562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14994661#comment-14994661 ] Zhan Zhang commented on SPARK-11562: Thanks [~jerrylam] for reporting the issue and providing the suggestion. I think adding the config spark.sql.hive.enabled is a good way to handle it. > Provide user an option to init SQLContext or HiveContext in spark shell > --- > > Key: SPARK-11562 > URL: https://issues.apache.org/jira/browse/SPARK-11562 > Project: Spark > Issue Type: Bug >Reporter: Zhan Zhang >Priority: Minor > > User should be provided an option to initialize the HiveContext or SQLContext > in spark shell. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
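For illustration, the effect of the proposed flag would be roughly the following (a sketch only: spark.sql.hive.enabled is the suggestion under discussion, not an existing Spark configuration):
{code}
// Hypothetical sketch of the proposed shell behavior; spark.sql.hive.enabled
// does not exist in released Spark. Shown with the Spark 1.x contexts this
// issue is about.
val useHive = sc.getConf.getBoolean("spark.sql.hive.enabled", defaultValue = true)
val sqlContext =
  if (useHive) new org.apache.spark.sql.hive.HiveContext(sc)
  else new org.apache.spark.sql.SQLContext(sc)
{code}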
[jira] [Created] (SPARK-11562) Provide user an option to init SQLContext or HiveContext in spark shell
Zhan Zhang created SPARK-11562: -- Summary: Provide user an option to init SQLContext or HiveContext in spark shell Key: SPARK-11562 URL: https://issues.apache.org/jira/browse/SPARK-11562 Project: Spark Issue Type: Bug Reporter: Zhan Zhang Priority: Minor User should be provided an option to initialize the HiveContext or SQLContext in spark shell. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14967456#comment-14967456 ] Zhan Zhang commented on SPARK-11087: [~patcharee] I tried again, using the steps you provided, and could see the OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS name name_20). I am using the master branch. > spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate > - > > Key: SPARK-11087 > URL: https://issues.apache.org/jira/browse/SPARK-11087 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.1 > Environment: orc file version 0.12 with HIVE_8732 > hive version 1.2.1.2.3.0.0-2557 >Reporter: patcharee >Priority: Minor > > I have an external hive table stored as partitioned orc file (see the table > schema below). I tried to query the table with a where clause: > hiveContext.setConf("spark.sql.orc.filterPushdown", "true") > hiveContext.sql("select u, v from 4D where zone = 2 and x = 320 and y = > 117") > But from the log file with debug logging level on, the ORC pushdown predicate > was not generated. > Unfortunately my table was not sorted when I inserted the data, but I > expected the ORC pushdown predicate to be generated (because of the where > clause). > Table schema > > hive> describe formatted 4D; > OK > # col_name data_type comment > > date int > hh int > x int > y int > height float > u float > v float > w float > ph float > phb float > t float > p float > pb float > qvapor float > qgraup float > qnice float > qnrain float > tke_pbl float > el_pbl float > qcloud float > > # Partition Information > # col_name data_type comment > > zone int > z int > year int > month int > > # Detailed Table Information > Database: default > Owner: patcharee > CreateTime: Thu Jul 09 16:46:54 CEST 2015 > LastAccessTime: UNKNOWN > Protect Mode: None > Retention: 0 > Location: hdfs://helmhdfs/apps/hive/warehouse/wrf_tables/4D > > Table Type: EXTERNAL_TABLE > Table Parameters: > EXTERNAL TRUE > comment this table is imported from rwf_data/*/wrf/* > last_modified_by patcharee > last_modified_time 1439806692 > orc.compress ZLIB > transient_lastDdlTime 1439806692 > > # Storage Information > SerDe Library: org.apache.hadoop.hive.ql.io.orc.OrcSerde > InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat > OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat > > Compressed: No > Num Buckets: -1 > Bucket Columns: [] > Sort Columns: [] > Storage Desc Params: > serialization.format 1 > Time taken: 0.388 seconds, Fetched: 58 row(s)
[jira] [Commented] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1496#comment-1496 ] Zhan Zhang commented on SPARK-11087: [~patcharee] I use the embedded hive metastore without any configuration. If you run it on a real cluster, you have to collect the logs from the executors, because I think the log is printed inside the executors. > spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate > - > > Key: SPARK-11087 > URL: https://issues.apache.org/jira/browse/SPARK-11087 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.1 > Environment: orc file version 0.12 with HIVE_8732 > hive version 1.2.1.2.3.0.0-2557 >Reporter: patcharee >Priority: Minor > > I have an external hive table stored as partitioned orc file (see the table > schema below). I tried to query the table with a where clause: > hiveContext.setConf("spark.sql.orc.filterPushdown", "true") > hiveContext.sql("select u, v from 4D where zone = 2 and x = 320 and y = > 117") > But from the log file with debug logging level on, the ORC pushdown predicate > was not generated. > Unfortunately my table was not sorted when I inserted the data, but I > expected the ORC pushdown predicate to be generated (because of the where > clause). > Table schema > > hive> describe formatted 4D; > OK > # col_name data_type comment > > date int > hh int > x int > y int > height float > u float > v float > w float > ph float > phb float > t float > p float > pb float > qvapor float > qgraup float > qnice float > qnrain float > tke_pbl float > el_pbl float > qcloud float > > # Partition Information > # col_name data_type comment > > zone int > z int > year int > month int > > # Detailed Table Information > Database: default > Owner: patcharee > CreateTime: Thu Jul 09 16:46:54 CEST 2015 > LastAccessTime: UNKNOWN > Protect Mode: None > Retention: 0 > Location: hdfs://helmhdfs/apps/hive/warehouse/wrf_tables/4D > > Table Type: EXTERNAL_TABLE > Table Parameters: > EXTERNAL TRUE > comment this table is imported from rwf_data/*/wrf/* > last_modified_by patcharee > last_modified_time 1439806692 > orc.compress ZLIB > transient_lastDdlTime 1439806692 > > # Storage Information > SerDe Library: org.apache.hadoop.hive.ql.io.orc.OrcSerde > InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat > OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat > > Compressed: No > Num Buckets: -1 > Bucket Columns: [] > Sort Columns: [] > Storage Desc Params: > serialization.format 1 > Time taken: 0.388
[jira] [Comment Edited] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14959599#comment-14959599 ] Zhan Zhang edited comment on SPARK-11087 at 10/15/15 8:58 PM: -- [~patcharee] I tried to duplicate your table as closely as possible, but still didn't hit the problem. Note that the query has to include some valid record in the partition; otherwise, partition pruning will trim the whole predicate before hitting the ORC scan. Please refer to the below for the details.

case class record(date: Int, hh: Int, x: Int, y: Int, height: Float, u: Float, w: Float, ph: Float, phb: Float, t: Float, p: Float, pb: Float, tke_pbl: Float, el_pbl: Float, qcloud: Float, zone: Int, z: Int, year: Int, month: Int)
val records = (1 to 100).map { i =>
  record(i.toInt, i.toInt, i.toInt, i.toInt, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toInt, i.toInt, i.toInt, i.toInt)
}
sc.parallelize(records).toDF().write.format("org.apache.spark.sql.hive.orc.DefaultSource").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone","z","year","month").saveAsTable("5D")
sc.parallelize(records).toDF().write.format("org.apache.spark.sql.hive.orc.DefaultSource").partitionBy("zone","z","year","month").save("4D")
val test = sqlContext.read.format("orc").load("4D")
test.registerTempTable("4D")
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
sqlContext.sql("select date, month, year, hh, u*0.9122461, u*-0.40964267, z from 4D where x = 320 and y = 117 and zone == 2 and year=2 and z >= 2 and z <= 8").show

2015-10-15 13:37:45 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS x 320) leaf-1 = (EQUALS y 117) expr = (and leaf-0 leaf-1)

sqlContext.sql("select date, month, year, hh, u*0.9122461, u*-0.40964267, z from 5D where x = 321 and y = 118 and zone == 2 and year=2 and z >= 2 and z <= 8").show

2015-10-15 13:40:06 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS x 321) leaf-1 = (EQUALS y 118) expr = (and leaf-0 leaf-1)

was (Author: zzhan): [~patcharee] I tried to duplicate your table as closely as possible, but still didn't hit the problem. Please refer to the below for the details.

case class record(date: Int, hh: Int, x: Int, y: Int, height: Float, u: Float, w: Float, ph: Float, phb: Float, t: Float, p: Float, pb: Float, tke_pbl: Float, el_pbl: Float, qcloud: Float, zone: Int, z: Int, year: Int, month: Int)
val records = (1 to 100).map { i =>
  record(i.toInt, i.toInt, i.toInt, i.toInt, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toInt, i.toInt, i.toInt, i.toInt)
}
sc.parallelize(records).toDF().write.format("org.apache.spark.sql.hive.orc.DefaultSource").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone","z","year","month").saveAsTable("5D")
sc.parallelize(records).toDF().write.format("org.apache.spark.sql.hive.orc.DefaultSource").partitionBy("zone","z","year","month").save("4D")
val test = sqlContext.read.format("orc").load("4D")
test.registerTempTable("4D")
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
sqlContext.sql("select date, month, year, hh, u*0.9122461, u*-0.40964267, z from 4D where x = 320 and y = 117 and zone == 2 and year=2 and z >= 2 and z <= 8").show

2015-10-15 13:37:45 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS x 320) leaf-1 = (EQUALS y 117) expr = (and leaf-0 leaf-1)

sqlContext.sql("select date, month, year, hh, u*0.9122461, u*-0.40964267, z from 5D where x = 321 and y = 118 and zone == 2 and year=2 and z >= 2 and z <= 8").show

2015-10-15 13:40:06 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS x 321) leaf-1 = (EQUALS y 118) expr = (and leaf-0 leaf-1)

> spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate > - > > Key: SPARK-11087 > URL: https://issues.apache.org/jira/browse/SPARK-11087 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.1 > Environment: orc file version 0.12 with HIVE_8732 > hive version 1.2.1.2.3.0.0-2557 >Reporter: patcharee >Priority: Minor > > I have an external hive table stored as partitioned orc file (see the table > schema below). I tried to query the table with a where clause: > hiveContext.setConf("spark.sql.orc.filterPushdown", "true") > hiveContext.sql("select u, v from 4D where zone = 2 and x = 320 and y = > 117") > But from the log file with debug logging level on, the ORC pushdown predicate > was not generated. > Unfortunately my table was not sorted when I inserted the data, but I > expected the ORC pushdown
[jira] [Commented] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14959599#comment-14959599 ] Zhan Zhang commented on SPARK-11087: [~patcharee] I tried to duplicate your table as closely as possible, but still didn't hit the problem. Please refer to the below for the details.

case class record(date: Int, hh: Int, x: Int, y: Int, height: Float, u: Float, w: Float, ph: Float, phb: Float, t: Float, p: Float, pb: Float, tke_pbl: Float, el_pbl: Float, qcloud: Float, zone: Int, z: Int, year: Int, month: Int)
val records = (1 to 100).map { i =>
  record(i.toInt, i.toInt, i.toInt, i.toInt, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toFloat, i.toInt, i.toInt, i.toInt, i.toInt)
}
sc.parallelize(records).toDF().write.format("org.apache.spark.sql.hive.orc.DefaultSource").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone","z","year","month").saveAsTable("5D")
sc.parallelize(records).toDF().write.format("org.apache.spark.sql.hive.orc.DefaultSource").partitionBy("zone","z","year","month").save("4D")
val test = sqlContext.read.format("orc").load("4D")
test.registerTempTable("4D")
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
sqlContext.sql("select date, month, year, hh, u*0.9122461, u*-0.40964267, z from 4D where x = 320 and y = 117 and zone == 2 and year=2 and z >= 2 and z <= 8").show

2015-10-15 13:37:45 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS x 320) leaf-1 = (EQUALS y 117) expr = (and leaf-0 leaf-1)

sqlContext.sql("select date, month, year, hh, u*0.9122461, u*-0.40964267, z from 5D where x = 321 and y = 118 and zone == 2 and year=2 and z >= 2 and z <= 8").show

2015-10-15 13:40:06 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS x 321) leaf-1 = (EQUALS y 118) expr = (and leaf-0 leaf-1)

> spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate > - > > Key: SPARK-11087 > URL: https://issues.apache.org/jira/browse/SPARK-11087 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.1 > Environment: orc file version 0.12 with HIVE_8732 > hive version 1.2.1.2.3.0.0-2557 >Reporter: patcharee >Priority: Minor > > I have an external hive table stored as partitioned orc file (see the table > schema below). I tried to query the table with a where clause: > hiveContext.setConf("spark.sql.orc.filterPushdown", "true") > hiveContext.sql("select u, v from 4D where zone = 2 and x = 320 and y = > 117") > But from the log file with debug logging level on, the ORC pushdown predicate > was not generated. > Unfortunately my table was not sorted when I inserted the data, but I > expected the ORC pushdown predicate to be generated (because of the where > clause). > Table schema > > hive> describe formatted 4D; > OK > # col_name data_type comment > > date int > hh int > x int > y int > height float > u float > v float > w float > ph float > phb float > t float > p float > pb float > qvapor float > qgraup float > qnice float > qnrain float > tke_pbl float > el_pbl float > qcloud float > > # Partition Information > # col_name data_type comment > > zone int > z int > year int > month
[jira] [Commented] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14959342#comment-14959342 ] Zhan Zhang commented on SPARK-11087: [~patcharee] I tried a simple case with partition and predicate pushdown, and didn't hit the problem. The predicate is pushed down correctly. I will try to use your same table to see whether it works.

case class Contact(name: String, phone: String)
case class Person(name: String, age: Int, contacts: Seq[Contact])
val records = (1 to 100).map { i =>
  Person(s"name_$i", i, (0 to 1).map { m => Contact(s"contact_$m", s"phone_$m") })
}
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
sc.parallelize(records).toDF().write.format("orc").partitionBy("age").save("peoplePartitioned")
val peoplePartitioned = sqlContext.read.format("orc").load("peoplePartitioned")
peoplePartitioned.registerTempTable("peoplePartitioned")
sqlContext.sql("SELECT * FROM peoplePartitioned WHERE age = 20 and name = 'name_20'").count
sqlContext.sql("SELECT * FROM peoplePartitioned WHERE name = 'name_20' and age = 20").count

2015-10-15 10:40:45 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (LESS_THAN age 15) expr = leaf-0
2015-10-15 10:48:20 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS name name_20) expr = leaf-0

sqlContext.sql("SELECT name FROM people WHERE age == 15 and age < 16").count()
2015-10-15 10:58:35 OrcInputFormat [INFO] ORC pushdown predicate: leaf-0 = (EQUALS age 15) leaf-1 = (LESS_THAN age 16)
sqlContext.sql("SELECT name FROM people WHERE age < 15").count()

> spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate > - > > Key: SPARK-11087 > URL: https://issues.apache.org/jira/browse/SPARK-11087 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.1 > Environment: orc file version 0.12 with HIVE_8732 > hive version 1.2.1.2.3.0.0-2557 >Reporter: patcharee >Priority: Minor > > I have an external hive table stored as partitioned orc file (see the table > schema below). I tried to query the table with a where clause: > hiveContext.setConf("spark.sql.orc.filterPushdown", "true") > hiveContext.sql("select u, v from 4D where zone = 2 and x = 320 and y = > 117") > But from the log file with debug logging level on, the ORC pushdown predicate > was not generated. > Unfortunately my table was not sorted when I inserted the data, but I > expected the ORC pushdown predicate to be generated (because of the where > clause). > Table schema > > hive> describe formatted 4D; > OK > # col_name data_type comment > > date int > hh int > x int > y int > height float > u float > v float > w float > ph float > phb float > t float > p float > pb float > qvapor float > qgraup float > qnice float > qnrain float > tke_pbl float > el_pbl float > qcloud float > > # Partition Information > # col_name data_type comment > > zone int > z int > year int > month int > > # Detailed Table Information > Database: default > Owner: patcharee
[jira] [Commented] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14957301#comment-14957301 ] Zhan Zhang commented on SPARK-11087: I will take a look at this one. > spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate > - > > Key: SPARK-11087 > URL: https://issues.apache.org/jira/browse/SPARK-11087 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.1 > Environment: orc file version 0.12 with HIVE_8732 > hive version 1.2.1.2.3.0.0-2557 >Reporter: patcharee >Priority: Minor > > I have an external hive table stored as partitioned orc file (see the table > schema below). I tried to query the table with a where clause: > hiveContext.setConf("spark.sql.orc.filterPushdown", "true") > hiveContext.sql("select u, v from 4D where zone = 2 and x = 320 and y = > 117") > But from the log file with debug logging level on, the ORC pushdown predicate > was not generated. > Unfortunately my table was not sorted when I inserted the data, but I > expected the ORC pushdown predicate to be generated (because of the where > clause). > Table schema > > hive> describe formatted 4D; > OK > # col_name data_type comment > > date int > hh int > x int > y int > height float > u float > v float > w float > ph float > phb float > t float > p float > pb float > qvapor float > qgraup float > qnice float > qnrain float > tke_pbl float > el_pbl float > qcloud float > > # Partition Information > # col_name data_type comment > > zone int > z int > year int > month int > > # Detailed Table Information > Database: default > Owner: patcharee > CreateTime: Thu Jul 09 16:46:54 CEST 2015 > LastAccessTime: UNKNOWN > Protect Mode: None > Retention: 0 > Location: hdfs://helmhdfs/apps/hive/warehouse/wrf_tables/4D > > Table Type: EXTERNAL_TABLE > Table Parameters: > EXTERNAL TRUE > comment this table is imported from rwf_data/*/wrf/* > last_modified_by patcharee > last_modified_time 1439806692 > orc.compress ZLIB > transient_lastDdlTime 1439806692 > > # Storage Information > SerDe Library: org.apache.hadoop.hive.ql.io.orc.OrcSerde > InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat > OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat > > Compressed: No > Num Buckets: -1 > Bucket Columns: [] > Sort Columns: [] > Storage Desc Params: > serialization.format 1 > Time taken: 0.388 seconds, Fetched: 58 row(s) > > Data was inserted into this table by another spark job> >
[jira] [Commented] (SPARK-11087) spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate
[ https://issues.apache.org/jira/browse/SPARK-11087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1499#comment-1499 ] Zhan Zhang commented on SPARK-11087: No matter whether the table is sorted or not, the predicate pushdown should happen. We need to first add some debug messages on the driver side to make sure it happens. > spark.sql.orc.filterPushdown does not work, No ORC pushdown predicate > - > > Key: SPARK-11087 > URL: https://issues.apache.org/jira/browse/SPARK-11087 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.1 > Environment: orc file version 0.12 with HIVE_8732 > hive version 1.2.1.2.3.0.0-2557 >Reporter: patcharee >Priority: Minor > > I have an external hive table stored as partitioned orc file (see the table > schema below). I tried to query the table with a where clause: > hiveContext.setConf("spark.sql.orc.filterPushdown", "true") > hiveContext.sql("select u, v from 4D where zone = 2 and x = 320 and y = > 117") > But from the log file with debug logging level on, the ORC pushdown predicate > was not generated. > Unfortunately my table was not sorted when I inserted the data, but I > expected the ORC pushdown predicate to be generated (because of the where > clause). > Table schema > > hive> describe formatted 4D; > OK > # col_name data_type comment > > date int > hh int > x int > y int > height float > u float > v float > w float > ph float > phb float > t float > p float > pb float > qvapor float > qgraup float > qnice float > qnrain float > tke_pbl float > el_pbl float > qcloud float > > # Partition Information > # col_name data_type comment > > zone int > z int > year int > month int > > # Detailed Table Information > Database: default > Owner: patcharee > CreateTime: Thu Jul 09 16:46:54 CEST 2015 > LastAccessTime: UNKNOWN > Protect Mode: None > Retention: 0 > Location: hdfs://helmhdfs/apps/hive/warehouse/wrf_tables/4D > > Table Type: EXTERNAL_TABLE > Table Parameters: > EXTERNAL TRUE > comment this table is imported from rwf_data/*/wrf/* > last_modified_by patcharee > last_modified_time 1439806692 > orc.compress ZLIB > transient_lastDdlTime 1439806692 > > # Storage Information > SerDe Library: org.apache.hadoop.hive.ql.io.orc.OrcSerde > InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat > OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat > > Compressed: No > Num Buckets: -1 > Bucket Columns: [] > Sort Columns: [] > Storage Desc Params: > serialization.format 1 > Time taken: 0.388 seconds, Fetched: 58 row(s) >
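One low-effort way to follow this advice is to raise the log level for the ORC reader package before running the query (a sketch assuming the log4j 1.x logging that ships with Spark 1.5; the pushdown line is printed in whichever JVM runs the scan, so on a cluster check the executor logs):
{code}
// Sketch for surfacing the "ORC pushdown predicate" log line discussed in
// this thread. Only affects the JVM it runs in; on a cluster the line is
// printed where the scan executes, i.e. in the executor logs.
import org.apache.log4j.{Level, Logger}

Logger.getLogger("org.apache.hadoop.hive.ql.io.orc").setLevel(Level.DEBUG)
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
sqlContext.sql("select u, v from 4D where zone = 2 and x = 320 and y = 117").count()
{code}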
[jira] [Commented] (SPARK-10623) turning on predicate pushdown throws nonsuch element exception when RDD is empty
[ https://issues.apache.org/jira/browse/SPARK-10623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14746164#comment-14746164 ] Zhan Zhang commented on SPARK-10623: It is caused by the SearchArgument.Builder not being correctly constructed for And and Or. Will send a PR soon. > turning on predicate pushdown throws nonsuch element exception when RDD is > empty > - > > Key: SPARK-10623 > URL: https://issues.apache.org/jira/browse/SPARK-10623 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Ram Sriharsha >Assignee: Zhan Zhang > > Turning on predicate pushdown for ORC datasources results in a > NoSuchElementException: > scala> val df = sqlContext.sql("SELECT name FROM people WHERE age < 15") > df: org.apache.spark.sql.DataFrame = [name: string] > scala> sqlContext.setConf("spark.sql.orc.filterPushdown", "true") > scala> df.explain > == Physical Plan == > java.util.NoSuchElementException > Disabling the pushdown makes things work again: > scala> sqlContext.setConf("spark.sql.orc.filterPushdown", "false") > scala> df.explain > == Physical Plan == > Project [name#6] > Filter (age#7 < 15) > Scan > OrcRelation[file:/home/mydir/spark-1.5.0-SNAPSHOT/test/people][name#6,age#7] -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
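For context, this is what balanced And/Or construction looks like with the Hive 1.2-era builder API that Spark 1.5 compiles against (a sketch of that API only; the actual fix belongs in Spark's ORC filter conversion): every startAnd()/startOr() must be matched by an end() before build().
{code}
// Sketch of well-formed SearchArgument construction (Hive 1.2-era builder).
// A builder left in an inconsistent state for And/Or would be consistent with
// the NoSuchElementException reported in this issue.
import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument, SearchArgumentFactory}

val sarg: SearchArgument = SearchArgumentFactory
  .newBuilder()
  .startAnd()                  // (age < 15) AND (name = 'name_20')
  .lessThan("age", 15)
  .equals("name", "name_20")
  .end()                       // closes startAnd() before build()
  .build()

// Prints roughly: leaf-0 = (LESS_THAN age 15) leaf-1 = (EQUALS name name_20)
//                 expr = (and leaf-0 leaf-1)
println(sarg)
{code}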
[jira] [Commented] (SPARK-10304) Partition discovery does not throw an exception if the dir structure is invalid
[ https://issues.apache.org/jira/browse/SPARK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14735710#comment-14735710 ] Zhan Zhang commented on SPARK-10304: I did more investigation. Currently all files are included (_common_metadata, etc.), for example:

/Users/zzhang/repo/spark/sql/core/target/tmp/spark-cd6c0332-c6ed-4ef5-8061-7681b895e07a/_common_metadata
/Users/zzhang/repo/spark/sql/core/target/tmp/spark-cd6c0332-c6ed-4ef5-8061-7681b895e07a/_metadata
/Users/zzhang/repo/spark/sql/core/target/tmp/spark-cd6c0332-c6ed-4ef5-8061-7681b895e07a/id=71/part-r-1-39ef2d6e-2832-4757-ac02-0a938eb83b7d.gz.parquet

At the framework level, the partitions will be retrieved from both /Users/zzhang/repo/spark/sql/core/target/tmp/spark-cd6c0332-c6ed-4ef5-8061-7681b895e07a/ and /Users/zzhang/repo/spark/sql/core/target/tmp/spark-cd6c0332-c6ed-4ef5-8061-7681b895e07a/id=71/ In this case, the framework cannot differentiate valid and invalid directories. [~lian cheng] Can we filter out all unnecessary files, e.g., _metadata and _common_metadata, when doing the partition discovery, by removing all files starting with "." or underscore? I don't see how such files are useful for partition discovery, but I may be missing something. Otherwise, it seems hard to check the validity of the directory. cc [~yhuai] > Partition discovery does not throw an exception if the dir structure is > invalid > --- > > Key: SPARK-10304 > URL: https://issues.apache.org/jira/browse/SPARK-10304 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Yin Huai >Assignee: Zhan Zhang >Priority: Critical > > I have a dir structure like {{/path/table1/partition_column=1/}}. When I try > to use {{load("/path/")}}, it works and I get a DF. When I query this DF, if > it is stored as ORC, there will be the following NPE. But, if it is Parquet, > we can even return rows. We should complain to users about the dir struct > because {{table1}} does not meet our format.
> {code} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in > stage 57.0 failed 4 times, most recent failure: Lost task 26.3 in stage 57.0 > (TID 3504, 10.0.195.227): java.lang.NullPointerException > at > org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:466) > at > org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:224) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:261) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:256) > at scala.Option.map(Option.scala:145) > at > org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:256) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:318) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:316) > at > org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
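A sketch of the filtering suggested above (illustrative only, not Spark's actual partition-discovery code): drop files and directories whose names start with "_" or "." before inferring partition columns from the remaining paths.
{code}
// Hypothetical filter for partition discovery: metadata/hidden entries such
// as _metadata, _common_metadata, and .hidden files are skipped before the
// remaining leaf paths are parsed for partition columns.
import org.apache.hadoop.fs.Path

def isDataPath(p: Path): Boolean = {
  val name = p.getName
  !name.startsWith("_") && !name.startsWith(".")
}

val leaves = Seq(
  new Path("/tmp/t/_common_metadata"),
  new Path("/tmp/t/_metadata"),
  new Path("/tmp/t/id=71/part-r-00001.gz.parquet"))

leaves.filter(isDataPath) // keeps only /tmp/t/id=71/part-r-00001.gz.parquet
{code}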
[jira] [Commented] (SPARK-10304) Partition discovery does not throw an exception if the dir structure is invalid
[ https://issues.apache.org/jira/browse/SPARK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14723851#comment-14723851 ] Zhan Zhang commented on SPARK-10304: [~yhuai] I tried to reproduce the problem with the same directory structure (the table is saved in /tmp/table/peoplePartitioned), but didn't hit the problem.
val table = sqlContext.read.format("orc").load("/tmp/table")
table.registerTempTable("table")
sqlContext.sql("SELECT * FROM table WHERE age = 19").show
sqlContext.sql("SELECT * FROM table").show
val table = sqlContext.read.format("orc").load("/tmp/table/peoplePartitioned")
table.registerTempTable("table")
sqlContext.sql("SELECT * FROM table WHERE age = 19").show
sqlContext.sql("SELECT * FROM table").show
I went through the partition parsing code: it parses the leaf directories, so the starting point does not matter, and both /tmp/table and /tmp/table/peoplePartitioned are valid directories as long as they only contain files from the same ORC table. Does your /tmp/table have some extra files not belonging to the same table? If not, can you please provide the exact steps to reproduce? cc [~lian cheng] > Partition discovery does not throw an exception if the dir structure is invalid > - > > Key: SPARK-10304 > URL: https://issues.apache.org/jira/browse/SPARK-10304 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Yin Huai >Assignee: Zhan Zhang >Priority: Critical > > I have a dir structure like {{/path/table1/partition_column=1/}}. When I try > to use {{load("/path/")}}, it works and I get a DF. When I query this DF, if > it is stored as ORC, there will be the following NPE. But, if it is Parquet, > we can even return rows. We should complain to users about the dir struct > because {{table1}} does not meet our format. 
> {code} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in > stage 57.0 failed 4 times, most recent failure: Lost task 26.3 in stage 57.0 > (TID 3504, 10.0.195.227): java.lang.NullPointerException > at > org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:466) > at > org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:224) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:261) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:256) > at scala.Option.map(Option.scala:145) > at > org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:256) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:318) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:316) > at > org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10304) Partition discovery does not throw an exception if the dir structure is invalid
[ https://issues.apache.org/jira/browse/SPARK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14723994#comment-14723994 ] Zhan Zhang commented on SPARK-10304: [~yhuai] I think the NPE is caused by the directory having multiple tables inside. [~lian cheng] Does the design allow different partition specs? For example, one part of the table is partitioned by key1 and key2, the second part is partitioned by key1 only, and the third part is not partitioned at all. If so, it is hard to decide whether the directory is valid or not. Otherwise, we can simply check the consistency of the partition spec and throw an exception if it is not consistent. > Partition discovery does not throw an exception if the dir structure is invalid > - > > Key: SPARK-10304 > URL: https://issues.apache.org/jira/browse/SPARK-10304 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Yin Huai >Assignee: Zhan Zhang >Priority: Critical > > I have a dir structure like {{/path/table1/partition_column=1/}}. When I try > to use {{load("/path/")}}, it works and I get a DF. When I query this DF, if > it is stored as ORC, there will be the following NPE. But, if it is Parquet, > we can even return rows. We should complain to users about the dir struct > because {{table1}} does not meet our format. > {code} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in > stage 57.0 failed 4 times, most recent failure: Lost task 26.3 in stage 57.0 > (TID 3504, 10.0.195.227): java.lang.NullPointerException > at > org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:466) > at > org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:224) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:261) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:256) > at scala.Option.map(Option.scala:145) > at > org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:256) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:318) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:316) > at > org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > 
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
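For illustration, a minimal sketch of the consistency check suggested in the comment above, assuming the partition column names have already been collected per leaf directory; the helper is hypothetical:
{code}
// Fail fast if leaf directories disagree on partition columns,
// e.g. Seq("key1", "key2") vs Seq("key1") vs Seq().
def assertConsistentPartitionSpec(specs: Seq[Seq[String]]): Unit = {
  require(specs.distinct.size <= 1,
    s"Conflicting partition specs found: ${specs.distinct.mkString("; ")}")
}
{code}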
[jira] [Commented] (SPARK-10304) Partition discovery does not throw an exception if the dir structure is invalid
[ https://issues.apache.org/jira/browse/SPARK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14724122#comment-14724122 ] Zhan Zhang commented on SPARK-10304: [~lian cheng] Forget about my question; from the code, it is not allowed. Here it happens because the directory contains nonPartitionedTableX, and the validation check does not handle this case. > Partition discovery does not throw an exception if the dir structure is invalid > - > > Key: SPARK-10304 > URL: https://issues.apache.org/jira/browse/SPARK-10304 > Project: Spark > Issue Type: Bug > Components: SQL >Reporter: Yin Huai >Assignee: Zhan Zhang >Priority: Critical > > I have a dir structure like {{/path/table1/partition_column=1/}}. When I try > to use {{load("/path/")}}, it works and I get a DF. When I query this DF, if > it is stored as ORC, there will be the following NPE. But, if it is Parquet, > we can even return rows. We should complain to users about the dir struct > because {{table1}} does not meet our format. > {code} > org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in > stage 57.0 failed 4 times, most recent failure: Lost task 26.3 in stage 57.0 > (TID 3504, 10.0.195.227): java.lang.NullPointerException > at > org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:466) > at > org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:224) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:261) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:256) > at scala.Option.map(Option.scala:145) > at > org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:256) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:318) > at > org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:316) > at > org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10304) Partition discovery does not throw an exception if the dir structure is invalid
[ https://issues.apache.org/jira/browse/SPARK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14717951#comment-14717951 ] Zhan Zhang commented on SPARK-10304: [~yhuai] Thanks for the information and the initial investigation. I will take a look. Partition discovery does not throw an exception if the dir structure is invalid - Key: SPARK-10304 URL: https://issues.apache.org/jira/browse/SPARK-10304 Project: Spark Issue Type: Bug Components: SQL Reporter: Yin Huai Assignee: Zhan Zhang Priority: Critical I have a dir structure like {{/path/table1/partition_column=1/}}. When I try to use {{load("/path/")}}, it works and I get a DF. When I query this DF, if it is stored as ORC, there will be the following NPE. But, if it is Parquet, we can even return rows. We should complain to users about the dir struct because {{table1}} does not meet our format. {code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 57.0 failed 4 times, most recent failure: Lost task 26.3 in stage 57.0 (TID 3504, 10.0.195.227): java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:466)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:224)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:261)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:256)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:256)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:318)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:316)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10304) Need to add a null check in unwrapperFor in HiveInspectors
[ https://issues.apache.org/jira/browse/SPARK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14716204#comment-14716204 ] Zhan Zhang commented on SPARK-10304: [~yhuai] Does field.getFieldObjectInspector return null? It should never happen if everything is as expected, right? Do you know how to reproduce the issue? Need to add a null check in unwrapperFor in HiveInspectors -- Key: SPARK-10304 URL: https://issues.apache.org/jira/browse/SPARK-10304 Project: Spark Issue Type: Bug Components: SQL Reporter: Yin Huai Assignee: Zhan Zhang Priority: Critical {code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 57.0 failed 4 times, most recent failure: Lost task 26.3 in stage 57.0 (TID 3504, 10.0.195.227): java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:466)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:224)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:261)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:256)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:256)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:318)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:316)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
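For illustration, a hedged sketch of the null check named in the issue title; {{safeUnwrapperFor}} is hypothetical and simply guards the field's ObjectInspector before delegating to the existing conversion:
{code}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, StructField}

// If the field carries no ObjectInspector, fall back to a converter that
// always yields null instead of throwing an NPE; `unwrapperFor` stands in
// for the existing HiveInspectors conversion.
def safeUnwrapperFor(field: StructField)(unwrapperFor: ObjectInspector => Any => Any): Any => Any =
  Option(field.getFieldObjectInspector) match {
    case Some(oi) => unwrapperFor(oi)
    case None => (_: Any) => null
  }
{code}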
[jira] [Commented] (SPARK-10304) Need to add a null check in unwrapperFor in HiveInspectors
[ https://issues.apache.org/jira/browse/SPARK-10304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14715958#comment-14715958 ] Zhan Zhang commented on SPARK-10304: [~yhuai] NP. Will look at it. Need to add a null check in unwrapperFor in HiveInspectors -- Key: SPARK-10304 URL: https://issues.apache.org/jira/browse/SPARK-10304 Project: Spark Issue Type: Bug Components: SQL Reporter: Yin Huai Assignee: Zhan Zhang Priority: Critical {code} org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 57.0 failed 4 times, most recent failure: Lost task 26.3 in stage 57.0 (TID 3504, 10.0.195.227): java.lang.NullPointerException at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:466) at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:224) at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261) at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$9.apply(OrcRelation.scala:261) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:261) at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:256) at scala.Option.map(Option.scala:145) at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:256) at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:318) at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$3.apply(OrcRelation.scala:316) at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:380) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5111) HiveContext and Thriftserver cannot work in secure cluster beyond hadoop2.5
[ https://issues.apache.org/jira/browse/SPARK-5111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14682189#comment-14682189 ] Zhan Zhang commented on SPARK-5111: --- [~adkathu...@yahoo.com] Hive is upgraded to 1.2 in Spark 1.5. Please refer to SPARK-8064. HiveContext and Thriftserver cannot work in secure cluster beyond hadoop2.5 --- Key: SPARK-5111 URL: https://issues.apache.org/jira/browse/SPARK-5111 Project: Spark Issue Type: Bug Components: SQL Reporter: Zhan Zhang Fix For: 1.5.0 Due to a java.lang.NoSuchFieldError: SASL_PROPS error. Need to backport some Hive 0.14 fixes into Spark, since there is no effort to upgrade Hive to 0.14 in Spark. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5111) HiveContext and Thriftserver cannot work in secure cluster beyond hadoop2.5
[ https://issues.apache.org/jira/browse/SPARK-5111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14662227#comment-14662227 ] Zhan Zhang commented on SPARK-5111: --- Since the Hive upgrade is done, this JIRA is not valid anymore. Please close it. HiveContext and Thriftserver cannot work in secure cluster beyond hadoop2.5 --- Key: SPARK-5111 URL: https://issues.apache.org/jira/browse/SPARK-5111 Project: Spark Issue Type: Bug Components: SQL Reporter: Zhan Zhang Due to a java.lang.NoSuchFieldError: SASL_PROPS error. Need to backport some Hive 0.14 fixes into Spark, since there is no effort to upgrade Hive to 0.14 in Spark. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-8501) ORC data source may give empty schema if an ORC file containing zero rows is picked for schema discovery
[ https://issues.apache.org/jira/browse/SPARK-8501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14612643#comment-14612643 ] Zhan Zhang commented on SPARK-8501: --- Because in Spark we will not create the ORC file if there are no records, this only happens with ORC files created by Hive, right? ORC data source may give empty schema if an ORC file containing zero rows is picked for schema discovery Key: SPARK-8501 URL: https://issues.apache.org/jira/browse/SPARK-8501 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.4.0 Environment: Hive 0.13.1 Reporter: Cheng Lian Assignee: Cheng Lian Priority: Critical Not sure whether this should be considered a bug of the ORC bundled with Hive 0.13.1: for an ORC file containing zero rows, the schema written in its footer contains zero fields (e.g. {{struct}}). To reproduce this issue, let's first produce an empty ORC file. Copy data file {{sql/hive/src/test/resources/data/files/kv1.txt}} in the Spark code repo to {{/tmp/kv1.txt}} (I just picked a random simple test data file), then run the following lines in the Hive 0.13.1 CLI:
{noformat}
$ hive
hive> CREATE TABLE foo(key INT, value STRING);
hive> LOAD DATA LOCAL INPATH '/tmp/kv1.txt' INTO TABLE foo;
hive> CREATE TABLE bar STORED AS ORC AS SELECT * FROM foo WHERE key = -1;
{noformat}
Now inspect the empty ORC file we just wrote:
{noformat}
$ hive --orcfiledump /user/hive/warehouse_hive13/bar/00_0
Structure for /user/hive/warehouse_hive13/bar/00_0
15/06/20 00:42:54 INFO orc.ReaderImpl: Reading ORC rows from /user/hive/warehouse_hive13/bar/00_0 with {include: null, offset: 0, length: 9223372036854775807}
Rows: 0
Compression: ZLIB
Compression size: 262144
Type: struct
Stripe Statistics:
File Statistics:
Column 0: count: 0
Stripes:
{noformat}
Notice the {{struct}} part. This feature is OK for Hive, which has a central metastore to save table schemas. But for users who read raw data files without the Hive metastore with Spark SQL 1.4.0, it causes problems, because currently the ORC data source just picks whichever part-file comes first for schema discovery. Expected behavior can be: # Try all files one by one until we find a part-file with a non-empty schema. # Throw {{AnalysisException}} if no such part-file can be found. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
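For illustration, a sketch of the expected behavior listed in the issue: probe part-files until one yields a non-empty schema. {{discoverSchema}} is hypothetical, and {{readSchemaOf}} is a stand-in for whatever reads an ORC footer into a StructType:
{code}
import org.apache.spark.sql.types.StructType

def discoverSchema(paths: Seq[String], readSchemaOf: String => StructType): StructType =
  paths.iterator
    .map(readSchemaOf)
    .find(_.fields.nonEmpty) // skip zero-row files whose footer schema is empty
    .getOrElse(throw new IllegalArgumentException(
      "No part-file with a non-empty schema was found"))
{code}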
[jira] [Commented] (SPARK-2883) Spark Support for ORCFile format
[ https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14603166#comment-14603166 ] Zhan Zhang commented on SPARK-2883: --- [~philclaridge] Please refer to the test cases in trunk for how to use it. saveAsOrcFile/orcFile are removed from upstream. Spark Support for ORCFile format Key: SPARK-2883 URL: https://issues.apache.org/jira/browse/SPARK-2883 Project: Spark Issue Type: New Feature Components: Input/Output, SQL Reporter: Zhan Zhang Assignee: Zhan Zhang Priority: Critical Fix For: 1.4.0 Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 pm jobtracker.png, orc.diff Verify the support of OrcInputFormat in spark, fix issues if exists and add documentation of its usage. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2883) Spark Support for ORCFile format
[ https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14603164#comment-14603164 ] Zhan Zhang commented on SPARK-2883: --- [~biao luo] saveAsOrcFile and orcFile are not removed from upstream. Please refer to the test cases for how to use this feature. Spark Support for ORCFile format Key: SPARK-2883 URL: https://issues.apache.org/jira/browse/SPARK-2883 Project: Spark Issue Type: New Feature Components: Input/Output, SQL Reporter: Zhan Zhang Assignee: Zhan Zhang Priority: Critical Fix For: 1.4.0 Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 pm jobtracker.png, orc.diff Verify the support of OrcInputFormat in spark, fix issues if exists and add documentation of its usage. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2883) Spark Support for ORCFile format
[ https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14603669#comment-14603669 ] Zhan Zhang commented on SPARK-2883: --- [~philclaridge] I tried spark-shell in local mode and did not hit the OOM issue. The following is what I tried:
case class AllDataTypes(
  stringField: String,
  intField: Int,
  longField: Long,
  floatField: Float,
  doubleField: Double,
  shortField: Short,
  byteField: Byte,
  booleanField: Boolean)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
val range = (0 to 255)
val data = sc.parallelize(range).map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
data.toDF().write.format("orc").save("orcRDD")
If you hit OutOfMemoryError: PermGen space, you need to set -XX:PermSize=512M. For further questions, as [~marmbrus] said, please ask on the user list. Spark Support for ORCFile format Key: SPARK-2883 URL: https://issues.apache.org/jira/browse/SPARK-2883 Project: Spark Issue Type: New Feature Components: Input/Output, SQL Reporter: Zhan Zhang Assignee: Zhan Zhang Priority: Critical Fix For: 1.4.0 Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 pm jobtracker.png, orc.diff Verify the support of OrcInputFormat in spark, fix issues if exists and add documentation of its usage. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5111) HiveContext and Thriftserver cannot work in secure cluster beyond hadoop2.5
[ https://issues.apache.org/jira/browse/SPARK-5111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14596244#comment-14596244 ] Zhan Zhang commented on SPARK-5111: --- [~bolke] Thanks for the feedback. I will take a look at how to migrate this patch to Spark 1.4 and beyond when I have bandwidth. HiveContext and Thriftserver cannot work in secure cluster beyond hadoop2.5 --- Key: SPARK-5111 URL: https://issues.apache.org/jira/browse/SPARK-5111 Project: Spark Issue Type: Bug Components: SQL Reporter: Zhan Zhang Due to a java.lang.NoSuchFieldError: SASL_PROPS error. Need to backport some Hive 0.14 fixes into Spark, since there is no effort to upgrade Hive to 0.14 in Spark. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6112) Provide external block store support through HDFS RAM_DISK
[ https://issues.apache.org/jira/browse/SPARK-6112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14596277#comment-14596277 ] Zhan Zhang commented on SPARK-6112: --- [~bghit] Here is one example link for the ramdisk configuration. http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.4/bk_hdfs_admin_tools/content/configure_memory_storage.html [~arpitagarwal] Bogdan's configuration looks OK to me. Do you have any comments regarding the newly added ramdisk configuration, dfs.datanode.max.locked.memory? Provide external block store support through HDFS RAM_DISK -- Key: SPARK-6112 URL: https://issues.apache.org/jira/browse/SPARK-6112 Project: Spark Issue Type: New Feature Components: Block Manager Reporter: Zhan Zhang Attachments: SparkOffheapsupportbyHDFS.pdf The HDFS Lazy_Persist policy provides the possibility to cache RDDs off-heap in HDFS. We may want to provide capacity similar to Tachyon by leveraging the HDFS RAM_DISK feature if the user environment does not have Tachyon deployed. With this feature, it potentially becomes possible to share RDDs in memory across different jobs, and even with jobs other than Spark, and to avoid RDD recomputation if executors crash. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
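For illustration, the user-facing side of this feature stays unchanged; a minimal spark-shell sketch, assuming an external block store (Tachyon today, HDFS RAM_DISK per this proposal) has been configured:
{code}
import org.apache.spark.storage.StorageLevel

// Blocks persisted with OFF_HEAP land in the configured external store
// rather than in executor heap memory.
val rdd = sc.parallelize(1 to 1000).persist(StorageLevel.OFF_HEAP)
rdd.count()
{code}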
[jira] [Commented] (SPARK-6112) Provide external block store support through HDFS RAM_DISK
[ https://issues.apache.org/jira/browse/SPARK-6112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14594935#comment-14594935 ] Zhan Zhang commented on SPARK-6112: --- Thanks [~arpitagarwal] for the detailed setup. [~bghit] Could you clarify which config is still using spark.offHeapStore? I searched the diff and didn't find offheap except one log message (I will change it in the next update) and StorageLevel.OFF_HEAP (which is not changed for backward compatibility). Provide external block store support through HDFS RAM_DISK -- Key: SPARK-6112 URL: https://issues.apache.org/jira/browse/SPARK-6112 Project: Spark Issue Type: New Feature Components: Block Manager Reporter: Zhan Zhang Attachments: SparkOffheapsupportbyHDFS.pdf The HDFS Lazy_Persist policy provides the possibility to cache RDDs off-heap in HDFS. We may want to provide capacity similar to Tachyon by leveraging the HDFS RAM_DISK feature if the user environment does not have Tachyon deployed. With this feature, it potentially becomes possible to share RDDs in memory across different jobs, and even with jobs other than Spark, and to avoid RDD recomputation if executors crash. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7009) Build assembly JAR via ant to avoid zip64 problems
[ https://issues.apache.org/jira/browse/SPARK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14590680#comment-14590680 ] Zhan Zhang commented on SPARK-7009: --- [~airhorns] Please refer to https://issues.apache.org/jira/browse/SPARK-6869 for the solution. Now pyspark is a separate package; I guess it is not in the assembly jar any more, and thus all imports fail. Build assembly JAR via ant to avoid zip64 problems -- Key: SPARK-7009 URL: https://issues.apache.org/jira/browse/SPARK-7009 Project: Spark Issue Type: Improvement Components: Build Affects Versions: 1.3.0 Environment: Java 7+ Reporter: Steve Loughran Attachments: check_spark_python.sh Original Estimate: 2h Remaining Estimate: 2h SPARK-1911 shows the problem that JDK7+ is using zip64 to build large JARs; a format incompatible with Java and pyspark. Provided the total number of .class files+resources is < 64K, ant can be used to make the final JAR instead, perhaps by unzipping the maven-generated JAR then rezipping it with zip64=never, before publishing the artifact via maven. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-7009) Build assembly JAR via ant to avoid zip64 problems
[ https://issues.apache.org/jira/browse/SPARK-7009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14589108#comment-14589108 ] Zhan Zhang commented on SPARK-7009: --- The PR may be outdated and no longer work against the current upstream. I have closed it. Build assembly JAR via ant to avoid zip64 problems -- Key: SPARK-7009 URL: https://issues.apache.org/jira/browse/SPARK-7009 Project: Spark Issue Type: Improvement Components: Build Affects Versions: 1.3.0 Environment: Java 7+ Reporter: Steve Loughran Original Estimate: 2h Remaining Estimate: 2h SPARK-1911 shows the problem that JDK7+ is using zip64 to build large JARs; a format incompatible with Java and pyspark. Provided the total number of .class files+resources is < 64K, ant can be used to make the final JAR instead, perhaps by unzipping the maven-generated JAR then rezipping it with zip64=never, before publishing the artifact via maven. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6112) Provide external block store support through HDFS RAM_DISK
[ https://issues.apache.org/jira/browse/SPARK-6112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-6112: -- Summary: Provide external block store support through HDFS RAM_DISK (was: Provide OffHeap support through HDFS RAM_DISK) Provide external block store support through HDFS RAM_DISK -- Key: SPARK-6112 URL: https://issues.apache.org/jira/browse/SPARK-6112 Project: Spark Issue Type: New Feature Components: Block Manager Reporter: Zhan Zhang Attachments: SparkOffheapsupportbyHDFS.pdf The HDFS Lazy_Persist policy provides the possibility to cache RDDs off-heap in HDFS. We may want to provide capacity similar to Tachyon by leveraging the HDFS RAM_DISK feature if the user environment does not have Tachyon deployed. With this feature, it potentially becomes possible to share RDDs in memory across different jobs, and even with jobs other than Spark, and to avoid RDD recomputation if executors crash. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5111) HiveContext and Thriftserver cannot work in secure cluster beyond hadoop2.5
[ https://issues.apache.org/jira/browse/SPARK-5111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14493483#comment-14493483 ] Zhan Zhang commented on SPARK-5111: --- [~crystal_gaoyu] I am not sure. You could apply the patch to Spark yourself and give it a try. HiveContext and Thriftserver cannot work in secure cluster beyond hadoop2.5 --- Key: SPARK-5111 URL: https://issues.apache.org/jira/browse/SPARK-5111 Project: Spark Issue Type: Bug Components: SQL Reporter: Zhan Zhang Due to a java.lang.NoSuchFieldError: SASL_PROPS error. Need to backport some Hive 0.14 fixes into Spark, since there is no effort to upgrade Hive to 0.14 in Spark. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6479) Create off-heap block storage API (internal)
[ https://issues.apache.org/jira/browse/SPARK-6479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14486143#comment-14486143 ] Zhan Zhang commented on SPARK-6479: --- [~rxin] I updated the doc. If you think the overall design is OK, I will upload the initial patch for this JIRA soon. The patch will contain just the stub classes; the real Tachyon migration and the new HDFS implementation will be submitted for review in SPARK-6112. Please let me know if you have any concerns. Create off-heap block storage API (internal) Key: SPARK-6479 URL: https://issues.apache.org/jira/browse/SPARK-6479 Project: Spark Issue Type: Improvement Components: Block Manager, Spark Core Reporter: Reynold Xin Attachments: SPARK-6479.pdf, SPARK-6479OffheapAPIdesign (1).pdf, SPARK-6479OffheapAPIdesign.pdf, SparkOffheapsupportbyHDFS.pdf Would be great to create APIs for off-heap block stores, rather than doing a bunch of if statements everywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
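For illustration, one possible shape of the stub classes mentioned above; the trait and method names here are assumptions for the sketch, not the final SPARK-6479 interface:
{code}
import java.nio.ByteBuffer
import org.apache.spark.storage.BlockId

// A pluggable external block store: concrete backends (Tachyon, HDFS
// RAM_DISK, ...) implement these operations behind a common shim.
trait ExternalBlockStore {
  def init(executorId: String): Unit
  def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit
  def getBytes(blockId: BlockId): Option[ByteBuffer]
  def removeBlock(blockId: BlockId): Boolean
  def shutdown(): Unit
}
{code}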
[jira] [Updated] (SPARK-6479) Create off-heap block storage API (internal)
[ https://issues.apache.org/jira/browse/SPARK-6479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-6479: -- Attachment: spark-6479-tachyon.patch A patch with the Tachyon migration. Not a complete patch, as it would add too much noise just to rename tachyon to offheap. Create off-heap block storage API (internal) Key: SPARK-6479 URL: https://issues.apache.org/jira/browse/SPARK-6479 Project: Spark Issue Type: Improvement Components: Block Manager, Spark Core Reporter: Reynold Xin Attachments: SPARK-6479.pdf, SPARK-6479OffheapAPIdesign (1).pdf, SPARK-6479OffheapAPIdesign.pdf, SparkOffheapsupportbyHDFS.pdf, spark-6479-tachyon.patch Would be great to create APIs for off-heap block stores, rather than doing a bunch of if statements everywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6479) Create off-heap block storage API (internal)
[ https://issues.apache.org/jira/browse/SPARK-6479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-6479: -- Attachment: SPARK-6479OffheapAPIdesign (1).pdf Add exception from implementation Create off-heap block storage API (internal) Key: SPARK-6479 URL: https://issues.apache.org/jira/browse/SPARK-6479 Project: Spark Issue Type: Improvement Components: Block Manager, Spark Core Reporter: Reynold Xin Attachments: SPARK-6479.pdf, SPARK-6479OffheapAPIdesign (1).pdf, SPARK-6479OffheapAPIdesign.pdf, SparkOffheapsupportbyHDFS.pdf Would be great to create APIs for off-heap block stores, rather than doing a bunch of if statements everywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-2883) Spark Support for ORCFile format
[ https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14393141#comment-14393141 ] Zhan Zhang edited comment on SPARK-2883 at 4/2/15 7:54 PM: --- The following code demonstrates the usage of the ORC support. @climberus the following examples demonstrate how to use it:
import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._
//schema
case class AllDataTypes(
  stringField: String,
  intField: Int,
  longField: Long,
  floatField: Float,
  doubleField: Double,
  shortField: Short,
  byteField: Byte,
  booleanField: Boolean)
//saveAsOrcFile
val range = (0 to 255)
val data = sc.parallelize(range).map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
data.toDF().saveAsOrcFile("orcTest")
//read orcFile
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//orcFile
val orcTest = hiveContext.orcFile("orcTest")
orcTest.registerTempTable("orcTest")
hiveContext.sql("SELECT * from orcTest where intfield > 185").collect.foreach(println)
//new data source API, read
hiveContext.sql("create temporary table orc using org.apache.spark.sql.hive.orc OPTIONS (path \"orcTest\")")
hiveContext.sql("select * from orc").collect.foreach(println)
val table = hiveContext.sql("select * from orc")
// new data source API, write
table.saveAsTable("table", "org.apache.spark.sql.hive.orc")
val hiveOrc = hiveContext.orcFile("/user/hive/warehouse/table")
hiveOrc.registerTempTable("hiveOrc")
hiveContext.sql("select * from hiveOrc").collect.foreach(println)
table.saveAsOrcFile("/user/ambari-qa/table")
hiveContext.sql("create temporary table normal_orc_as_source USING org.apache.spark.sql.hive.orc OPTIONS (path 'saveTable') as select * from table")
was (Author: zzhan): The following code demonstrates the usage of the ORC support.
import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._
//saveAsOrcFile
case class AllDataTypes(
  stringField: String,
  intField: Int,
  longField: Long,
  floatField: Float,
  doubleField: Double,
  shortField: Short,
  byteField: Byte,
  booleanField: Boolean)
val range = (0 to 255)
val data = sc.parallelize(range).map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
data.toDF().saveAsOrcFile("orcTest")
//read orcFile
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val orcTest = hiveContext.orcFile("orcTest")
orcTest.registerTempTable("orcTest")
hiveContext.sql("SELECT * from orcTest where intfield > 185").collect.foreach(println)
hiveContext.sql("create temporary table orc using org.apache.spark.sql.hive.orc OPTIONS (path \"orcTest\")")
hiveContext.sql("select * from orc").collect.foreach(println)
val table = hiveContext.sql("select * from orc")
table.saveAsTable("table", "org.apache.spark.sql.hive.orc")
val hiveOrc = hiveContext.orcFile("/user/hive/warehouse/table")
hiveOrc.registerTempTable("hiveOrc")
hiveContext.sql("select * from hiveOrc").collect.foreach(println)
table.saveAsOrcFile("/user/ambari-qa/table")
hiveContext.sql("create temporary table normal_orc_as_source USING org.apache.spark.sql.hive.orc OPTIONS (path 'saveTable') as select * from table")
Spark Support for ORCFile format Key: SPARK-2883 URL: https://issues.apache.org/jira/browse/SPARK-2883 Project: Spark Issue Type: Bug Components: Input/Output, SQL Reporter: Zhan Zhang Priority: Blocker Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 pm jobtracker.png, orc.diff Verify the support of OrcInputFormat in spark, fix issues if exists and add documentation of its usage. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-6479) Create off-heap block storage API (internal)
[ https://issues.apache.org/jira/browse/SPARK-6479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14393590#comment-14393590 ] Zhan Zhang edited comment on SPARK-6479 at 4/2/15 10:24 PM: [~rxin] Thanks for the feedback. I updated the document with the basic design and an example of handling failure cases thrown by the underlying implementation. Basically, the shim layer handles all the exceptions/failure cases and returns the expected failure value to the upper layer, similar to the example below.
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
  try {
    offHeapManager.flatMap(_.getBytes(blockId))
  } catch {
    case _ =>
      logError(s"error in getBytes from $blockId")
      None
  }
}
was (Author: zzhan): [~rxin] Thanks for the feedback. I updated the document with the basic design and an example of handling failure cases thrown by the underlying implementation. Basically, the shim layer handles all the exceptions/failure cases and returns the expected failure value to the upper layer, similar to the example below.
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
  try {
    offHeapManager.flatMap(_.getBytes(blockId))
  } catch {
    case _ =>
      logError(s"error in getBytes from $blockId")
      None
  }
}
Create off-heap block storage API (internal) Key: SPARK-6479 URL: https://issues.apache.org/jira/browse/SPARK-6479 Project: Spark Issue Type: Improvement Components: Block Manager, Spark Core Reporter: Reynold Xin Attachments: SPARK-6479.pdf, SPARK-6479OffheapAPIdesign.pdf, SparkOffheapsupportbyHDFS.pdf Would be great to create APIs for off-heap block stores, rather than doing a bunch of if statements everywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-2883) Spark Support for ORCFile format
[ https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14393141#comment-14393141 ] Zhan Zhang edited comment on SPARK-2883 at 4/2/15 7:54 PM: --- The following code demonstrates the usage of the ORC support.
import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._
//schema
case class AllDataTypes(
  stringField: String,
  intField: Int,
  longField: Long,
  floatField: Float,
  doubleField: Double,
  shortField: Short,
  byteField: Byte,
  booleanField: Boolean)
//saveAsOrcFile
val range = (0 to 255)
val data = sc.parallelize(range).map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
data.toDF().saveAsOrcFile("orcTest")
//read orcFile
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//orcFile
val orcTest = hiveContext.orcFile("orcTest")
orcTest.registerTempTable("orcTest")
hiveContext.sql("SELECT * from orcTest where intfield > 185").collect.foreach(println)
//new data source API, read
hiveContext.sql("create temporary table orc using org.apache.spark.sql.hive.orc OPTIONS (path \"orcTest\")")
hiveContext.sql("select * from orc").collect.foreach(println)
val table = hiveContext.sql("select * from orc")
// new data source API, write
table.saveAsTable("table", "org.apache.spark.sql.hive.orc")
val hiveOrc = hiveContext.orcFile("/user/hive/warehouse/table")
hiveOrc.registerTempTable("hiveOrc")
hiveContext.sql("select * from hiveOrc").collect.foreach(println)
table.saveAsOrcFile("/user/ambari-qa/table")
hiveContext.sql("create temporary table normal_orc_as_source USING org.apache.spark.sql.hive.orc OPTIONS (path 'saveTable') as select * from table")
was (Author: zzhan): The following code demonstrates the usage of the ORC support. @climberus the following examples demonstrate how to use it:
import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._
//schema
case class AllDataTypes(
  stringField: String,
  intField: Int,
  longField: Long,
  floatField: Float,
  doubleField: Double,
  shortField: Short,
  byteField: Byte,
  booleanField: Boolean)
//saveAsOrcFile
val range = (0 to 255)
val data = sc.parallelize(range).map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
data.toDF().saveAsOrcFile("orcTest")
//read orcFile
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//orcFile
val orcTest = hiveContext.orcFile("orcTest")
orcTest.registerTempTable("orcTest")
hiveContext.sql("SELECT * from orcTest where intfield > 185").collect.foreach(println)
//new data source API, read
hiveContext.sql("create temporary table orc using org.apache.spark.sql.hive.orc OPTIONS (path \"orcTest\")")
hiveContext.sql("select * from orc").collect.foreach(println)
val table = hiveContext.sql("select * from orc")
// new data source API, write
table.saveAsTable("table", "org.apache.spark.sql.hive.orc")
val hiveOrc = hiveContext.orcFile("/user/hive/warehouse/table")
hiveOrc.registerTempTable("hiveOrc")
hiveContext.sql("select * from hiveOrc").collect.foreach(println)
table.saveAsOrcFile("/user/ambari-qa/table")
hiveContext.sql("create temporary table normal_orc_as_source USING org.apache.spark.sql.hive.orc OPTIONS (path 'saveTable') as select * from table")
Spark Support for ORCFile format Key: SPARK-2883 URL: https://issues.apache.org/jira/browse/SPARK-2883 Project: Spark Issue Type: Bug Components: Input/Output, SQL Reporter: Zhan Zhang Priority: Blocker Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 pm jobtracker.png, orc.diff Verify the support of OrcInputFormat in spark, fix issues if exists 
and add documentation of its usage. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6479) Create off-heap block storage API (internal)
[ https://issues.apache.org/jira/browse/SPARK-6479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-6479: -- Attachment: SPARK-6479OffheapAPIdesign.pdf Added the overall design and an example of failure-case handling. Create off-heap block storage API (internal) Key: SPARK-6479 URL: https://issues.apache.org/jira/browse/SPARK-6479 Project: Spark Issue Type: Improvement Components: Block Manager, Spark Core Reporter: Reynold Xin Attachments: SPARK-6479.pdf, SPARK-6479OffheapAPIdesign.pdf, SparkOffheapsupportbyHDFS.pdf Would be great to create APIs for off-heap block stores, rather than doing a bunch of if statements everywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6112) Provide OffHeap support through HDFS RAM_DISK
[ https://issues.apache.org/jira/browse/SPARK-6112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14393287#comment-14393287 ] Zhan Zhang commented on SPARK-6112: --- The design spec for the API is attached to SPARK-6479, awaiting community feedback. Provide OffHeap support through HDFS RAM_DISK - Key: SPARK-6112 URL: https://issues.apache.org/jira/browse/SPARK-6112 Project: Spark Issue Type: New Feature Components: Block Manager Reporter: Zhan Zhang Attachments: SparkOffheapsupportbyHDFS.pdf The HDFS Lazy_Persist policy provides the possibility to cache RDDs off-heap in HDFS. We may want to provide capacity similar to Tachyon by leveraging the HDFS RAM_DISK feature if the user environment does not have Tachyon deployed. With this feature, it potentially becomes possible to share RDDs in memory across different jobs, and even with jobs other than Spark, and to avoid RDD recomputation if executors crash. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6479) Create off-heap block storage API (internal)
[ https://issues.apache.org/jira/browse/SPARK-6479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-6479: -- Attachment: SPARK-6479.pdf This is the updated version for offheap store internal api design. Create off-heap block storage API (internal) Key: SPARK-6479 URL: https://issues.apache.org/jira/browse/SPARK-6479 Project: Spark Issue Type: Improvement Components: Block Manager, Spark Core Reporter: Reynold Xin Attachments: SPARK-6479.pdf, SparkOffheapsupportbyHDFS.pdf Would be great to create APIs for off-heap block stores, rather than doing a bunch of if statements everywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-6479) Create off-heap block storage API (internal)
[ https://issues.apache.org/jira/browse/SPARK-6479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14393590#comment-14393590 ] Zhan Zhang edited comment on SPARK-6479 at 4/2/15 10:23 PM: [~rxin] Thanks for the feedback. I updated the document with the basic design and an example of handling failure cases thrown by the underlying implementation. Basically, the shim layer handles all the exceptions/failure cases and returns the expected failure value to the upper layer, similar to the example below.
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
  try {
    offHeapManager.flatMap(_.getBytes(blockId))
  } catch {
    case _ =>
      logError(s"error in getBytes from $blockId")
      None
  }
}
was (Author: zzhan): [~rxin] Thanks for the feedback. I updated the document with the basic design and an example of handling failure cases thrown by the underlying implementation. Basically, the shim layer handles all the exceptions/failure cases and returns the expected failure value to the upper layer, similar to the example below.
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
  try {
    offHeapManager.flatMap(_.getBytes(blockId))
  } catch {
    case _ =>
      logError(s"error in getBytes from $blockId")
      None
  }
}
Create off-heap block storage API (internal) Key: SPARK-6479 URL: https://issues.apache.org/jira/browse/SPARK-6479 Project: Spark Issue Type: Improvement Components: Block Manager, Spark Core Reporter: Reynold Xin Attachments: SPARK-6479.pdf, SPARK-6479OffheapAPIdesign.pdf, SparkOffheapsupportbyHDFS.pdf Would be great to create APIs for off-heap block stores, rather than doing a bunch of if statements everywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6479) Create off-heap block storage API (internal)
[ https://issues.apache.org/jira/browse/SPARK-6479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14393590#comment-14393590 ] Zhan Zhang commented on SPARK-6479: --- [~rxin] Thanks for the feedback. I updated the document with the basic design and an example of handling failure cases thrown by the underlying implementation. Basically, the shim layer handles all the exceptions/failure cases and returns the expected failure value to the upper layer, similar to the example below.
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
  try {
    offHeapManager.flatMap(_.getBytes(blockId))
  } catch {
    case _ =>
      logError(s"error in getBytes from $blockId")
      None
  }
}
Create off-heap block storage API (internal) Key: SPARK-6479 URL: https://issues.apache.org/jira/browse/SPARK-6479 Project: Spark Issue Type: Improvement Components: Block Manager, Spark Core Reporter: Reynold Xin Attachments: SPARK-6479.pdf, SPARK-6479OffheapAPIdesign.pdf, SparkOffheapsupportbyHDFS.pdf Would be great to create APIs for off-heap block stores, rather than doing a bunch of if statements everywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
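For illustration, the same shim-layer pattern in a generic form; {{withFallback}} is hypothetical, with {{Console.err.println}} standing in for Spark's logError:
{code}
import scala.util.control.NonFatal

// Run a block-store operation; on any failure, log it and hand the caller
// a safe default instead of letting the exception escape to the block manager.
def withFallback[T](opName: String, default: => T)(op: => T): T =
  try op catch {
    case NonFatal(e) =>
      Console.err.println(s"error in $opName: ${e.getMessage}")
      default
  }

// e.g. withFallback("getBytes", Option.empty[java.nio.ByteBuffer]) { /* store access */ None }
{code}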
[jira] [Commented] (SPARK-2883) Spark Support for ORCFile format
[ https://issues.apache.org/jira/browse/SPARK-2883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14393141#comment-14393141 ] Zhan Zhang commented on SPARK-2883: --- The following code demonstrates the usage of the ORC support.

    import org.apache.spark.sql.hive.orc._
    import org.apache.spark.sql._

    // saveAsOrcFile
    case class AllDataTypes(
      stringField: String,
      intField: Int,
      longField: Long,
      floatField: Float,
      doubleField: Double,
      shortField: Short,
      byteField: Byte,
      booleanField: Boolean)

    val range = (0 to 255)
    val data = sc.parallelize(range).map(x =>
      AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))
    data.toDF().saveAsOrcFile("orcTest")

    // read orcFile
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val orcTest = hiveContext.orcFile("orcTest")
    orcTest.registerTempTable("orcTest")
    hiveContext.sql("SELECT * FROM orcTest WHERE intField > 185").collect.foreach(println)
    hiveContext.sql("CREATE TEMPORARY TABLE orc USING org.apache.spark.sql.hive.orc OPTIONS (path \"orcTest\")")
    hiveContext.sql("SELECT * FROM orc").collect.foreach(println)
    val table = hiveContext.sql("SELECT * FROM orc")
    table.saveAsTable("table", "org.apache.spark.sql.hive.orc")
    val hiveOrc = hiveContext.orcFile("/user/hive/warehouse/table")
    hiveOrc.registerTempTable("hiveOrc")
    hiveContext.sql("SELECT * FROM hiveOrc").collect.foreach(println)
    table.saveAsOrcFile("/user/ambari-qa/table")
    hiveContext.sql("CREATE TEMPORARY TABLE normal_orc_as_source USING org.apache.spark.sql.hive.orc OPTIONS (path 'saveTable') AS SELECT * FROM table")

Spark Support for ORCFile format Key: SPARK-2883 URL: https://issues.apache.org/jira/browse/SPARK-2883 Project: Spark Issue Type: Bug Components: Input/Output, SQL Reporter: Zhan Zhang Priority: Blocker Attachments: 2014-09-12 07.05.24 pm Spark UI.png, 2014-09-12 07.07.19 pm jobtracker.png, orc.diff Verify the support of OrcInputFormat in Spark, fix issues if any exist, and add documentation of its usage. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
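End to end, the demo above exercises both access paths: it writes the dataset with saveAsOrcFile, reads it back with orcFile and queries it through a temp table, and then goes through the external data source path (USING org.apache.spark.sql.hive.orc) for both plain reads and CREATE TEMPORARY TABLE ... AS SELECT.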
[jira] [Commented] (SPARK-3720) support ORC in spark sql
[ https://issues.apache.org/jira/browse/SPARK-3720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14393146#comment-14393146 ] Zhan Zhang commented on SPARK-3720: --- [~iward] I have updated the patch with the new API support. You can refer to https://issues.apache.org/jira/browse/SPARK-2883

support ORC in spark sql Key: SPARK-3720 URL: https://issues.apache.org/jira/browse/SPARK-3720 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 1.1.0 Reporter: Fei Wang Attachments: orc.diff

The Optimized Row Columnar (ORC) file format provides a highly efficient way to store data on HDFS. The ORC file format has many advantages, such as:
1. a single file as the output of each task, which reduces the NameNode's load
2. Hive type support, including datetime, decimal, and the complex types (struct, list, map, and union)
3. light-weight indexes stored within the file, to skip row groups that don't pass predicate filtering and to seek to a given row
4. block-mode compression based on data type: run-length encoding for integer columns and dictionary encoding for string columns
5. concurrent reads of the same file using separate RecordReaders
6. ability to split files without scanning for markers
7. a bound on the amount of memory needed for reading or writing
8. metadata stored using Protocol Buffers, which allows addition and removal of fields
Spark SQL already supports Parquet; supporting ORC gives people more options.

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6479) Create off-heap block storage API (internal)
[ https://issues.apache.org/jira/browse/SPARK-6479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14384872#comment-14384872 ] Zhan Zhang commented on SPARK-6479: --- I have a short version of this API and will post it next week. [~ste...@apache.org] Please take a look and see whether you can make it more formal afterwards. Create off-heap block storage API (internal) Key: SPARK-6479 URL: https://issues.apache.org/jira/browse/SPARK-6479 Project: Spark Issue Type: Improvement Components: Block Manager, Spark Core Reporter: Reynold Xin Attachments: SparkOffheapsupportbyHDFS.pdf Would be great to create APIs for off-heap block stores, rather than doing a bunch of if statements everywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-3720) support ORC in spark sql
[ https://issues.apache.org/jira/browse/SPARK-3720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14376361#comment-14376361 ] Zhan Zhang commented on SPARK-3720: --- [~iward] Since this JIRA is a duplicate of SPARK-2883, we can move the discussion to that one. In short, SPARK-2883 aims to support saveAsOrcFile and orcFile, similar to the Parquet file support in Spark, but the patch has been delayed due to an underlying API change.

support ORC in spark sql Key: SPARK-3720 URL: https://issues.apache.org/jira/browse/SPARK-3720 Project: Spark Issue Type: New Feature Components: SQL Affects Versions: 1.1.0 Reporter: Fei Wang Attachments: orc.diff

The Optimized Row Columnar (ORC) file format provides a highly efficient way to store data on HDFS. The ORC file format has many advantages, such as:
1. a single file as the output of each task, which reduces the NameNode's load
2. Hive type support, including datetime, decimal, and the complex types (struct, list, map, and union)
3. light-weight indexes stored within the file, to skip row groups that don't pass predicate filtering and to seek to a given row
4. block-mode compression based on data type: run-length encoding for integer columns and dictionary encoding for string columns
5. concurrent reads of the same file using separate RecordReaders
6. ability to split files without scanning for markers
7. a bound on the amount of memory needed for reading or writing
8. metadata stored using Protocol Buffers, which allows addition and removal of fields
Spark SQL already supports Parquet; supporting ORC gives people more options.

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
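To make the Parquet analogy concrete, here is a minimal sketch of the proposed calls next to their existing Spark 1.x Parquet counterparts (the ORC method names follow the demo posted on SPARK-2883; the paths are placeholders):

    // Existing Parquet support in Spark 1.x:
    val people = sqlContext.parquetFile("/data/people.parquet") // read
    people.saveAsParquetFile("/data/people_copy.parquet")       // write

    // Proposed ORC support, mirroring the Parquet API:
    val peopleOrc = hiveContext.orcFile("/data/people.orc")     // read
    peopleOrc.saveAsOrcFile("/data/people_copy.orc")            // write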
[jira] [Updated] (SPARK-6112) Provide OffHeap support through HDFS RAM_DISK
[ https://issues.apache.org/jira/browse/SPARK-6112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-6112: -- Attachment: SparkOffheapsupportbyHDFS.pdf Design doc for HDFS off-heap support Provide OffHeap support through HDFS RAM_DISK - Key: SPARK-6112 URL: https://issues.apache.org/jira/browse/SPARK-6112 Project: Spark Issue Type: New Feature Components: Block Manager Reporter: Zhan Zhang Attachments: SparkOffheapsupportbyHDFS.pdf The HDFS LAZY_PERSIST policy makes it possible to cache RDDs off-heap in HDFS. We may want to provide a capability similar to Tachyon's by leveraging the HDFS RAM_DISK feature, for user environments that do not have Tachyon deployed. This feature potentially makes it possible to share in-memory RDDs across different jobs, and even with jobs other than Spark, and to avoid RDD recomputation if executors crash. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
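As a sketch of how this could surface to an application, assuming the HDFS RAM_DISK backend were wired up behind Spark's OFF_HEAP storage level (the cache path and the hdfs command below are illustrative assumptions, not settings from the design doc):

    // On the HDFS side, the cache directory would carry the LAZY_PERSIST policy, e.g.:
    //   hdfs storagepolicies -setStoragePolicy -path /spark-offheap -policy LAZY_PERSIST
    import org.apache.spark.storage.StorageLevel

    val rdd = sc.textFile("hdfs:///data/input").map(_.toUpperCase)
    rdd.persist(StorageLevel.OFF_HEAP) // blocks go through the off-heap block store
    rdd.count()                        // first action materializes the cached blocks
    // Another job, or a restarted executor, could then read the same blocks back
    // from HDFS RAM_DISK instead of recomputing the lineage.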
[jira] [Updated] (SPARK-6479) Create off-heap block storage API (internal)
[ https://issues.apache.org/jira/browse/SPARK-6479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-6479: -- Attachment: SparkOffheapsupportbyHDFS.pdf The design doc also includes stuff from SPARK-6112 Create off-heap block storage API (internal) Key: SPARK-6479 URL: https://issues.apache.org/jira/browse/SPARK-6479 Project: Spark Issue Type: Improvement Components: Block Manager, Spark Core Reporter: Reynold Xin Attachments: SparkOffheapsupportbyHDFS.pdf Would be great to create APIs for off-heap block stores, rather than doing a bunch of if statements everywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-6479) Create off-heap block storage API (internal)
[ https://issues.apache.org/jira/browse/SPARK-6479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14376978#comment-14376978 ] Zhan Zhang commented on SPARK-6479: --- The current API may not be good enough, as it has some redundant interfaces and is mainly a POC to make HDFS work. After migrating Tachyon to these new APIs, we should have a better version. I will update the doc later. But I do agree that unifying the off-heap API should be a separate JIRA, and we can start from here. Create off-heap block storage API (internal) Key: SPARK-6479 URL: https://issues.apache.org/jira/browse/SPARK-6479 Project: Spark Issue Type: Improvement Components: Block Manager, Spark Core Reporter: Reynold Xin Attachments: SparkOffheapsupportbyHDFS.pdf Would be great to create APIs for off-heap block stores, rather than doing a bunch of if statements everywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-6112) Provide OffHeap support through HDFS RAM_DISK
[ https://issues.apache.org/jira/browse/SPARK-6112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhan Zhang updated SPARK-6112: -- Summary: Provide OffHeap support through HDFS RAM_DISK (was: Leverage HDFS RAM_DISK capacity to provide off_heap feature similar to Tachyon ) Provide OffHeap support through HDFS RAM_DISK - Key: SPARK-6112 URL: https://issues.apache.org/jira/browse/SPARK-6112 Project: Spark Issue Type: New Feature Components: Block Manager Reporter: Zhan Zhang The HDFS LAZY_PERSIST policy makes it possible to cache RDDs off-heap in HDFS. We may want to provide a capability similar to Tachyon's by leveraging the HDFS RAM_DISK feature, for user environments that do not have Tachyon deployed. This feature potentially makes it possible to share in-memory RDDs across different jobs, and even with jobs other than Spark, and to avoid RDD recomputation if executors crash. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org