Re: External Data Source in Spark

2015-03-05 Thread Michael Armbrust

> Currently we have implemented the External Data Source API and are able to
> push filters and projections.
>
> Could you provide some info on how the joins could be pushed to the
> original data source if both data sources are from the same database?

First, a disclaimer: this is an experimental API that exposes internals that
are likely to change between Spark releases.  As a result, most data sources
should be written against the stable public Data Source API instead.  We
expose this mostly to get feedback on what optimizations we should add to the
stable API in order to get the best performance out of data sources.

We'll start with a simple artificial data source that just returns ranges
of consecutive integers.

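import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

/** A data source that returns ranges of consecutive integers in a column named `a`. */
case class SimpleRelation(
    start: Int,
    end: Int)(
    @transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  // A single integer column named `a`.
  val schema = StructType(StructField("a", IntegerType, nullable = false) :: Nil)

  // One row per integer in the inclusive range [start, end].
  def buildScan() = sqlContext.sparkContext.parallelize(start to end).map(Row(_))
}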

Given this, we can create two tables, smallTable and bigTable, each backed by
a SimpleRelation.

However, doing a join is pretty slow since we need to shuffle the big table
around for no reason:

sql("SELECT * FROM smallTable s JOIN bigTable b ON s.a = b.a").collect()
res3: Array[org.apache.spark.sql.Row] = Array([1,1])

This takes about 10 seconds on my cluster.  Clearly we can do better.  So
let's define special physical operators for the case when we are inner
joining two of these relations using equality. One will handle the case
when there is no overlap and the other when there is.  Physical operators
must extend SparkPlan and must return an RDD[Row] containing the answer
when execute() is called.

import org.apache.spark.sql.{Row, Strategy}
import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.sources.LogicalRelation

/** A join that just returns the pre-calculated overlap of two ranges of consecutive integers. */
case class OverlappingRangeJoin(
    leftOutput: Attribute,
    rightOutput: Attribute,
    start: Int,
    end: Int) extends SparkPlan {
  def output: Seq[Attribute] = leftOutput :: rightOutput :: Nil

  def execute(): org.apache.spark.rdd.RDD[Row] = {
    // Every integer in the overlap matches itself, so emit (i, i) pairs.
    sqlContext.sparkContext.parallelize(start to end).map(i => Row(i, i))
  }

  def children: Seq[SparkPlan] = Nil
}

/** Used when a join is known to produce no results. */
case class EmptyJoin(output: Seq[Attribute]) extends SparkPlan {
  def execute(): org.apache.spark.rdd.RDD[Row] = {
    // No rows: return an empty RDD.
    sqlContext.sparkContext.emptyRDD[Row]
  }

  def children: Seq[SparkPlan] = Nil
}

/** Finds cases where two sets of consecutive integer ranges are inner joined on equality. */
object SmartSimpleJoin extends Strategy with Serializable {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Find inner joins between two SimpleRelations where the condition is equality.
    case Join(l @ LogicalRelation(left: SimpleRelation), r @ LogicalRelation(right: SimpleRelation), Inner, Some(EqualTo(a, b))) =>
      // Check if the join condition is comparing `a` from each relation.
      if (a == l.output.head && b == r.output.head || a == r.output.head && b == l.output.head) {
        if ((left.start <= right.end) && (left.end >= right.start)) {
          OverlappingRangeJoin(
            l.output.head,
            r.output.head,
            math.max(left.start, right.start),
            math.min(left.end, right.end)) :: Nil
        } else {
          // Ranges don't overlap, join will be empty
          EmptyJoin(l.output.head :: r.output.head :: Nil) :: Nil
        }
      } else {
        // Join isn't between the columns output...
        // Let's just let the query planner handle this.
        Nil
      }
    case _ => Nil // Return an empty list if we don't know how to handle this plan.
  }
}
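
The overlap check that the strategy relies on can be tried out without Spark. What follows is only a minimal sketch in plain Scala: the `RangeOverlap` object and its method names are hypothetical and are not part of Spark or of the code above. It assumes both ranges are inclusive, as in SimpleRelation.

```scala
// Hypothetical helper for illustration only; not part of Spark.
object RangeOverlap {
  /** Returns Some((start, end)) for the shared inclusive sub-range, or None if the ranges are disjoint. */
  def overlap(leftStart: Int, leftEnd: Int, rightStart: Int, rightEnd: Int): Option[(Int, Int)] =
    if (leftStart <= rightEnd && leftEnd >= rightStart)
      Some((math.max(leftStart, rightStart), math.min(leftEnd, rightEnd)))
    else
      None

  /** Rows an equi-join on `a` would produce: one (i, i) pair per shared integer. */
  def joinRows(leftStart: Int, leftEnd: Int, rightStart: Int, rightEnd: Int): Seq[(Int, Int)] =
    overlap(leftStart, leftEnd, rightStart, rightEnd) match {
      case Some((s, e)) => (s to e).map(i => (i, i))
      case None         => Seq.empty
    }
}
```

For example, `RangeOverlap.joinRows(1, 1, 1, 100)` yields the single pair (1, 1), which has the same shape as the join result shown earlier. The point is that the answer comes from the four range bounds alone, so no data has to be shuffled.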

We can then add these strategies to the query planner through the
experimental hook.  Added strategies take precedence over built-in ones.

// Add the strategy to the query planner.
sqlContext.experimental.extraStrategies = SmartSimpleJoin :: Nil

sql("SELECT * FROM smallTable s JOIN bigTable b ON s.a = b.a").collect()
res4: Array[org.apache.spark.sql.Row] = Array([1,1])

Now our join returns in < 1 second.  For more advanced matching of joins
and their conditions you should look at the patterns that are available,
and the built-in join strategies.
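
To make the precedence rule concrete, here is a simplified model of a planner that tries strategies in order and takes the first physical plan any of them produces. This is a sketch, not Spark's planner code: `PlannerSketch`, the string-based plans, and the `special` and `builtIn` strategies are all hypothetical stand-ins. It also shows why returning Nil matters: a strategy that declines a plan lets the later ones handle it.

```scala
// Simplified, hypothetical model of strategy precedence; not Spark's implementation.
object PlannerSketch {
  // Stand-ins: a logical plan is a String and a physical plan is a String.
  type Strategy = String => Seq[String]

  /** Tries each strategy in order and returns the first physical plan produced, if any. */
  def plan(strategies: Seq[Strategy], logical: String): Option[String] =
    strategies.view.flatMap(s => s(logical)).headOption

  // A specialised strategy that only understands one kind of plan.
  val special: Strategy = {
    case "range-join" => Seq("OverlappingRangeJoin")
    case _            => Nil // decline, so later strategies get a chance
  }

  // A generic fallback that can plan anything.
  val builtIn: Strategy = p => Seq(s"Generic($p)")
}
```

With `Seq(special, builtIn)`, the plan "range-join" is handled by the specialised strategy, and any other plan falls through to the generic one.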

Re: External Data Source in Spark

2015-03-05 Thread Michael Armbrust
One other caveat: While writing up this example I realized that we make
SparkPlan private and we are already packaging 1.3-RC3... So you'll need a
custom build of Spark for this to run. We'll fix this in the next release.

On Thu, Mar 5, 2015 at 5:26 PM, Michael Armbrust


Re: External Data Source in Spark

2015-03-02 Thread Akhil Das
Wouldn't it be possible with .saveAsNewAPIHadoopFile? How are you pushing
the filters and projections currently?

Best Regards

On Tue, Mar 3, 2015 at 1:11 AM, Addanki, Santosh Kumar wrote:

> Hi Colleagues,
>
> Currently we have implemented the External Data Source API and are able to
> push filters and projections.
>
> Could you provide some info on how the joins could be pushed to the
> original data source if both data sources are from the same database?
>
> I briefly looked at DataSourceStrategy.scala but could not get far.
>
> Best Regards


External Data Source in Spark

2015-03-02 Thread Addanki, Santosh Kumar
Hi Colleagues,

Currently we have implemented the External Data Source API and are able to push
filters and projections.

Could you provide some info on how the joins could be pushed to the original
data source if both data sources are from the same database?

I briefly looked at DataSourceStrategy.scala but could not get far.

Best Regards

External Data Source in SPARK

2015-02-09 Thread Addanki, Santosh Kumar

We implemented an External Data Source by extending TableScan. We added
the classes to the classpath.
The data source works fine when run in the Spark shell.

But currently we are unable to use this same data source in a Python environment.
When we execute the following in an IPython notebook:

sqlContext.sql("CREATE TEMPORARY TABLE dataTable USING MyDataSource OPTIONS (partitions '2')")

we get the following error:

Py4JJavaError: An error occurred while calling o78.sql.
: java.lang.RuntimeException: Failed to load class for data source: MyDataSource

How can we expose this data source for consumption in the PySpark environment as well?
