Re: CATALYST rule join

2018-02-27 Thread tan shai
 Hi,

I need to write a rule to customize the join function using the Spark Catalyst
optimizer. The objective is to duplicate the second dataset using this
process:

- Execute a UDF on the column called x; this UDF returns an array

- Execute an explode function on the new column

Using SQL terms, my objective is to execute this query on the second table:

SELECT EXPLODE(foo(x)) FROM table2

where `foo` is a UDF that returns an array of elements.
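
For reference, the DataFrame equivalent of that query would be roughly the
following (just a sketch: `table2` stands for the second table's dataframe and
the UDF body is only a placeholder):

import org.apache.spark.sql.functions.{col, explode, udf}

// Placeholder UDF standing in for `foo`: takes x and returns an array of longs.
val foo = udf((x: Long) => Seq(x))
val exploded = table2.select(explode(foo(col("x"))).as("x"))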

I have this rule:

case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case join @ Join(left, right, _, Some(condition)) =>
      // Pick the attribute named "x" from the right-hand side of the join
      val attr = right.outputSet.find(a => a.toString().contains("x"))

      // Wrap it in a UDF that returns an array (f and ipix come from my surrounding code)
      val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType),
        Seq(attr.last.toAttribute))

      // Explode the array so the right side produces one row per element
      val explode = Explode(udf)
      val resolvedGenerator =
        Generate(explode, true, false, qualifier = None, udf.references.toSeq, right)
      val newRight = Project(resolvedGenerator.output, resolvedGenerator)

      Join(left, newRight, Inner, Option(condition))
  }
}

But the problem is that the `Generate explode` operation appears many times
in the physical plan.
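
Maybe the rule keeps matching the join it has already rewritten on each
optimizer pass. One idea I am considering is to guard the match so that it
skips a right side that already contains a Generate; a rough sketch (the body
is elided, it would be the same rewrite as above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{Generate, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Only rewrite joins whose right side has not been expanded yet, so that
    // repeated optimizer passes do not stack additional Generate nodes.
    case join @ Join(left, right, _, Some(condition))
        if right.collectFirst { case g: Generate => g }.isEmpty =>
      // ... same rewrite as in the rule above ...
      join
  }
}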


Do you have any other ideas? Maybe a different way of writing the code?

Thank you.


CATALYST rule join

2018-02-25 Thread tan shai
Hi,

I need to write a rule to customize the join function using the Spark Catalyst
optimizer. The objective is to duplicate the second dataset using this
process:

- Execute a UDF on the column called x; this UDF returns an array

- Execute an explode function on the new column

Using SQL terms, my objective is to execute this query on the second table:

SELECT EXPLODE(foo(x)) FROM table2

where `foo` is a UDF that returns an array of elements.

I have this rule:

case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case join @ Join(left, right, _, Some(condition)) =>
      // Pick the attribute named "x" from the right-hand side of the join
      val attr = right.outputSet.find(a => a.toString().contains("x"))

      // Wrap it in a UDF that returns an array (f and ipix come from my surrounding code)
      val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType),
        Seq(attr.last.toAttribute))

      // Explode the array so the right side produces one row per element
      val explode = Explode(udf)
      val resolvedGenerator =
        Generate(explode, true, false, qualifier = None, udf.references.toSeq, right)
      val newRight = Project(resolvedGenerator.output, resolvedGenerator)

      Join(left, newRight, Inner, Option(condition))
  }
}

But the problem is that the `Generate explode` operation appears many times
in the physical plan.


Do you have any other ideas? Maybe a different way of writing the code?

Thank you.


Tuning Spark memory

2016-09-23 Thread tan shai
Hi,

I am working with Spark 2.0; the job starts by sorting the input data and
storing the output on HDFS.

I was getting out-of-memory errors; increasing spark.shuffle.memoryFraction
from 0.2 to 0.8 solved the problem. But the documentation says that this
parameter is deprecated.

As I understand it, it has been replaced by spark.memory.fraction. How should
I set this parameter, taking into account the sort and the storage on HDFS?
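
For what it is worth, this is roughly how I would set the new parameters now
(the values are only placeholders, not a recommendation):

import org.apache.spark.sql.SparkSession

// Example only: the unified memory settings that replace spark.shuffle.memoryFraction.
// spark.memory.fraction is the share of the heap used for execution and storage;
// spark.memory.storageFraction is the part of that share protected from eviction.
val spark = SparkSession.builder()
  .appName("sort-to-hdfs")
  .config("spark.memory.fraction", "0.8")
  .config("spark.memory.storageFraction", "0.3")
  .getOrCreate()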

Thanks.


Total memory of workers

2016-09-06 Thread tan shai
Hello,

Can anyone explain to me how Spark behaves if the size of the processed file
is greater than the total memory available on the workers?

Many thanks.


RangePartitioning

2016-07-08 Thread tan shai
Hi,

Can anyone explain the RangePartitioning class to me?
https://github.com/apache/spark/blob/d5911d1173fe0872f21cae6c47abf8ff479345a4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
  extends Expression with Partitioning with Unevaluable {

  override def children: Seq[SortOrder] = ordering
  override def nullable: Boolean = false
  override def dataType: DataType = IntegerType

  override def satisfies(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution => true
    case OrderedDistribution(requiredOrdering) =>
      val minSize = Seq(requiredOrdering.size, ordering.size).min
      requiredOrdering.take(minSize) == ordering.take(minSize)
    case ClusteredDistribution(requiredClustering) =>
      ordering.map(_.child).forall(x =>
        requiredClustering.exists(_.semanticEquals(x)))
    case _ => false
  }

  override def compatibleWith(other: Partitioning): Boolean = other match {
    case o: RangePartitioning => this.semanticEquals(o)
    case _ => false
  }

  override def guarantees(other: Partitioning): Boolean = other match {
    case o: RangePartitioning => this.semanticEquals(o)
    case _ => false
  }
}
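
For context, this is where I run into it: sorting a dataframe adds a
range-partitioned exchange to the physical plan, roughly like this (assuming a
SparkSession named spark; the plan output is abbreviated and approximate):

// Sorting triggers an Exchange with rangepartitioning in the physical plan.
val df = spark.range(0, 1000).toDF("x")
df.sort("x").explain()
// == Physical Plan == (approximate)
// *Sort [x#2L ASC], true, 0
// +- Exchange rangepartitioning(x#2L ASC, 200)
//    +- ...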



Re: Extend Dataframe API

2016-07-07 Thread tan shai
That is what I was thinking of doing.

Do you have any ideas about this? Or any documentation?
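
One hook I have come across is the experimental entry point for extra
optimizer rules; it only covers optimizer rules rather than new dataframe
methods, but it avoids building a custom version of Spark. A minimal sketch
(MyRule is just a placeholder):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder rule: returns the plan unchanged, just to show where custom logic plugs in.
case class MyRule(spark: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

val spark = SparkSession.builder().appName("extended-plans").getOrCreate()
spark.experimental.extraOptimizations = Seq(MyRule(spark))
// There is also spark.experimental.extraStrategies for custom physical planning strategies.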

Many thanks.

2016-07-07 17:07 GMT+02:00 Koert Kuipers <ko...@tresata.com>:

> i dont see any easy way to extend the plans, beyond creating a custom
> version of spark.
>
> On Thu, Jul 7, 2016 at 9:31 AM, tan shai <tan.shai...@gmail.com> wrote:
>
>> Hi,
>>
>> I need to add new operations to the dataframe API.
>> Can any one explain to me how to extend the plans of query execution?
>>
>> Many thanks.
>>
>
>


Re: Question regarding structured data and partitions

2016-07-07 Thread tan shai
Thank you for your answer.

Since Spark 1.6.0, it has been possible to partition a dataframe using hash
partitioning with repartition "
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame
"
I have also sorted a dataframe, and it uses range partitioning in the
physical plan.

So I need to retrieve the partition information produced by the sort.
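
Following the dataFrame.rdd suggestion below, this is as far as I get
(sortedDf stands for the sorted dataframe):

// Dropping to the underlying RDD exposes some partition-level information.
val rdd = sortedDf.rdd
println(rdd.getNumPartitions)   // number of partitions produced by the sort
println(rdd.partitioner)        // typically None for a dataframe's row RDD, so no bounds here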

Any ideas?

2016-07-07 16:59 GMT+02:00 Koert Kuipers <ko...@tresata.com>:

> since dataframes represent more or less a plan of execution, they do not
> have partition information as such i think?
> you could however do dataFrame.rdd, to force it to create a physical plan
> that results in an actual rdd, and then query the rdd for partition info.
>
> On Thu, Jul 7, 2016 at 4:24 AM, tan shai <tan.shai...@gmail.com> wrote:
>
>> Using partitioning with dataframes, how can we retrieve informations
>> about partitions? partitions bounds for example
>>
>> Thanks,
>> Shaira
>>
>> 2016-07-07 6:30 GMT+02:00 Koert Kuipers <ko...@tresata.com>:
>>
>>> spark does keep some information on the partitions of an RDD, namely the
>>> partitioning/partitioner.
>>>
>>> GroupSorted is an extension for key-value RDDs that also keeps track of
>>> the ordering, allowing for faster joins, non-reduce type operations on very
>>> large groups of values per key, etc.
>>> see here:
>>> https://github.com/tresata/spark-sorted
>>> however no support for streaming (yet)...
>>>
>>>
>>> On Wed, Jul 6, 2016 at 11:55 PM, Omid Alipourfard <ecyn...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Why doesn't Spark keep information about the structure of the RDDs or
>>>> the partitions within RDDs?   Say that I use
>>>> repartitionAndSortWithinPartitions, which results in sorted
>>>> partitions.  With sorted partitions, lookups should be super fast (binary
>>>> search?), yet I still need to go through the whole partition to perform a
>>>> lookup -- using say, filter.
>>>>
>>>> To give more context into a use case, let me give a very simple example
>>>> where having this feature seems extremely useful: consider that you have a
>>>> stream of incoming keys, where for each key you need to lookup the
>>>> associated value in a large RDD and perform operations on the values.
>>>> Right now, performing a join between the RDDs in the DStream and the large
>>>> RDD seems to be the way to go.  I.e.:
>>>>
>>>> incomingData.transform { rdd => largeRdd.join(rdd) }
>>>>   .map(performAdditionalOperations).save(...)
>>>>
>>>> Assuming that the largeRdd is sorted/or contains an index and each
>>>> window of incomingData is small, this join operation can be performed in 
>>>> *O(incomingData
>>>> * (log(largeRDD) | 1)).  *Yet, right now, I believe this operation is
>>>> much more expensive than that.
>>>>
>>>> I have just started using Spark, so it's highly likely that I am using
>>>> it wrong.  So any thoughts are appreciated!
>>>>
>>>> TL;DR.  Why not keep an index/info with each partition or RDD to speed
>>>> up operations such as lookups filters, etc.?
>>>>
>>>> Thanks,
>>>> Omid
>>>>
>>>
>>>
>>
>


Extend Dataframe API

2016-07-07 Thread tan shai
Hi,

I need to add new operations to the dataframe API.
Can anyone explain to me how to extend the query execution plans?

Many thanks.


Re: Optimize filter operations with sorted data

2016-07-07 Thread tan shai
How can you verify that it is loading only the part matching the time and
network filter?
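
Is it by checking the physical plan? Something along these lines (a sketch;
the path and the filter values are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// For data stored partitioned by time and network_id, the plan of a filtered
// read should show the pruning in the PartitionFilters / PushedFilters of the
// scan node; the input size reported in the Spark UI is another way to check.
val df = spark.read.parquet("/path/to/table")
df.filter($"time" === "2016-07-07" && $"network_id" === 42).explain()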

2016-07-07 11:58 GMT+02:00 Chanh Le <giaosu...@gmail.com>:

> Hi Tan,
> It depends on how data organise and what your filter is.
> For example in my case: I store data by partition by field time and
> network_id. If I filter by time or network_id or both and with other field
> Spark only load part of time and network in filter then filter the rest.
>
>
>
> > On Jul 7, 2016, at 4:43 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > Does the filter under consideration operate on sorted column(s) ?
> >
> > Cheers
> >
> >> On Jul 7, 2016, at 2:25 AM, tan shai <tan.shai...@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> I have a sorted dataframe, I need to optimize the filter operations.
> >> How does Spark performs filter operations on sorted dataframe?
> >>
> >> It is scanning all the data?
> >>
> >> Many thanks.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>


Re: Optimize filter operations with sorted data

2016-07-07 Thread tan shai
Yes, it is operating on the sorted column.

2016-07-07 11:43 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:

> Does the filter under consideration operate on sorted column(s) ?
>
> Cheers
>
> > On Jul 7, 2016, at 2:25 AM, tan shai <tan.shai...@gmail.com> wrote:
> >
> > Hi,
> >
> > I have a sorted dataframe, I need to optimize the filter operations.
> > How does Spark performs filter operations on sorted dataframe?
> >
> > It is scanning all the data?
> >
> > Many thanks.
>


Optimize filter operations with sorted data

2016-07-07 Thread tan shai
Hi,

I have a sorted dataframe, and I need to optimize the filter operations.
How does Spark perform filter operations on a sorted dataframe?

Does it scan all the data?
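
Concretely, the pattern I have in mind is something like this (df and the
column name are only placeholders):

import org.apache.spark.sql.functions.col

// Sort once, then filter on the sort column; the question is whether the
// filter below still scans every partition.
val sorted = df.sort("x")
val filtered = sorted.filter(col("x") > 100)
filtered.explain()   // inspect the physical plan of the filtered read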

Many thanks.


Re: Question regarding structured data and partitions

2016-07-07 Thread tan shai
Using partitioning with dataframes, how can we retrieve information about the
partitions? The partition bounds, for example.

Thanks,
Shaira

2016-07-07 6:30 GMT+02:00 Koert Kuipers :

> spark does keep some information on the partitions of an RDD, namely the
> partitioning/partitioner.
>
> GroupSorted is an extension for key-value RDDs that also keeps track of
> the ordering, allowing for faster joins, non-reduce type operations on very
> large groups of values per key, etc.
> see here:
> https://github.com/tresata/spark-sorted
> however no support for streaming (yet)...
>
>
> On Wed, Jul 6, 2016 at 11:55 PM, Omid Alipourfard 
> wrote:
>
>> Hi,
>>
>> Why doesn't Spark keep information about the structure of the RDDs or the
>> partitions within RDDs?   Say that I use
>> repartitionAndSortWithinPartitions, which results in sorted partitions.
>> With sorted partitions, lookups should be super fast (binary search?), yet
>> I still need to go through the whole partition to perform a lookup -- using
>> say, filter.
>>
>> To give more context into a use case, let me give a very simple example
>> where having this feature seems extremely useful: consider that you have a
>> stream of incoming keys, where for each key you need to lookup the
>> associated value in a large RDD and perform operations on the values.
>> Right now, performing a join between the RDDs in the DStream and the large
>> RDD seems to be the way to go.  I.e.:
>>
>> incomingData.transform { rdd => largeRdd.join(rdd) }
>>   .map(performAdditionalOperations).save(...)
>>
>> Assuming that the largeRdd is sorted/or contains an index and each window
>> of incomingData is small, this join operation can be performed in
>> O(incomingData * (log(largeRDD) | 1)). Yet, right now, I believe this operation is
>> much more expensive than that.
>>
>> I have just started using Spark, so it's highly likely that I am using it
>> wrong.  So any thoughts are appreciated!
>>
>> TL;DR.  Why not keep an index/info with each partition or RDD to speed up
>> operations such as lookups filters, etc.?
>>
>> Thanks,
>> Omid
>>
>
>


Dataframe sort

2016-07-05 Thread tan shai
Hi,

I need to sort a dataframe and retrieve the bounds of each partition.
dataframe.sort() uses range partitioning in the physical plan.

I need to retrieve the partition bounds.
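
The closest workaround I have found is to compute the bounds of each partition
from the underlying RDD; a rough sketch, assuming sortedDf = df.sort("x") with
x a Long column:

// Scan each partition once and record its first and last key. Because the sort
// range-partitions the data and orders it within each partition, these are
// effectively the partition bounds (derived empirically, not read from the
// RangePartitioner itself).
val bounds = sortedDf.select("x").rdd
  .mapPartitionsWithIndex { (idx, rows) =>
    val keys = rows.map(_.getLong(0)).toArray   // materializes one partition's keys
    if (keys.isEmpty) Iterator.empty
    else Iterator.single((idx, keys.head, keys.last))
  }
  .collect()

bounds.foreach { case (i, lower, upper) => println(s"partition $i: [$lower, $upper]") }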

Many thanks for your help.