Ok, that sounds like it confirms my expectations.

So I tried running my above code and had to slightly edit to using java
Tuple2 because our execution environment stuff is all in Java.

class CompactionAggregate
extends AggregateFunction[
Tuple2[java.lang.Boolean, Row],
Tuple2[java.lang.Boolean, Row],
Tuple2[java.lang.Boolean, Row]
] {

override def createAccumulator() = new Tuple2(false, null)

// Just take the lastest value to compact.
override def add(
value: Tuple2[java.lang.Boolean, Row],
accumulator: Tuple2[java.lang.Boolean, Row]
) =
value

override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
accumulator

// This is a required function that we don't use.
override def merge(
a: Tuple2[java.lang.Boolean, Row],
b: Tuple2[java.lang.Boolean, Row]
) =
throw new NotImplementedException()
}

But when running I get the following error:
>Caused by: java.lang.RuntimeException: Could not extract key from
[redacted row]
>...
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported
kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
supported when converting to an expression.

I'm googling around and haven't found anything informative about what might
be causing this issue. Any ideas?

I'll also take a look at the SQL functions you suggested and see if I can
use those.

Thanks!



On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Rex,
>
> if your keyby (and with join/grouping/windowing) is random or not depends
> on the relationship of the join/grouping key with your Kafka partitioning
> key.
>
> Say your partitioning key is document_id. Then, any join/grouping key that
> is composed of (or is exactly) document_id, will retain the order. You
> should always ask yourself the question: can two records coming from the
> ordered Kafka partition X be processed by two different operator instances.
> For a join/grouping operator, there is only the strict guarantee that all
> records with the same key will be shuffled into the same operator instance.
>
> Your compaction in general looks good but I'm not deep into Table API. I'm
> quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table API
> should already do what you want. [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>
> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <r...@remind101.com> wrote:
>
>> In addition to those questions, assuming that keyed streams are in order,
>> I've come up with the following solution to compact our records and only
>> pick the most recent one per id before sending to the ES sink.
>>
>> The first item in the Row is the document ID / primary key which we want
>> to compact records on.
>>
>> val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>>     userDocsStream
>>       .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>       .aggregate(new CompactionAggregate())class CompactionAggregate
>>     extends AggregateFunction[
>>       (Boolean, Row),
>>       (Boolean, Row),
>>       (Boolean, Row)
>>     ] {  override def createAccumulator() = (false, null)  // Just take the 
>> latest value to compact.
>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>     value  override def getResult(accumulator: (Boolean, Row)) = accumulator 
>>  // This is a required function that we don't use.
>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>     throw new NotImplementedException()
>> }
>>
>> I'm hoping that if the last record in the window is an insert it picks
>> that if it's a retract then it picks that and then when we send this to the
>> ES sink we will simply check true or false in the first element of the
>> tuple for an insert or delete request to ES. Does this seem like it will
>> work?
>>
>> Thanks!
>>
>>
>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> This is great info, thanks!
>>>
>>> My question then becomes, what constitutes a random shuffle? Currently
>>> we're using the Table API with minibatch on flink v1.11.3. Do our joins
>>> output a keyed stream of records by join key or is this random? I imagine
>>> that they'd have to have a key for retracts and accumulates to arrive in
>>> order on the next downstream operator. Same with aggs but on the groupBy
>>> key.
>>>
>>> Does this sound correct to you?
>>>
>>> Thanks!
>>>
>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Rex,
>>>>
>>>> indeed these two statements look like they contradict each other, but
>>>> they are looking at both sides from the same coin.
>>>> Flink is simply putting records in FIFO in windows. That is, there is
>>>> no ordering on event time if there are late events. So if your elements
>>>> arrive ordered, the ordering is retained. If your elements arrive
>>>> unordered, the same unordered order is retained.
>>>>
>>>> However, note that Flink can only guarantee FIFO according to your
>>>> topology. Consider a source with parallelism 2, each reading data from an
>>>> ordered kafka partition (k1, k2) respectively. Each partition has records
>>>> with keys, such that no key appears in both partitions (default behavior if
>>>> you set keys but no partition while writing to Kafka).
>>>> 1) Let's assume you do a simple transformation and write them back into
>>>> kafka with the same key. Then you can be sure that the order of the records
>>>> is retained.
>>>>
>>>> 2) Now you add a random shuffle and have the transformation. Now two
>>>> successive records may be processed in parallel and there is a race
>>>> condition who is written first into Kafka. So order is not retained.
>>>>
>>>> 3) You shuffle both partitions by the Kafka key (keyby) and do some
>>>> aggregation. Two successive records with the same key will always be
>>>> processed by the same aggregation operator. So the order is retained for
>>>> each key (note that this is what you usually want and want Kafka gives you
>>>> if you don't set the partition explicitly and just provide a key)
>>>>
>>>> 4) You shuffle both partitions by a different key. Then two successive
>>>> Kafka records could be again calculated in parallel such that there is a
>>>> race condition.
>>>>
>>>> Note that windows are a kind of aggregation.
>>>>
>>>> So Flink is never going to restore an ordering that is not there
>>>> (because it's too costly and there are too many unknowns). But you can
>>>> infer the guarantees by analyzing your topology.
>>>>
>>>> ---
>>>>
>>>> Please note that there is a common pitfall when you work with Kafka:
>>>> - Ordering of records in Kafka is only guaranteed if you set 
>>>> *max.in.flight.requests.per.connection
>>>> *to 1
>>>> *. [1]*
>>>> *- *Often you also want to set *enable.idempotence* and *acks=all*
>>>>
>>>> That is true for the upstream application and if you plan back to write
>>>> to Kafka you also need to set that in Flink.
>>>>
>>>> [1]
>>>> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>>>>
>>>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I began reading
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>>>
>>>>>    -
>>>>>
>>>>>    *> Redistributing* streams (between *map()* and *keyBy/window*, as
>>>>>    well as between *keyBy/window* and *sink*) change the partitioning
>>>>>    of streams. Each *operator subtask* sends data to different target
>>>>>    subtasks, depending on the selected transformation. Examples are
>>>>>    *keyBy()* (re-partitions by hash code), *broadcast()*, or
>>>>>    *rebalance()* (random redistribution). In a *redistributing*
>>>>>    exchange, order among elements is only preserved for each pair of 
>>>>> sending-
>>>>>    and receiving task (for example subtask[1] of *map()* and
>>>>>    subtask[2] of *keyBy/window*).
>>>>>
>>>>> This makes it sounds like ordering on the same partition/key is always
>>>>> maintained. Which is exactly the ordering guarantee that I need. This 
>>>>> seems
>>>>> to slightly contradict the statement "Flink provides no guarantees about
>>>>> the order of the elements within a window" for keyed state. So is it true
>>>>> that ordering _is_ guaranteed for identical keys?
>>>>>
>>>>> If I'm not mistaken, the state in the TableAPI is always considered
>>>>> keyed state for a join or aggregate. Or am I missing something?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> Our data arrives in order from Kafka, so we are hoping to use that
>>>>>> same order for our processing.
>>>>>>
>>>>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>
>>>>>>> Going further, if "Flink provides no guarantees about the order of
>>>>>>> the elements within a window" then with minibatch, which I assume uses a
>>>>>>> window under the hood, any aggregates that expect rows to arrive in 
>>>>>>> order
>>>>>>> will fail to keep their consistency. Is this correct?
>>>>>>>
>>>>>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <r...@remind101.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> We have a job from CDC to a large unbounded Flink plan to
>>>>>>>> Elasticsearch.
>>>>>>>>
>>>>>>>> Currently, we have been relentlessly trying to reduce our record
>>>>>>>> amplification which, when our Elasticsearch index is near fully 
>>>>>>>> populated,
>>>>>>>> completely bottlenecks our write performance. We decided recently to 
>>>>>>>> try a
>>>>>>>> new job using mini-batch. At first this seemed promising but at some 
>>>>>>>> point
>>>>>>>> we began getting huge record amplification in a join operator. It 
>>>>>>>> appears
>>>>>>>> that minibatch may only batch on aggregate operators?
>>>>>>>>
>>>>>>>> So we're now thinking that we should have a window before our ES
>>>>>>>> sink which only takes the last record for any unique document id in the
>>>>>>>> window, since that's all we really want to send anyway. However, when
>>>>>>>> investigating turning a table, to a keyed window stream for deduping, 
>>>>>>>> and
>>>>>>>> then back into a table I read the following:
>>>>>>>>
>>>>>>>> >Attention Flink provides no guarantees about the order of the
>>>>>>>> elements within a window. This implies that although an evictor may 
>>>>>>>> remove
>>>>>>>> elements from the beginning of the window, these are not necessarily 
>>>>>>>> the
>>>>>>>> ones that arrive first or last. [1]
>>>>>>>>
>>>>>>>> which has put a damper on our investigation.
>>>>>>>>
>>>>>>>> I then found the deduplication SQL doc [2], but I have a hard time
>>>>>>>> parsing what the SQL does and we've never used TemporaryViews or 
>>>>>>>> proctime
>>>>>>>> before.
>>>>>>>> Is this essentially what we want?
>>>>>>>> Will just using this SQL be safe for a job that is unbounded and
>>>>>>>> just wants to deduplicate a document write to whatever the most 
>>>>>>>> current one
>>>>>>>> is (i.e. will restoring from a checkpoint maintain our unbounded
>>>>>>>> consistency and will deletes work)?
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>
>>>>>>>>
>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>
>>>>>>>
>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>
>>>>>>
>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>> <https://www.facebook.com/remindhq>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Reply via email to