It looks like it wants me to call assignTimestampsAndWatermarks, but I already have a time window on my stream, .window(TumblingEventTimeWindows.of(Time.seconds(1))), so I'd expect everything entering this stream to simply be aggregated during that window.
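For reference, a minimal sketch (not from the thread) of the two usual ways past the Long.MIN_VALUE timestamp error quoted below, assuming the retract stream simply carries no event-time timestamps: either window on processing time, which needs no watermarks at all, or assign timestamps and watermarks before the keyBy. Here retractStream and eventTimeOf (an accessor returning epoch millis from some Row field) are hypothetical names.

    import java.time.Duration

    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.types.Row

    // Option 1: processing-time windows need no timestamps or watermarks.
    val compactedUserDocsStream = userDocsStream
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .aggregate(new CompactionAggregate())

    // Option 2: assign event-time timestamps/watermarks on the retract
    // stream before keying it.
    val withTimestamps = retractStream.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[(Boolean, Row)](Duration.ofSeconds(1))
        .withTimestampAssigner(new SerializableTimestampAssigner[(Boolean, Row)] {
          override def extractTimestamp(e: (Boolean, Row), recordTs: Long): Long =
            eventTimeOf(e._2) // hypothetical: epoch millis from some Row field
        })
    )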
On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley <[email protected]> wrote:

> I think I may have been affected by some late-night programming.
>
> I slightly revised how I'm using my aggregate:
>
> val userDocsStream =
>   this.tableEnv
>     .toRetractStream(userDocsTable, classOf[Row])
>     .keyBy(_.f1.getField(0))
> val compactedUserDocsStream = userDocsStream
>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>   .aggregate(new CompactionAggregate())
>
> but this now gives me the following exception:
>
> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>   at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>   at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>   at java.base/java.lang.Thread.run(Thread.java:829)
>
> I'm not at all sure how to interpret this.
>
> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <[email protected]> wrote:
>
>> Ok, that sounds like it confirms my expectations.
>>
>> So I tried running my above code and had to edit it slightly to use the Java Tuple2, because our execution environment stuff is all in Java.
>>
>> class CompactionAggregate
>>   extends AggregateFunction[
>>     Tuple2[java.lang.Boolean, Row],
>>     Tuple2[java.lang.Boolean, Row],
>>     Tuple2[java.lang.Boolean, Row]
>>   ] {
>>
>>   override def createAccumulator() = new Tuple2(false, null)
>>
>>   // Just take the latest value to compact.
>>   override def add(
>>       value: Tuple2[java.lang.Boolean, Row],
>>       accumulator: Tuple2[java.lang.Boolean, Row]
>>   ) =
>>     value
>>
>>   override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>>     accumulator
>>
>>   // This is a required function that we don't use.
>>   override def merge(
>>       a: Tuple2[java.lang.Boolean, Row],
>>       b: Tuple2[java.lang.Boolean, Row]
>>   ) =
>>     throw new NotImplementedException()
>> }
>>
>> But when running I get the following error:
>>
>> Caused by: java.lang.RuntimeException: Could not extract key from [redacted row]
>> ...
>> Caused by: org.apache.flink.table.api.ValidationException: Unsupported kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are supported when converting to an expression.
>>
>> I'm googling around and haven't found anything informative about what might be causing this issue. Any ideas?
>>
>> I'll also take a look at the SQL functions you suggested and see if I can use those.
>>
>> Thanks!
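Not a confirmed diagnosis of the 'DELETE' error above, but one sketch that sidesteps it is to pull the primary key out as a concrete type before keying, so the key selector never hands a retract-kind Row to Flink for interpretation. The Long id type and field position 0 are assumptions, not facts from the thread.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.types.Row

    // Sketch: materialize (id, isInsert, row) so keyBy sees a plain Long.
    // Assumes the document id lives in field 0 and is a Long.
    val keyedById = userDocsTable
      .toRetractStream[Row]
      .map { case (isInsert, row) =>
        (row.getField(0).asInstanceOf[Long], isInsert, row)
      }
      .keyBy(_._1)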
>>
>> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <[email protected]> wrote:
>>
>>> Hi Rex,
>>>
>>> whether your keyBy (and with it your join/grouping/windowing) is random or not depends on the relationship between the join/grouping key and your Kafka partitioning key.
>>>
>>> Say your partitioning key is document_id. Then any join/grouping key that is composed of (or is exactly) document_id will retain the order. You should always ask yourself the question: can two records coming from the ordered Kafka partition X be processed by two different operator instances? For a join/grouping operator, there is only the strict guarantee that all records with the same key will be shuffled into the same operator instance.
>>>
>>> Your compaction in general looks good, but I'm not deep into the Table API. I'm quite sure that the FIRST_VALUE and LAST_VALUE functions in the Table API should already do what you want. [1]
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>>>
>>> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <[email protected]> wrote:
>>>
>>>> In addition to those questions, assuming that keyed streams are in order, I've come up with the following solution to compact our records and only pick the most recent one per id before sending them to the ES sink.
>>>>
>>>> The first item in the Row is the document ID / primary key which we want to compact records on.
>>>>
>>>> val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.getField(0))
>>>> userDocsStream
>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>>   .aggregate(new CompactionAggregate())
>>>>
>>>> class CompactionAggregate
>>>>   extends AggregateFunction[
>>>>     (Boolean, Row),
>>>>     (Boolean, Row),
>>>>     (Boolean, Row)
>>>>   ] {
>>>>
>>>>   override def createAccumulator() = (false, null)
>>>>
>>>>   // Just take the latest value to compact.
>>>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>>>     value
>>>>
>>>>   override def getResult(accumulator: (Boolean, Row)) =
>>>>     accumulator
>>>>
>>>>   // This is a required function that we don't use.
>>>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>>>     throw new NotImplementedException()
>>>> }
>>>>
>>>> I'm hoping that if the last record in the window is an insert it picks that, and if it's a retract it picks that instead; then, when we send this to the ES sink, we will simply check true or false in the first element of the tuple to issue an insert or delete request to ES. Does this seem like it will work?
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <[email protected]> wrote:
>>>>
>>>>> This is great info, thanks!
>>>>>
>>>>> My question then becomes: what constitutes a random shuffle? Currently we're using the Table API with minibatch on Flink v1.11.3. Do our joins output a stream of records keyed by the join key, or is this random? I imagine that they'd have to have a key for retracts and accumulates to arrive in order on the next downstream operator. Same with aggs, but on the groupBy key.
>>>>>
>>>>> Does this sound correct to you?
>>>>>
>>>>> Thanks!
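As a hedged sketch of the LAST_VALUE route from Arvid's [1]: the table and column names (user_docs, doc_id, payload) are made up here, and whether LAST_VALUE supports the payload's type is worth verifying against the docs.

    // Sketch: keep only the latest value per document id with LAST_VALUE.
    val compactedTable = tableEnv.sqlQuery(
      """
        |SELECT doc_id, LAST_VALUE(payload) AS payload
        |FROM user_docs
        |GROUP BY doc_id
        |""".stripMargin
    )

Since this is a regular group-by aggregate on an unbounded stream, the result is an updating table, so the downstream sink still has to handle updates per key.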
>>>>>
>>>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <[email protected]> wrote:
>>>>>
>>>>>> Hi Rex,
>>>>>>
>>>>>> indeed these two statements look like they contradict each other, but they are looking at two sides of the same coin.
>>>>>> Flink simply puts records into windows in FIFO order. That is, there is no ordering on event time if there are late events. So if your elements arrive ordered, the ordering is retained. If your elements arrive unordered, the same unordered order is retained.
>>>>>>
>>>>>> However, note that Flink can only guarantee FIFO according to your topology. Consider a source with parallelism 2, each instance reading data from an ordered Kafka partition (k1 and k2, respectively). Each partition has records with keys, such that no key appears in both partitions (the default behavior if you set keys but no partition while writing to Kafka).
>>>>>>
>>>>>> 1) Let's assume you do a simple transformation and write the records back into Kafka with the same key. Then you can be sure that the order of the records is retained.
>>>>>>
>>>>>> 2) Now you add a random shuffle before the transformation. Two successive records may be processed in parallel, and there is a race condition over which is written first into Kafka. So order is not retained.
>>>>>>
>>>>>> 3) You shuffle both partitions by the Kafka key (keyBy) and do some aggregation. Two successive records with the same key will always be processed by the same aggregation operator, so the order is retained for each key. (Note that this is what you usually want, and what Kafka gives you if you don't set the partition explicitly and just provide a key.)
>>>>>>
>>>>>> 4) You shuffle both partitions by a different key. Then two successive Kafka records could again be processed in parallel, such that there is a race condition.
>>>>>>
>>>>>> Note that windows are a kind of aggregation.
>>>>>>
>>>>>> So Flink is never going to restore an ordering that is not there (because it's too costly and there are too many unknowns). But you can infer the guarantees by analyzing your topology.
>>>>>>
>>>>>> ---
>>>>>>
>>>>>> Please note that there is a common pitfall when you work with Kafka:
>>>>>> - Ordering of records in Kafka is only guaranteed if you set max.in.flight.requests.per.connection to 1. [1]
>>>>>> - Often you also want to set enable.idempotence and acks=all.
>>>>>>
>>>>>> That is true for the upstream application, and if you plan to write back to Kafka you also need to set these in Flink.
>>>>>>
>>>>>> [1] https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
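For the producer side of that pitfall, the three settings look roughly like this, a sketch using the standard Kafka producer configuration keys from [1]:

    import java.util.Properties

    // Sketch: producer settings for strict per-partition ordering.
    val producerProps = new Properties()
    producerProps.setProperty("max.in.flight.requests.per.connection", "1")
    producerProps.setProperty("enable.idempotence", "true")
    producerProps.setProperty("acks", "all")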
>>>>>>
>>>>>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <[email protected]> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I began reading https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>>>>>
>>>>>>> > Redistributing streams (between map() and keyBy/window, as well as between keyBy/window and sink) change the partitioning of streams. Each operator subtask sends data to different target subtasks, depending on the selected transformation. Examples are keyBy() (re-partitions by hash code), broadcast(), or rebalance() (random redistribution). In a redistributing exchange, order among elements is only preserved for each pair of sending and receiving task (for example subtask[1] of map() and subtask[2] of keyBy/window).
>>>>>>>
>>>>>>> This makes it sound like ordering on the same partition/key is always maintained, which is exactly the ordering guarantee that I need. This seems to slightly contradict the statement "Flink provides no guarantees about the order of the elements within a window" for keyed state. So is it true that ordering _is_ guaranteed for identical keys?
>>>>>>>
>>>>>>> If I'm not mistaken, the state in the Table API is always considered keyed state for a join or aggregate. Or am I missing something?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <[email protected]> wrote:
>>>>>>>
>>>>>>>> Our data arrives in order from Kafka, so we are hoping to use that same order for our processing.
>>>>>>>>
>>>>>>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Going further, if "Flink provides no guarantees about the order of the elements within a window", then with minibatch, which I assume uses a window under the hood, any aggregates that expect rows to arrive in order will fail to keep their consistency. Is this correct?
>>>>>>>>>
>>>>>>>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> We have a job from CDC to a large unbounded Flink plan to Elasticsearch.
>>>>>>>>>>
>>>>>>>>>> Currently, we have been relentlessly trying to reduce our record amplification, which completely bottlenecks our write performance once our Elasticsearch index is nearly fully populated. We recently decided to try a new job using mini-batch. At first this seemed promising, but at some point we began getting huge record amplification in a join operator. It appears that minibatch may only batch on aggregate operators?
>>>>>>>>>>
>>>>>>>>>> So we're now thinking that we should have a window before our ES sink which only takes the last record for any unique document id in the window, since that's all we really want to send anyway. However, when investigating turning a table into a keyed window stream for deduping, and then back into a table, I read the following:
>>>>>>>>>>
>>>>>>>>>> > Attention: Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last. [1]
>>>>>>>>>>
>>>>>>>>>> which has put a damper on our investigation.
>>>>>>>>>>
>>>>>>>>>> I then found the deduplication SQL doc [2], but I have a hard time parsing what the SQL does, and we've never used TemporaryViews or proctime before.
>>>>>>>>>> Is this essentially what we want?
>>>>>>>>>> Will just using this SQL be safe for a job that is unbounded and just wants to deduplicate each document write to whatever the most current one is (i.e. will restoring from a checkpoint maintain our unbounded consistency, and will deletes work)?
>>>>>>>>>>
>>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>>>>>>>>
>>>>>>>>>> Thanks!
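The deduplication query in [2] boils down to the following pattern; a hedged sketch, assuming a registered view user_docs with a processing-time attribute proctime (all names hypothetical):

    // Sketch of the Flink 1.11 deduplication pattern from [2]:
    // ROW_NUMBER() over a partition, keeping only the newest row per key.
    val deduped = tableEnv.sqlQuery(
      """
        |SELECT doc_id, payload
        |FROM (
        |  SELECT *,
        |    ROW_NUMBER() OVER (
        |      PARTITION BY doc_id ORDER BY proctime DESC) AS row_num
        |  FROM user_docs)
        |WHERE row_num = 1
        |""".stripMargin
    )

Ordering by proctime descending keeps the last row per key, producing an updating result that a retract/upsert sink can consume.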
--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>
