Hi,

I have been trying to implement InMemoryRelation based on spark
ColumnarBatches, so far I have not been able to store the vectorised
columnarbatch into the relation. Is there a way to achieve this without
going with an intermediary representation like Arrow, so as to enable spark
to do fast columnar aggregations in memory. The code so far, using just the
high level APIs is as follows -

```
      //Load csv into Datafram
      val csvDF: DataFrame = context.sqlctx.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(csvFile)

      //Create in memory relation using schema from csv dataframe
      val relation = InMemoryRelation(
        useCompression = true,
        batchSize = 100,
        storageLevel = StorageLevel.MEMORY_ONLY,
        child = csvDF.queryExecution.sparkPlan, //Do I need to alter this
to suggest columnar plans?
        tableName = Some("nyc_taxi"),
        optimizedPlan = csvDF.queryExecution.optimizedPlan
      )

      //create vectorized columnar batches
      val rows = csvDF.collect()
      import scala.collection.JavaConverters._
      val vectorizedRows: ColumnarBatch =
ColumnVectorUtils.toBatch(csvDF.schema, MemoryMode.ON_HEAP,
rows.iterator.asJava)

      //store the vectorized rows in the relation
      //relation.store(vectorizedRows)
```

Obviously the last line is the one which is not an API. Need help to
understand if this approach can work and if it does, need help and pointers
in trying to come up with how to implement this API using low level spark
constructs.

Thanks and Regards,
Praveen

Reply via email to