Hi Rex,

there cannot be any late event in processing time by definition (maybe on a
quantum computer?), so you should be fine. The timestamp of records in
processing time is monotonously increasing.

Best,

Arvid

On Fri, Jan 29, 2021 at 1:14 AM Rex Fenley <r...@remind101.com> wrote:

> Switching to TumblingProcessingTimeWindows seems to have solved that
> problem.
>
> For my own understanding, this won't have any "late" and therefore dropped
> records right? We cannot blindly drop a record from the aggregate
> evaluation, it just needs to take all the records it gets in a window and
> process them and then the aggregate will take whatever is last in-order.
>
> Thanks!
>
> On Thu, Jan 28, 2021 at 1:01 PM Rex Fenley <r...@remind101.com> wrote:
>
>> It looks like it wants me to call assignTimestampsAndWatermarks but I
>> already have a timer on my window which I'd expect everything entering this
>> stream would simply be aggregated during that window
>> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>
>> On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> I think I may have been affected by some late night programming.
>>>
>>> Slightly revised how I'm using my aggregate
>>> val userDocsStream =
>>> this.tableEnv
>>> .toRetractStream(userDocsTable, classOf[Row])
>>> .keyBy(_.f1.getField(0))
>>> val compactedUserDocsStream = userDocsStream
>>> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>> .aggregate(new CompactionAggregate())
>>> but this now gives me the following exception:
>>> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
>>> timestamp marker). Is the time characteristic set to 'ProcessingTime',
>>> or did you forget to call
>>> 'DataStream.assignTimestampsAndWatermarks(...)'?
>>>     at org.apache.flink.streaming.api.windowing.assigners.
>>> TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator.processElement(WindowOperator.java:295)
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>>> .java:161)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .processElement(StreamTaskNetworkInput.java:178)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:153)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:67)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>> StreamTask.java:351)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxStep(MailboxProcessor.java:191)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxLoop(MailboxProcessor.java:181)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(StreamTask.java:566)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:536)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>>
>>> Which I'm not at all sure how to interpret
>>>
>>> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Ok, that sounds like it confirms my expectations.
>>>>
>>>> So I tried running my above code and had to slightly edit to using java
>>>> Tuple2 because our execution environment stuff is all in Java.
>>>>
>>>> class CompactionAggregate
>>>> extends AggregateFunction[
>>>> Tuple2[java.lang.Boolean, Row],
>>>> Tuple2[java.lang.Boolean, Row],
>>>> Tuple2[java.lang.Boolean, Row]
>>>> ] {
>>>>
>>>> override def createAccumulator() = new Tuple2(false, null)
>>>>
>>>> // Just take the lastest value to compact.
>>>> override def add(
>>>> value: Tuple2[java.lang.Boolean, Row],
>>>> accumulator: Tuple2[java.lang.Boolean, Row]
>>>> ) =
>>>> value
>>>>
>>>> override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>>>> accumulator
>>>>
>>>> // This is a required function that we don't use.
>>>> override def merge(
>>>> a: Tuple2[java.lang.Boolean, Row],
>>>> b: Tuple2[java.lang.Boolean, Row]
>>>> ) =
>>>> throw new NotImplementedException()
>>>> }
>>>>
>>>> But when running I get the following error:
>>>> >Caused by: java.lang.RuntimeException: Could not extract key from
>>>> [redacted row]
>>>> >...
>>>> > Caused by: org.apache.flink.table.api.ValidationException:
>>>> Unsupported kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT'
>>>> kind are supported when converting to an expression.
>>>>
>>>> I'm googling around and haven't found anything informative about what
>>>> might be causing this issue. Any ideas?
>>>>
>>>> I'll also take a look at the SQL functions you suggested and see if I
>>>> can use those.
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Rex,
>>>>>
>>>>> if your keyby (and with join/grouping/windowing) is random or not
>>>>> depends on the relationship of the join/grouping key with your Kafka
>>>>> partitioning key.
>>>>>
>>>>> Say your partitioning key is document_id. Then, any join/grouping key
>>>>> that is composed of (or is exactly) document_id, will retain the order. 
>>>>> You
>>>>> should always ask yourself the question: can two records coming from the
>>>>> ordered Kafka partition X be processed by two different operator 
>>>>> instances.
>>>>> For a join/grouping operator, there is only the strict guarantee that all
>>>>> records with the same key will be shuffled into the same operator 
>>>>> instance.
>>>>>
>>>>> Your compaction in general looks good but I'm not deep into Table API.
>>>>> I'm quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table
>>>>> API should already do what you want. [1]
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>>>>>
>>>>> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> In addition to those questions, assuming that keyed streams are in
>>>>>> order, I've come up with the following solution to compact our records 
>>>>>> and
>>>>>> only pick the most recent one per id before sending to the ES sink.
>>>>>>
>>>>>> The first item in the Row is the document ID / primary key which we
>>>>>> want to compact records on.
>>>>>>
>>>>>> val userDocsStream = 
>>>>>> userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>>>>>>     userDocsStream
>>>>>>       .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>>>>       .aggregate(new CompactionAggregate())class CompactionAggregate
>>>>>>     extends AggregateFunction[
>>>>>>       (Boolean, Row),
>>>>>>       (Boolean, Row),
>>>>>>       (Boolean, Row)
>>>>>>     ] {  override def createAccumulator() = (false, null)  // Just take 
>>>>>> the latest value to compact.
>>>>>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>>>>>     value  override def getResult(accumulator: (Boolean, Row)) = 
>>>>>> accumulator  // This is a required function that we don't use.
>>>>>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>>>>>     throw new NotImplementedException()
>>>>>> }
>>>>>>
>>>>>> I'm hoping that if the last record in the window is an insert it
>>>>>> picks that if it's a retract then it picks that and then when we send 
>>>>>> this
>>>>>> to the ES sink we will simply check true or false in the first element of
>>>>>> the tuple for an insert or delete request to ES. Does this seem like it
>>>>>> will work?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <r...@remind101.com>
>>>>>> wrote:
>>>>>>
>>>>>>> This is great info, thanks!
>>>>>>>
>>>>>>> My question then becomes, what constitutes a random shuffle?
>>>>>>> Currently we're using the Table API with minibatch on flink v1.11.3. Do 
>>>>>>> our
>>>>>>> joins output a keyed stream of records by join key or is this random? I
>>>>>>> imagine that they'd have to have a key for retracts and accumulates to
>>>>>>> arrive in order on the next downstream operator. Same with aggs but on 
>>>>>>> the
>>>>>>> groupBy key.
>>>>>>>
>>>>>>> Does this sound correct to you?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <ar...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Rex,
>>>>>>>>
>>>>>>>> indeed these two statements look like they contradict each other,
>>>>>>>> but they are looking at both sides from the same coin.
>>>>>>>> Flink is simply putting records in FIFO in windows. That is, there
>>>>>>>> is no ordering on event time if there are late events. So if your 
>>>>>>>> elements
>>>>>>>> arrive ordered, the ordering is retained. If your elements arrive
>>>>>>>> unordered, the same unordered order is retained.
>>>>>>>>
>>>>>>>> However, note that Flink can only guarantee FIFO according to your
>>>>>>>> topology. Consider a source with parallelism 2, each reading data from 
>>>>>>>> an
>>>>>>>> ordered kafka partition (k1, k2) respectively. Each partition has 
>>>>>>>> records
>>>>>>>> with keys, such that no key appears in both partitions (default 
>>>>>>>> behavior if
>>>>>>>> you set keys but no partition while writing to Kafka).
>>>>>>>> 1) Let's assume you do a simple transformation and write them back
>>>>>>>> into kafka with the same key. Then you can be sure that the order of 
>>>>>>>> the
>>>>>>>> records is retained.
>>>>>>>>
>>>>>>>> 2) Now you add a random shuffle and have the transformation. Now
>>>>>>>> two successive records may be processed in parallel and there is a race
>>>>>>>> condition who is written first into Kafka. So order is not retained.
>>>>>>>>
>>>>>>>> 3) You shuffle both partitions by the Kafka key (keyby) and do some
>>>>>>>> aggregation. Two successive records with the same key will always be
>>>>>>>> processed by the same aggregation operator. So the order is retained 
>>>>>>>> for
>>>>>>>> each key (note that this is what you usually want and want Kafka gives 
>>>>>>>> you
>>>>>>>> if you don't set the partition explicitly and just provide a key)
>>>>>>>>
>>>>>>>> 4) You shuffle both partitions by a different key. Then two
>>>>>>>> successive Kafka records could be again calculated in parallel such 
>>>>>>>> that
>>>>>>>> there is a race condition.
>>>>>>>>
>>>>>>>> Note that windows are a kind of aggregation.
>>>>>>>>
>>>>>>>> So Flink is never going to restore an ordering that is not there
>>>>>>>> (because it's too costly and there are too many unknowns). But you can
>>>>>>>> infer the guarantees by analyzing your topology.
>>>>>>>>
>>>>>>>> ---
>>>>>>>>
>>>>>>>> Please note that there is a common pitfall when you work with Kafka:
>>>>>>>> - Ordering of records in Kafka is only guaranteed if you set 
>>>>>>>> *max.in.flight.requests.per.connection
>>>>>>>> *to 1
>>>>>>>> *. [1]*
>>>>>>>> *- *Often you also want to set *enable.idempotence* and *acks=all*
>>>>>>>>
>>>>>>>> That is true for the upstream application and if you plan back to
>>>>>>>> write to Kafka you also need to set that in Flink.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>>>>>>>>
>>>>>>>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <r...@remind101.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I began reading
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>>>>>>>
>>>>>>>>>    -
>>>>>>>>>
>>>>>>>>>    *> Redistributing* streams (between *map()* and *keyBy/window*,
>>>>>>>>>    as well as between *keyBy/window* and *sink*) change the
>>>>>>>>>    partitioning of streams. Each *operator subtask* sends data to
>>>>>>>>>    different target subtasks, depending on the selected 
>>>>>>>>> transformation.
>>>>>>>>>    Examples are *keyBy()* (re-partitions by hash code),
>>>>>>>>>    *broadcast()*, or *rebalance()* (random redistribution). In a
>>>>>>>>>    *redistributing* exchange, order among elements is only
>>>>>>>>>    preserved for each pair of sending- and receiving task (for example
>>>>>>>>>    subtask[1] of *map()* and subtask[2] of *keyBy/window*).
>>>>>>>>>
>>>>>>>>> This makes it sounds like ordering on the same partition/key is
>>>>>>>>> always maintained. Which is exactly the ordering guarantee that I 
>>>>>>>>> need.
>>>>>>>>> This seems to slightly contradict the statement "Flink provides no
>>>>>>>>> guarantees about the order of the elements within a window" for keyed
>>>>>>>>> state. So is it true that ordering _is_ guaranteed for identical keys?
>>>>>>>>>
>>>>>>>>> If I'm not mistaken, the state in the TableAPI is always
>>>>>>>>> considered keyed state for a join or aggregate. Or am I missing 
>>>>>>>>> something?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <r...@remind101.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Our data arrives in order from Kafka, so we are hoping to use
>>>>>>>>>> that same order for our processing.
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <r...@remind101.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Going further, if "Flink provides no guarantees about the order
>>>>>>>>>>> of the elements within a window" then with minibatch, which I 
>>>>>>>>>>> assume uses a
>>>>>>>>>>> window under the hood, any aggregates that expect rows to arrive in 
>>>>>>>>>>> order
>>>>>>>>>>> will fail to keep their consistency. Is this correct?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <r...@remind101.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> We have a job from CDC to a large unbounded Flink plan to
>>>>>>>>>>>> Elasticsearch.
>>>>>>>>>>>>
>>>>>>>>>>>> Currently, we have been relentlessly trying to reduce our
>>>>>>>>>>>> record amplification which, when our Elasticsearch index is near 
>>>>>>>>>>>> fully
>>>>>>>>>>>> populated, completely bottlenecks our write performance. We decided
>>>>>>>>>>>> recently to try a new job using mini-batch. At first this seemed 
>>>>>>>>>>>> promising
>>>>>>>>>>>> but at some point we began getting huge record amplification in a 
>>>>>>>>>>>> join
>>>>>>>>>>>> operator. It appears that minibatch may only batch on aggregate 
>>>>>>>>>>>> operators?
>>>>>>>>>>>>
>>>>>>>>>>>> So we're now thinking that we should have a window before our
>>>>>>>>>>>> ES sink which only takes the last record for any unique document 
>>>>>>>>>>>> id in the
>>>>>>>>>>>> window, since that's all we really want to send anyway. However, 
>>>>>>>>>>>> when
>>>>>>>>>>>> investigating turning a table, to a keyed window stream for 
>>>>>>>>>>>> deduping, and
>>>>>>>>>>>> then back into a table I read the following:
>>>>>>>>>>>>
>>>>>>>>>>>> >Attention Flink provides no guarantees about the order of the
>>>>>>>>>>>> elements within a window. This implies that although an evictor 
>>>>>>>>>>>> may remove
>>>>>>>>>>>> elements from the beginning of the window, these are not 
>>>>>>>>>>>> necessarily the
>>>>>>>>>>>> ones that arrive first or last. [1]
>>>>>>>>>>>>
>>>>>>>>>>>> which has put a damper on our investigation.
>>>>>>>>>>>>
>>>>>>>>>>>> I then found the deduplication SQL doc [2], but I have a hard
>>>>>>>>>>>> time parsing what the SQL does and we've never used TemporaryViews 
>>>>>>>>>>>> or
>>>>>>>>>>>> proctime before.
>>>>>>>>>>>> Is this essentially what we want?
>>>>>>>>>>>> Will just using this SQL be safe for a job that is unbounded
>>>>>>>>>>>> and just wants to deduplicate a document write to whatever the 
>>>>>>>>>>>> most current
>>>>>>>>>>>> one is (i.e. will restoring from a checkpoint maintain our 
>>>>>>>>>>>> unbounded
>>>>>>>>>>>> consistency and will deletes work)?
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>>>>>>>>>> [2]
>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>>
>>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>>
>>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>
>>>>>>>
>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>
>>>>>>
>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>> <https://www.facebook.com/remindhq>
>>>>>>
>>>>>
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>

Reply via email to