Do you have some initial perf numbers? It seems fine to me to remain row-based inside Spark with whole-stage-codegen, and convert rows to columnar batches when communicating with external systems.
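To make the boundary idea concrete, here is a minimal sketch of what I mean by materializing a columnar batch only at the edge, using the Arrow Java API. This is illustrative only: the RowsToArrow object, the rowsToArrow helper, and the single-Int-column schema are invented for the example and are not Spark internals.

    import org.apache.arrow.memory.RootAllocator
    import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}

    // Hypothetical boundary conversion: Spark stays row-based internally
    // and builds an Arrow batch only when handing data to an external
    // system (pandas, pytorch, etc.).
    object RowsToArrow {
      def rowsToArrow(rows: Iterator[Int]): VectorSchemaRoot = {
        val allocator = new RootAllocator(Long.MaxValue)
        // A single Int column named "value", purely for illustration.
        val vector = new IntVector("value", allocator)
        vector.allocateNew()
        var n = 0
        rows.foreach { v => vector.setSafe(n, v); n += 1 }
        vector.setValueCount(n)
        val root = VectorSchemaRoot.of(vector)
        root.setRowCount(n)
        root // caller owns root and allocator; close both when done
      }
    }

The point being: if the conversion only happens at the boundary, the row-based whole-stage-codegen path inside Spark is untouched, and the question becomes whether end-to-end columnar processing beats that plus a single conversion, hence my question about perf numbers.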
On Mon, Mar 25, 2019 at 1:05 PM Bobby Evans <bo...@apache.org> wrote:
> This thread is to discuss adding support for data frame processing using an in-memory columnar format compatible with Apache Arrow. My main goal is to lay the groundwork so we can add support for GPU-accelerated processing of data frames, but this feature has a number of other benefits. Spark currently supports Apache Arrow formatted data as an option to exchange data with Python for pandas UDF processing. There has also been discussion around extending this to allow for exchanging data with other tools like PyTorch, TensorFlow, XGBoost, etc. If Spark supports processing on Arrow-compatible data, it could eliminate the serialization/deserialization overhead when moving between these systems. It would also allow for optimizations on a CPU with SIMD instructions, similar to what Hive currently supports. Accelerated processing using a GPU is something we will start a separate discussion thread on, but I wanted to set the context a bit.
>
> Jason Lowe, Tom Graves, and I created a prototype over the past few months to try to understand how to make this work. What we are proposing is based on lessons learned while building that prototype, but we really wanted to get feedback from the community early on. We will file a SPIP once we can get agreement that this is a good direction to go in.
>
> The current support for columnar processing lets the Parquet or ORC file formats return a ColumnarBatch inside an RDD[InternalRow] by abusing Scala's type erasure. The code generation is aware that the RDD actually holds ColumnarBatches and generates code to loop through the data in each batch as InternalRows.
>
> Instead, we propose a new set of APIs to work on an RDD[InternalColumnarBatch] rather than relying on type erasure. With this we propose adding a Rule similar to how WholeStageCodeGen currently works. Each part of the physical SparkPlan would expose columnar support through a combination of traits and method calls, and the rule would then decide where columnar processing starts and where it ends. Switching between columnar and row-based processing is not free, so the rule would make that decision based on an estimate of the cost of the transformation and the estimated speedup in processing time.
>
> This should allow us to disable columnar support simply by disabling the rule that modifies the physical SparkPlan. It should pose minimal risk to the existing row-based code path, as that code would not be touched and in many cases could be reused to implement the columnar version. It also allows for small, easily manageable patches rather than huge patches that no one wants to review.
>
> As far as the memory layout is concerned, OnHeapColumnVector and OffHeapColumnVector are already very close to being Apache Arrow compatible, so shifting them over would be a relatively simple change. Alternatively, we could add a new implementation that is Arrow compatible if there are reasons to keep the old ones.
>
> Again, this is just to get the discussion started. Any feedback is welcome, and we will file a SPIP once we feel the major changes we are proposing are acceptable.
>
> Thanks,
>
> Bobby Evans
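For anyone trying to picture the traits-plus-rule part of the proposal, something along these lines is what I imagine. This is a sketch only: the ColumnarExec trait name, supportsColumnar, executeColumnar, and the cost hook are my invented names for illustration, not API from the proposal or the prototype.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Sketch of a per-operator columnar contract. A planner rule would
    // walk the physical SparkPlan, find runs of operators that support
    // columnar execution, and insert row<->columnar transitions at the
    // edges when the estimated speedup outweighs the transition cost.
    trait ColumnarExec { self: SparkPlan =>
      // True if this operator can produce Arrow-compatible batches.
      def supportsColumnar: Boolean

      // Columnar analogue of SparkPlan.execute(); valid only when
      // supportsColumnar returns true.
      def executeColumnar(): RDD[ColumnarBatch]

      // Estimated cost of converting this operator's output between row
      // and columnar form, consulted when placing transitions.
      def transitionCost: Double = 1.0
    }

Under a design like that, disabling columnar support really would just mean not running the rule, which matches the low-risk claim above: the row-based plan is left exactly as it is today.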