Hi Ken,

Thanks for the suggestion, that idea should also work for implementing a
data set difference operation, which is what concerns me here. However, I
was also curious about why there is so much performance difference between
using sortPartition and sorting in memory by partition, for datasets as
small as 20 elements and running in local mode. For that data set sizes I
would expect no relevant performance difference, but with sortPartition the
program crashes, so I must be doing something wrong here.

Thanks in any case for the idea.

Greetings,

Juan

On Mon, Jul 22, 2019 at 8:49 AM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Juan,
>
> If you want to deduplicate, then you could group by the record, and use a
> (very simple) reduce function to only emit a record if the group contains
> one element.
>
> There will be performance issues, though - Flink will have to generate all
> groups first, which typically means spilling to disk if the data set has
> any significant size.
>
> — Ken
>
> PS - I assume that you’ve implemented a valid hashCode()/equals() for the
> record.
>
>
> On Jul 22, 2019, at 8:29 AM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
> Hi,
>
> I've been trying to write a function to compute the difference between 2
> datasets. With that I mean computing a dataset that has all the elements of
> a dataset that are not present in another dataset. I first tried using
> coCogroup, but it was very slow in a local execution environment, and often
> was crashing with OOM. Then I tried with leftOuterJoin and got similar
> results. I then tried the following:
>
> private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>
>   val all = selfMarked.union(otherMarked)
>     .partitionByHash(0) // so occurrences of the same value in both datasets 
> go to the same partition
>     .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: 
> Collector[T]) =>
>     var latestOtherOpt: Option[T] = None
>     partitionIter.foreach {
>       case (otherElem, false) => latestOtherOpt = Some(otherElem)
>       case (selfElem, true) =>
>         if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>     }
>   }
> }
>
>
> This is basically the idea of removing duplicates in a collection by first
> sorting it, and then traversing it from beginning to end, removing the
> elements that are consecutive to an element we just saw. That is extended
> here to mark whether an element is coming from `self` or from `other`,
> keeping only elements from `self` that are not following another occurrence
> of the same element in `other`. That code is also really slow on a local
> execution environment, and crashes a lot. But when I replace
> `sortPartition` by sorting each partition in memory inside a mapPartition,
> it works ok with the local execution environment.
>
> private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] = 
> {
>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>   val all = selfMarked.union(otherMarked)
>     .partitionByHash(0) // so occurrences of the same value in both datasets 
> go to the same partition
>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: 
> Collector[T]) =>
>     val sortedPartition = {
>       val partition = partitionIter.toArray
>       util.Sorting.quickSort(partition)
>       partition
>     }
>     var latestOtherOpt: Option[T] = None
>     sortedPartition.foreach {
>       case (otherElem, false) => latestOtherOpt = Some(otherElem)
>       case (selfElem, true) =>
>         if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>     }
>   }
> }
>
>
> I'm surprised by such a big difference. This is my code
> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/main/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/package.scala#L16>,
> and a test
> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/test/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/FlinkMatchersSpec.scala#L69>
> I use for running this. I'm very surprised with these performance issues
> with such small DataSet sizes, with less than 20 elements. Is this because
> I'm running the program with a local execution environment?, are operations
> like coGroup, leftOuterJoin or sorPartition implemented inefficiently in
> the local environment? If that is the case, is there any other alternative
> environment recommended for development in a single machine, where I won't
> be experiencing these issues with those operations? Should I expect the
> function `minussWithSortPartition` above to run efficiently on a cluster?
> Or maybe there is something wrong with my code? Are there any plans to
> provide a built-in minus operator in future versions of Flink?
>
> Thanks,
>
> Juan
>
>
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>

Reply via email to