By the way, there is a set difference (minus) operator in the Table API [1]
that might be helpful.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/tableApi.html#set-operations
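
For reference, here is a minimal plain-Scala sketch of the semantics documented for those operators: `minus` behaves like SQL EXCEPT and drops duplicates, while `minusAll` behaves like EXCEPT ALL and keeps multiplicities. It uses ordinary collections rather than Flink tables, and the object and method names are made up for illustration only.

```scala
object MinusSemantics {
  // Like Table API `minus` (SQL EXCEPT): the distinct elements of `left`
  // that do not occur in `right`.
  def minus[T](left: Seq[T], right: Seq[T]): Seq[T] = {
    val exclude: Set[T] = right.toSet
    left.distinct.filterNot(exclude)
  }

  // Like Table API `minusAll` (SQL EXCEPT ALL): an element present n times
  // in `left` and m times in `right` is kept max(n - m, 0) times.
  // Seq.diff already implements this multiset difference.
  def minusAll[T](left: Seq[T], right: Seq[T]): Seq[T] = left.diff(right)
}
```

Which of the two is the better fit depends on whether duplicates in the left input should survive the difference.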

On Fri, Sep 20, 2019 at 3:30 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Juan,
>
> Both the local execution environment and the remote execution environment
> run the same code to execute the program.
> The implementation of the sortPartition operator was designed to scale to
> data sizes that exceed the available memory.
> Internally, it serializes all records into byte arrays and sorts the
> serialized data. This is of course more expensive than keeping all objects
> on the heap and sorting them there.
> Hence, a certain performance difference is to be expected. However,
> something that should not happen is that the program fails.
>
> What's the magnitude of the performance difference?
> Can you post a stack trace of the error?
>
> Thanks,
> Fabian
>
> On Mon, Sep 16, 2019 at 1:51 PM Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi Ken,
>>
>> Thanks for the suggestion. That idea should also work for implementing a
>> data set difference operation, which is what concerns me here. However, I
>> was also curious about why there is such a large performance difference
>> between using sortPartition and sorting in memory by partition, for
>> datasets as small as 20 elements and running in local mode. For data sets
>> of that size I would expect no relevant performance difference, but with
>> sortPartition the program crashes, so I must be doing something wrong here.
>>
>> Thanks in any case for the idea.
>>
>> Greetings,
>>
>> Juan
>>
>> On Mon, Jul 22, 2019 at 8:49 AM Ken Krugler <kkrugler_li...@transpac.com>
>> wrote:
>>
>>> Hi Juan,
>>>
>>> If you want to deduplicate, then you could group by the record, and use
>>> a (very simple) reduce function to only emit a record if the group contains
>>> one element.
>>>
>>> There will be performance issues, though - Flink will have to generate
>>> all groups first, which typically means spilling to disk if the data set
>>> has any significant size.
>>>
>>> — Ken
>>>
>>> PS - I assume that you’ve implemented a valid hashCode()/equals() for
>>> the record.
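
As a rough illustration of how the grouping idea might be adapted to a difference operation, here is a sketch on plain Scala collections, not on Flink DataSets. It assumes each element is tagged with its origin, as in the code quoted further down, and emits an element only if its group contains no copy from `other`. The object name is hypothetical, and in the DataSet API the same logic would presumably be expressed with groupBy followed by reduceGroup.

```scala
object GroupedMinus {
  // Distinct elements of `self` that never occur in `other`.
  // Relies on a valid hashCode()/equals() for T, as noted above.
  def minus[T](self: Seq[T], other: Seq[T]): Seq[T] = {
    // Tag each element with its origin: true = self, false = other.
    val marked: Seq[(T, Boolean)] = self.map((_, true)) ++ other.map((_, false))
    marked
      .groupBy(_._1)
      // Keep a group only if every member came from `self`.
      .collect { case (elem, group) if group.forall(_._2) => elem }
      .toSeq
  }
}
```

The output order is unspecified because it follows the iteration order of the intermediate map, and duplicates within `self` collapse to a single element.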
>>>
>>>
>>> On Jul 22, 2019, at 8:29 AM, Juan Rodríguez Hortalá <
>>> juan.rodriguez.hort...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I've been trying to write a function to compute the difference between 2
>>> datasets. By that I mean computing a dataset that has all the elements of
>>> a dataset that are not present in another dataset. I first tried using
>>> coGroup, but it was very slow in a local execution environment, and often
>>> crashed with OOM errors. Then I tried with leftOuterJoin and got similar
>>> results. I then tried the following:
>>>
>>> private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
>>>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>>>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>>>
>>>   val all = selfMarked.union(otherMarked)
>>>     .partitionByHash(0) // so occurrences of the same value in both
>>>                         // datasets go to the same partition
>>>     .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
>>>   all.mapPartition[T] {
>>>     (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
>>>     var latestOtherOpt: Option[T] = None
>>>     partitionIter.foreach {
>>>       case (otherElem, false) => latestOtherOpt = Some(otherElem)
>>>       case (selfElem, true) =>
>>>         if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>>>     }
>>>   }
>>> }
>>>
>>>
>>> This is basically the idea of removing duplicates in a collection by
>>> first sorting it, and then traversing it from beginning to end, removing
>>> the elements that are consecutive to an element we just saw. That is
>>> extended here to mark whether an element is coming from `self` or from
>>> `other`, keeping only elements from `self` that are not following another
>>> occurrence of the same element in `other`. That code is also really slow on
>>> a local execution environment, and crashes a lot. But when I replace
>>> `sortPartition` by sorting each partition in memory inside a mapPartition,
>>> it works ok with the local execution environment.
>>>
>>> private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] = {
>>>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>>>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>>>   val all = selfMarked.union(otherMarked)
>>>     .partitionByHash(0) // so occurrences of the same value in both
>>>                         // datasets go to the same partition
>>>   all.mapPartition[T] {
>>>     (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
>>>     val sortedPartition = {
>>>       val partition = partitionIter.toArray
>>>       util.Sorting.quickSort(partition)
>>>       partition
>>>     }
>>>     var latestOtherOpt: Option[T] = None
>>>     sortedPartition.foreach {
>>>       case (otherElem, false) => latestOtherOpt = Some(otherElem)
>>>       case (selfElem, true) =>
>>>         if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>>>     }
>>>   }
>>> }
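
To check the sort-and-scan logic independently of Flink, the same idea can be run on plain Scala collections. This is only a sketch under that assumption: the object name is invented, and an `Ordering[T]` is required so that the marked pairs can be sorted.

```scala
object SortedScanMinus {
  // Elements of `self` that have no equal element in `other`.
  def minus[T: Ordering](self: Seq[T], other: Seq[T]): Seq[T] = {
    // Tag each element with its origin: true = self, false = other.
    val marked: Seq[(T, Boolean)] = self.map((_, true)) ++ other.map((_, false))
    // Tuples sort by element first, then by marker; false < true, so for
    // equal elements the copies from `other` come before those from `self`.
    val sorted = marked.sorted
    var latestOther: Option[T] = None
    val out = Seq.newBuilder[T]
    sorted.foreach {
      case (elem, false) => latestOther = Some(elem)
      case (elem, true)  => if (!latestOther.contains(elem)) out += elem
    }
    out.result()
  }
}
```

If this version behaves as expected on the test data, the scan logic itself is probably not the cause of the crashes. Note that, unlike a strict set difference, it keeps duplicates of `self` elements that are absent from `other`.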
>>>
>>>
>>> I'm surprised by such a big difference. This is my code
>>> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/main/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/package.scala#L16>,
>>> and a test
>>> <https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/test/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/FlinkMatchersSpec.scala#L69>
>>> I use for running this. I'm very surprised by these performance issues
>>> with such small DataSet sizes, with fewer than 20 elements. Is this because
>>> I'm running the program with a local execution environment? Are operations
>>> like coGroup, leftOuterJoin or sortPartition implemented inefficiently in
>>> the local environment? If that is the case, is there any alternative
>>> environment recommended for development on a single machine, where I won't
>>> experience these issues with those operations? Should I expect the
>>> function `minussWithSortPartition` above to run efficiently on a cluster?
>>> Or maybe there is something wrong with my code? Are there any plans to
>>> provide a built-in minus operator in future versions of Flink?
>>>
>>> Thanks,
>>>
>>> Juan
>>>
>>>
>>>
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>>
>>>
