[jira] [Comment Edited] (SPARK-15687) Columnar execution engine
[ https://issues.apache.org/jira/browse/SPARK-15687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15481268#comment-15481268 ]

Kiran Lonikar (klonikar) edited comment on SPARK-15687 at 9/11/16 7:29 AM:

Evan Chan, I agree. It will then be possible to offer off-heap (sun.misc.Unsafe), ByteBuffer, or even memory-mapped file (mmap) based implementations to take advantage of NVRAM-based systems. Finally, it will be possible to use an implementation backed directly by GPU RAM.

> Columnar execution engine
> -------------------------
>
>                 Key: SPARK-15687
>                 URL: https://issues.apache.org/jira/browse/SPARK-15687
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>            Reporter: Reynold Xin
>            Priority: Critical
>
> This ticket tracks progress in making the entire engine columnar, especially
> in the context of nested data type support.
> In Spark 2.0, we have used the internal column batch interface in Parquet
> reading (via a vectorized Parquet decoder) and low cardinality aggregation.
> Other parts of the engine are already using whole-stage code generation,
> which is in many ways more efficient than a columnar execution engine for
> flat data types.
> The goal here is to figure out a story to work towards making column batch
> the common data exchange format between operators outside whole-stage code
> generation, as well as with external systems (e.g. Pandas).
> Some of the important questions to answer are:
> From the architectural perspective:
> - What is the end state architecture?
> - Should aggregation be columnar?
> - Should sorting be columnar?
> - How do we encode nested data? What are the operations on nested data, and
> how do we handle these operations in a columnar format?
> - What is the transition plan towards the end state?
> From an external API perspective:
> - Can we expose a more efficient column batch user-defined function API?
> - How do we leverage this to integrate with 3rd party tools?
> - Can we have a spec for a fixed version of the column batch format that can
> be externalized and use that in data source API v2?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
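
The point in the comment above about interchangeable backing stores can be sketched as follows. This is only an illustration under stated assumptions: {{IntColumn}}, {{ByteBufferIntColumn}} and {{IntColumns}} are hypothetical names that do not exist in Spark, a direct {{ByteBuffer}} stands in for off-heap memory instead of {{sun.misc.Unsafe}}, and a GPU-backed store is not covered. It relies on the fact that heap, direct, and memory-mapped buffers all share the {{java.nio.ByteBuffer}} API.

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}

object BackingStoreSketch {
  // Hypothetical trait: a fixed-width column of Ints, independent of where the bytes live.
  trait IntColumn {
    def numRows: Int
    def get(i: Int): Int
    def put(i: Int, v: Int): Unit
  }

  // One implementation serves every store that can be exposed as a ByteBuffer.
  final class ByteBufferIntColumn(buf: ByteBuffer, val numRows: Int) extends IntColumn {
    require(buf.capacity() >= numRows * 4, "buffer too small for numRows Ints")
    def get(i: Int): Int = buf.getInt(i * 4)
    def put(i: Int, v: Int): Unit = { buf.putInt(i * 4, v); () }
  }

  object IntColumns {
    // On-heap: backed by a JVM byte array.
    def onHeap(n: Int): IntColumn =
      new ByteBufferIntColumn(ByteBuffer.allocate(n * 4), n)

    // Off-heap: memory outside the garbage-collected heap.
    def offHeap(n: Int): IntColumn =
      new ByteBufferIntColumn(ByteBuffer.allocateDirect(n * 4), n)

    // Memory-mapped file: the mapping stays valid after the channel is closed.
    def mapped(path: Path, n: Int): IntColumn = {
      val ch = FileChannel.open(path, StandardOpenOption.CREATE,
        StandardOpenOption.READ, StandardOpenOption.WRITE)
      try new ByteBufferIntColumn(ch.map(FileChannel.MapMode.READ_WRITE, 0L, n * 4L), n)
      finally ch.close()
    }
  }

  // An operator written against the trait never sees the storage kind.
  def sum(c: IntColumn): Long = {
    var s = 0L
    var i = 0
    while (i < c.numRows) { s += c.get(i); i += 1 }
    s
  }
}
```

The design choice here is that operators depend only on the trait, so a new storage medium needs a new factory method rather than changes to operator code.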
[jira] [Comment Edited] (SPARK-15687) Columnar execution engine
[ https://issues.apache.org/jira/browse/SPARK-15687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15376999#comment-15376999 ]

Kazuaki Ishizaki (kiszk) edited comment on SPARK-15687 at 7/14/16 2:25 PM:

It would be good to introduce a trait for {{Iterator[T]}}. According to the experiment in my [WIP|https://github.com/apache/spark/pull/13171/files#diff-28cb12941b992ff680c277c651b59aa0R445], the following external APIs are necessary:
{code:java}
def numColumns: Integer
def column(column: Integer): org.apache.spark.sql.execution.vectorized.ColumnVector
def numRows: Integer
def hasNextRow: Boolean
{code}
For the column structure, since {{ColumnVector}} is declared {{abstract}}, we can have different implementations. In fact, I implemented {{ByteBufferColumnVector}} to wrap {{CachedBatch}}.
Is there a good implementation of a new columnar storage that uses a trait/interface instead of an abstract class? I am interested in its advantages.
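
To make the four accessors listed above concrete, here is a minimal sketch of how they might sit on top of a plain {{Iterator[T]}}. Everything in it is an assumption made for illustration: the local {{ColumnVector}} is a stand-in for {{org.apache.spark.sql.execution.vectorized.ColumnVector}}, {{ColumnarIterator}} and {{SingleBatchIterator}} are hypothetical names, {{Int}} replaces {{Integer}}, and the reading of {{hasNextRow}} as "rows remain in the current batch" is a guess, not something the WIP confirms.

```scala
object ColumnarIteratorSketch {
  // Stand-in for Spark's abstract ColumnVector; only an Int getter is modelled.
  abstract class ColumnVector {
    def getInt(rowId: Int): Int
  }

  final class IntArrayColumnVector(values: Array[Int]) extends ColumnVector {
    def getInt(rowId: Int): Int = values(rowId)
  }

  // The four accessors from the comment, layered on a plain Iterator.
  trait ColumnarIterator[+T] extends Iterator[T] {
    def numColumns: Int
    def column(column: Int): ColumnVector
    def numRows: Int
    def hasNextRow: Boolean
  }

  // Iterates the row ids of a single batch while still exposing whole columns.
  final class SingleBatchIterator(columns: Array[ColumnVector], val numRows: Int)
      extends ColumnarIterator[Int] {
    private var rowId = 0
    def numColumns: Int = columns.length
    def column(column: Int): ColumnVector = columns(column)
    def hasNextRow: Boolean = rowId < numRows
    def hasNext: Boolean = hasNextRow
    def next(): Int = { val r = rowId; rowId += 1; r }
  }

  // A column-aware consumer bypasses row iteration and scans one column directly.
  def sumColumn(it: ColumnarIterator[Any], ordinal: Int): Long = {
    val col = it.column(ordinal)
    var s = 0L
    var i = 0
    while (i < it.numRows) { s += col.getInt(i); i += 1 }
    s
  }
}
```

A row-oriented consumer can keep calling {{hasNext}}/{{next()}}, while a column-aware one uses {{column(...)}} and {{numRows}}; this is the extra information beyond a vanilla iterator that such a trait would carry.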
[jira] [Comment Edited] (SPARK-15687) Columnar execution engine
[ https://issues.apache.org/jira/browse/SPARK-15687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15376418#comment-15376418 ]

Evan Chan (velvia) edited comment on SPARK-15687 at 7/14/16 6:41 AM:

[~rxin] I like the idea of an Iterator[ColumnBatch]. I would strongly suggest that we make ColumnBatch and ColumnVector an interface/trait so that we can support different implementations. This can encapsulate the ColumnarBatch etc. used in the Parquet reader, while also allowing other implementations and columnar sources to take advantage of it in the future. This trait would offer common methods such as

def numRows(): Int
def atRow(i: Int): A
def definedAt(i: Int): Boolean

etc. I definitely would not want to create my own version of Spark just to work with my own columnar format, which is what I have to do right now :-p
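
The trait proposed above could look roughly like the following sketch. The three method signatures come from the comment; the type parameter placement, the two implementations ({{ArrayColumn}}, {{ConstantColumn}}) and the {{sumDefined}} helper are hypothetical and only show that an operator written against the trait works with any storage layout.

```scala
object ColumnTraitSketch {
  // The methods named in the comment, with A as the element type.
  trait ColumnVector[A] {
    def numRows(): Int
    def atRow(i: Int): A
    def definedAt(i: Int): Boolean
  }

  // Array-backed column with a validity mask marking non-null rows.
  final class ArrayColumn[A](values: Array[A], valid: Array[Boolean]) extends ColumnVector[A] {
    require(values.length == valid.length, "values and validity mask must match")
    def numRows(): Int = values.length
    def atRow(i: Int): A = values(i)
    def definedAt(i: Int): Boolean = valid(i)
  }

  // Constant column: a single value for every row, with no per-row storage.
  final class ConstantColumn[A](value: A, n: Int) extends ColumnVector[A] {
    def numRows(): Int = n
    def atRow(i: Int): A = value
    def definedAt(i: Int): Boolean = true
  }

  // Sums the defined rows of any implementation of the trait.
  def sumDefined(col: ColumnVector[Long]): Long = {
    var s = 0L
    var i = 0
    while (i < col.numRows()) {
      if (col.definedAt(i)) s += col.atRow(i)
      i += 1
    }
    s
  }
}
```

A third-party columnar format would then only need to implement the trait rather than patch Spark itself.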
[jira] [Comment Edited] (SPARK-15687) Columnar execution engine
[ https://issues.apache.org/jira/browse/SPARK-15687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15316858#comment-15316858 ]

Kazuaki Ishizaki (kiszk) edited comment on SPARK-15687 at 6/6/16 5:49 PM:

Thank you for your answers.
* How do we pass the columnar format among operators? Currently, we use Iterator[Row] to pass data between operators.
** We can change it to pass Iterator
*** In my WIP (SPARK-15380), I introduced [a new iterator|https://github.com/apache/spark/pull/13171/files#diff-28cb12941b992ff680c277c651b59aa0R445]. However, this iterator is not used as an iterator for columnar storage, as shown [here|https://github.com/apache/spark/pull/13171/files#diff-e4d7c2fc195fa8c801145928115cdcd0R178]. I need more information than a vanilla iterator provides for effective access to columnar storage. I would like to investigate what information we actually need through additional implementations of columnar storage.
* Who decides which data format (columnar or row-oriented) to use? The logical planner, the physical planner, or something else?
** It's a good question. I'd think this is just something the physical layer should be responsible for, since it is about physical layout.
*** Good to hear your thoughts. We will see how we can decide at {{PhysicalPlan}}.
* Will we use the Apache Arrow format as an internal format?
** Arrow seems too early right now and it's unlikely we'd want to make our internal format depend on an external project. We can however make the format close to it so it would be easy to integrate.
*** I see. In my experience, using {{ColumnVector}} can abstract the physical layout of a columnar storage. We can use our internal format, and also use the Arrow format if data is read from external Arrow storage.
* We have two internal columnar formats: ColumnarBatch and CachedBatch. Will we integrate these two into one?
** Yes - possibly keeping just ColumnBatch.
*** Great to hear this. I like this idea.

I have one terrible experience that I want to share. When I added a field whose type is {{Array[DataType]}} to {{CachedBatch}}, it caused performance degradation. This is because {{SizeEstimator.estimate()}}, which takes longer for such a {{CachedBatch}}, is triggered by {{+=}} at [this statement|https://github.com/apache/spark/blob/4a6e78abd9d5edc4a5092738dff0006bbe202a89/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala#L214] when {{values.next()}} returns {{Iterator[CachedBatch]}}. We may have to avoid this situation when we change {{CachedBatch}}.
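
The first two questions above (passing batches between operators, and letting the physical layer choose the format) can be illustrated with a small sketch. It is not taken from the WIP or from Spark: {{IntBatch}}, {{Row}}, {{filterBatches}} and {{toRows}} are hypothetical, and columns are restricted to {{Int}} to keep the example short. It shows a columnar operator that consumes and produces an iterator of batches, plus an adapter that a physical plan could insert wherever a downstream operator still expects rows.

```scala
object BatchToRowSketch {
  // Hypothetical minimal batch: one Array[Int] per column, all of equal length.
  final case class IntBatch(columns: Vector[Array[Int]]) {
    def numRows: Int = if (columns.isEmpty) 0 else columns.head.length
  }

  // A row is just the values of all columns at one row index.
  type Row = Vector[Int]

  // Columnar operator: keeps the rows whose value in column `ordinal` satisfies p,
  // and emits new batches without ever materializing a Row.
  def filterBatches(batches: Iterator[IntBatch], ordinal: Int)(p: Int => Boolean): Iterator[IntBatch] =
    batches.map { b =>
      val keep = (0 until b.numRows).filter(r => p(b.columns(ordinal)(r)))
      IntBatch(b.columns.map(col => keep.map(r => col(r)).toArray))
    }

  // Adapter: lazily flattens batches into rows for row-based operators.
  def toRows(batches: Iterator[IntBatch]): Iterator[Row] =
    batches.flatMap { b =>
      Iterator.range(0, b.numRows).map(r => b.columns.map(_(r)))
    }
}
```

With such an adapter the format decision stays local to the physical plan: operators that understand batches are chained directly, and the conversion happens only at the boundary to a row-based operator.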