Switching to TumblingProcessingTimeWindows seems to have solved that
problem.

For my own understanding, this won't have any "late" and therefore dropped
records right? We cannot blindly drop a record from the aggregate
evaluation, it just needs to take all the records it gets in a window and
process them and then the aggregate will take whatever is last in-order.

Thanks!

On Thu, Jan 28, 2021 at 1:01 PM Rex Fenley <r...@remind101.com> wrote:

> It looks like it wants me to call assignTimestampsAndWatermarks but I
> already have a timer on my window which I'd expect everything entering this
> stream would simply be aggregated during that window
> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>
> On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley <r...@remind101.com> wrote:
>
>> I think I may have been affected by some late night programming.
>>
>> Slightly revised how I'm using my aggregate
>> val userDocsStream =
>> this.tableEnv
>> .toRetractStream(userDocsTable, classOf[Row])
>> .keyBy(_.f1.getField(0))
>> val compactedUserDocsStream = userDocsStream
>> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>> .aggregate(new CompactionAggregate())
>> but this now gives me the following exception:
>> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>> timestamp marker). Is the time characteristic set to 'ProcessingTime', or
>> did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>>     at org.apache.flink.streaming.api.windowing.assigners.
>> TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>>     at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator.processElement(WindowOperator.java:295)
>>     at org.apache.flink.streaming.runtime.tasks.
>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>> .java:161)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .processElement(StreamTaskNetworkInput.java:178)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .emitNext(StreamTaskNetworkInput.java:153)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>> .processInput(StreamOneInputProcessor.java:67)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>> StreamTask.java:351)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxStep(MailboxProcessor.java:191)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxLoop(MailboxProcessor.java:181)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .runMailboxLoop(StreamTask.java:566)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:536)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>
>> Which I'm not at all sure how to interpret
>>
>> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Ok, that sounds like it confirms my expectations.
>>>
>>> So I tried running my above code and had to slightly edit to using java
>>> Tuple2 because our execution environment stuff is all in Java.
>>>
>>> class CompactionAggregate
>>> extends AggregateFunction[
>>> Tuple2[java.lang.Boolean, Row],
>>> Tuple2[java.lang.Boolean, Row],
>>> Tuple2[java.lang.Boolean, Row]
>>> ] {
>>>
>>> override def createAccumulator() = new Tuple2(false, null)
>>>
>>> // Just take the lastest value to compact.
>>> override def add(
>>> value: Tuple2[java.lang.Boolean, Row],
>>> accumulator: Tuple2[java.lang.Boolean, Row]
>>> ) =
>>> value
>>>
>>> override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>>> accumulator
>>>
>>> // This is a required function that we don't use.
>>> override def merge(
>>> a: Tuple2[java.lang.Boolean, Row],
>>> b: Tuple2[java.lang.Boolean, Row]
>>> ) =
>>> throw new NotImplementedException()
>>> }
>>>
>>> But when running I get the following error:
>>> >Caused by: java.lang.RuntimeException: Could not extract key from
>>> [redacted row]
>>> >...
>>> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
>>> kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
>>> supported when converting to an expression.
>>>
>>> I'm googling around and haven't found anything informative about what
>>> might be causing this issue. Any ideas?
>>>
>>> I'll also take a look at the SQL functions you suggested and see if I
>>> can use those.
>>>
>>> Thanks!
>>>
>>>
>>>
>>> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Rex,
>>>>
>>>> if your keyby (and with join/grouping/windowing) is random or not
>>>> depends on the relationship of the join/grouping key with your Kafka
>>>> partitioning key.
>>>>
>>>> Say your partitioning key is document_id. Then, any join/grouping key
>>>> that is composed of (or is exactly) document_id, will retain the order. You
>>>> should always ask yourself the question: can two records coming from the
>>>> ordered Kafka partition X be processed by two different operator instances.
>>>> For a join/grouping operator, there is only the strict guarantee that all
>>>> records with the same key will be shuffled into the same operator instance.
>>>>
>>>> Your compaction in general looks good but I'm not deep into Table API.
>>>> I'm quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table
>>>> API should already do what you want. [1]
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>>>>
>>>> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> In addition to those questions, assuming that keyed streams are in
>>>>> order, I've come up with the following solution to compact our records and
>>>>> only pick the most recent one per id before sending to the ES sink.
>>>>>
>>>>> The first item in the Row is the document ID / primary key which we
>>>>> want to compact records on.
>>>>>
>>>>> val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>>>>>     userDocsStream
>>>>>       .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>>>       .aggregate(new CompactionAggregate())class CompactionAggregate
>>>>>     extends AggregateFunction[
>>>>>       (Boolean, Row),
>>>>>       (Boolean, Row),
>>>>>       (Boolean, Row)
>>>>>     ] {  override def createAccumulator() = (false, null)  // Just take 
>>>>> the latest value to compact.
>>>>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>>>>     value  override def getResult(accumulator: (Boolean, Row)) = 
>>>>> accumulator  // This is a required function that we don't use.
>>>>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>>>>     throw new NotImplementedException()
>>>>> }
>>>>>
>>>>> I'm hoping that if the last record in the window is an insert it picks
>>>>> that if it's a retract then it picks that and then when we send this to 
>>>>> the
>>>>> ES sink we will simply check true or false in the first element of the
>>>>> tuple for an insert or delete request to ES. Does this seem like it will
>>>>> work?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> This is great info, thanks!
>>>>>>
>>>>>> My question then becomes, what constitutes a random shuffle?
>>>>>> Currently we're using the Table API with minibatch on flink v1.11.3. Do 
>>>>>> our
>>>>>> joins output a keyed stream of records by join key or is this random? I
>>>>>> imagine that they'd have to have a key for retracts and accumulates to
>>>>>> arrive in order on the next downstream operator. Same with aggs but on 
>>>>>> the
>>>>>> groupBy key.
>>>>>>
>>>>>> Does this sound correct to you?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <ar...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Rex,
>>>>>>>
>>>>>>> indeed these two statements look like they contradict each other,
>>>>>>> but they are looking at both sides from the same coin.
>>>>>>> Flink is simply putting records in FIFO in windows. That is, there
>>>>>>> is no ordering on event time if there are late events. So if your 
>>>>>>> elements
>>>>>>> arrive ordered, the ordering is retained. If your elements arrive
>>>>>>> unordered, the same unordered order is retained.
>>>>>>>
>>>>>>> However, note that Flink can only guarantee FIFO according to your
>>>>>>> topology. Consider a source with parallelism 2, each reading data from 
>>>>>>> an
>>>>>>> ordered kafka partition (k1, k2) respectively. Each partition has 
>>>>>>> records
>>>>>>> with keys, such that no key appears in both partitions (default 
>>>>>>> behavior if
>>>>>>> you set keys but no partition while writing to Kafka).
>>>>>>> 1) Let's assume you do a simple transformation and write them back
>>>>>>> into kafka with the same key. Then you can be sure that the order of the
>>>>>>> records is retained.
>>>>>>>
>>>>>>> 2) Now you add a random shuffle and have the transformation. Now two
>>>>>>> successive records may be processed in parallel and there is a race
>>>>>>> condition who is written first into Kafka. So order is not retained.
>>>>>>>
>>>>>>> 3) You shuffle both partitions by the Kafka key (keyby) and do some
>>>>>>> aggregation. Two successive records with the same key will always be
>>>>>>> processed by the same aggregation operator. So the order is retained for
>>>>>>> each key (note that this is what you usually want and want Kafka gives 
>>>>>>> you
>>>>>>> if you don't set the partition explicitly and just provide a key)
>>>>>>>
>>>>>>> 4) You shuffle both partitions by a different key. Then two
>>>>>>> successive Kafka records could be again calculated in parallel such that
>>>>>>> there is a race condition.
>>>>>>>
>>>>>>> Note that windows are a kind of aggregation.
>>>>>>>
>>>>>>> So Flink is never going to restore an ordering that is not there
>>>>>>> (because it's too costly and there are too many unknowns). But you can
>>>>>>> infer the guarantees by analyzing your topology.
>>>>>>>
>>>>>>> ---
>>>>>>>
>>>>>>> Please note that there is a common pitfall when you work with Kafka:
>>>>>>> - Ordering of records in Kafka is only guaranteed if you set 
>>>>>>> *max.in.flight.requests.per.connection
>>>>>>> *to 1
>>>>>>> *. [1]*
>>>>>>> *- *Often you also want to set *enable.idempotence* and *acks=all*
>>>>>>>
>>>>>>> That is true for the upstream application and if you plan back to
>>>>>>> write to Kafka you also need to set that in Flink.
>>>>>>>
>>>>>>> [1]
>>>>>>> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>>>>>>>
>>>>>>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <r...@remind101.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I began reading
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>>>>>>
>>>>>>>>    -
>>>>>>>>
>>>>>>>>    *> Redistributing* streams (between *map()* and *keyBy/window*,
>>>>>>>>    as well as between *keyBy/window* and *sink*) change the
>>>>>>>>    partitioning of streams. Each *operator subtask* sends data to
>>>>>>>>    different target subtasks, depending on the selected transformation.
>>>>>>>>    Examples are *keyBy()* (re-partitions by hash code),
>>>>>>>>    *broadcast()*, or *rebalance()* (random redistribution). In a
>>>>>>>>    *redistributing* exchange, order among elements is only
>>>>>>>>    preserved for each pair of sending- and receiving task (for example
>>>>>>>>    subtask[1] of *map()* and subtask[2] of *keyBy/window*).
>>>>>>>>
>>>>>>>> This makes it sounds like ordering on the same partition/key is
>>>>>>>> always maintained. Which is exactly the ordering guarantee that I need.
>>>>>>>> This seems to slightly contradict the statement "Flink provides no
>>>>>>>> guarantees about the order of the elements within a window" for keyed
>>>>>>>> state. So is it true that ordering _is_ guaranteed for identical keys?
>>>>>>>>
>>>>>>>> If I'm not mistaken, the state in the TableAPI is always considered
>>>>>>>> keyed state for a join or aggregate. Or am I missing something?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <r...@remind101.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Our data arrives in order from Kafka, so we are hoping to use that
>>>>>>>>> same order for our processing.
>>>>>>>>>
>>>>>>>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <r...@remind101.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Going further, if "Flink provides no guarantees about the order
>>>>>>>>>> of the elements within a window" then with minibatch, which I assume 
>>>>>>>>>> uses a
>>>>>>>>>> window under the hood, any aggregates that expect rows to arrive in 
>>>>>>>>>> order
>>>>>>>>>> will fail to keep their consistency. Is this correct?
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <r...@remind101.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> We have a job from CDC to a large unbounded Flink plan to
>>>>>>>>>>> Elasticsearch.
>>>>>>>>>>>
>>>>>>>>>>> Currently, we have been relentlessly trying to reduce our record
>>>>>>>>>>> amplification which, when our Elasticsearch index is near fully 
>>>>>>>>>>> populated,
>>>>>>>>>>> completely bottlenecks our write performance. We decided recently 
>>>>>>>>>>> to try a
>>>>>>>>>>> new job using mini-batch. At first this seemed promising but at 
>>>>>>>>>>> some point
>>>>>>>>>>> we began getting huge record amplification in a join operator. It 
>>>>>>>>>>> appears
>>>>>>>>>>> that minibatch may only batch on aggregate operators?
>>>>>>>>>>>
>>>>>>>>>>> So we're now thinking that we should have a window before our ES
>>>>>>>>>>> sink which only takes the last record for any unique document id in 
>>>>>>>>>>> the
>>>>>>>>>>> window, since that's all we really want to send anyway. However, 
>>>>>>>>>>> when
>>>>>>>>>>> investigating turning a table, to a keyed window stream for 
>>>>>>>>>>> deduping, and
>>>>>>>>>>> then back into a table I read the following:
>>>>>>>>>>>
>>>>>>>>>>> >Attention Flink provides no guarantees about the order of the
>>>>>>>>>>> elements within a window. This implies that although an evictor may 
>>>>>>>>>>> remove
>>>>>>>>>>> elements from the beginning of the window, these are not 
>>>>>>>>>>> necessarily the
>>>>>>>>>>> ones that arrive first or last. [1]
>>>>>>>>>>>
>>>>>>>>>>> which has put a damper on our investigation.
>>>>>>>>>>>
>>>>>>>>>>> I then found the deduplication SQL doc [2], but I have a hard
>>>>>>>>>>> time parsing what the SQL does and we've never used TemporaryViews 
>>>>>>>>>>> or
>>>>>>>>>>> proctime before.
>>>>>>>>>>> Is this essentially what we want?
>>>>>>>>>>> Will just using this SQL be safe for a job that is unbounded and
>>>>>>>>>>> just wants to deduplicate a document write to whatever the most 
>>>>>>>>>>> current one
>>>>>>>>>>> is (i.e. will restoring from a checkpoint maintain our unbounded
>>>>>>>>>>> consistency and will deletes work)?
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>>>>>>>>> [2]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>>
>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>
>>>>>>>>
>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>
>>>>>>
>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>> <https://www.facebook.com/remindhq>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Reply via email to