I think I may have been affected by some late-night programming. I slightly revised how I'm using my aggregate:

val userDocsStream =
  this.tableEnv
    .toRetractStream(userDocsTable, classOf[Row])
    .keyBy(_.f1.getField(0))
val compactedUserDocsStream = userDocsStream
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .aggregate(new CompactionAggregate())

but this now gives me the following exception:

java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
    at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:829)

which I'm not at all sure how to interpret.
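[Note: the exception means the elements reaching the window carry no event-time timestamps. A minimal sketch of one way around it, assuming processing-time semantics are acceptable for this one-second compaction window; TumblingProcessingTimeWindows needs no timestamps or watermarks:

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Processing-time windows assign elements by wall-clock arrival time,
// so no call to assignTimestampsAndWatermarks(...) is required.
val compactedUserDocsStream = userDocsStream
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .aggregate(new CompactionAggregate())

The alternative is to stay with event time and call assignTimestampsAndWatermarks(...) on the stream upstream of the keyBy, as the exception message suggests.]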
On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <r...@remind101.com> wrote:

> Ok, that sounds like it confirms my expectations.
>
> So I tried running my above code and had to edit it slightly to use the Java
> Tuple2, because our execution environment stuff is all in Java.
>
> class CompactionAggregate
>     extends AggregateFunction[
>       Tuple2[java.lang.Boolean, Row],
>       Tuple2[java.lang.Boolean, Row],
>       Tuple2[java.lang.Boolean, Row]
>     ] {
>
>   override def createAccumulator() = new Tuple2(false, null)
>
>   // Just take the latest value to compact.
>   override def add(
>       value: Tuple2[java.lang.Boolean, Row],
>       accumulator: Tuple2[java.lang.Boolean, Row]
>   ) =
>     value
>
>   override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>     accumulator
>
>   // This is a required function that we don't use.
>   override def merge(
>       a: Tuple2[java.lang.Boolean, Row],
>       b: Tuple2[java.lang.Boolean, Row]
>   ) =
>     throw new NotImplementedException()
> }
>
> But when running I get the following error:
>
> Caused by: java.lang.RuntimeException: Could not extract key from [redacted row]
> ...
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are supported when converting to an expression.
>
> I'm googling around and haven't found anything informative about what might be causing this issue. Any ideas?
>
> I'll also take a look at the SQL functions you suggested and see if I can use those.
>
> Thanks!
>
> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Rex,
>>
>> Whether your keyBy (and with it any join/grouping/windowing) is random or not depends on the relationship of the join/grouping key to your Kafka partitioning key.
>>
>> Say your partitioning key is document_id. Then any join/grouping key that is composed of (or is exactly) document_id will retain the order. You should always ask yourself: can two records coming from the ordered Kafka partition X be processed by two different operator instances? For a join/grouping operator, the only strict guarantee is that all records with the same key will be shuffled into the same operator instance.
>>
>> Your compaction in general looks good, but I'm not deep into Table API. I'm quite sure that the FIRST_VALUE and LAST_VALUE functions in Table API should already do what you want. [1]
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
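[Note: a sketch of what the LAST_VALUE route could look like, untested; the view name userDocs and the columns id and doc are illustrative, not the actual schema:

// Register the table and keep only the latest doc per id. On an updating
// table this produces an updating result, so multiple changes per key are
// compacted into the newest one.
tableEnv.createTemporaryView("userDocs", userDocsTable)
val compacted = tableEnv.sqlQuery(
  "SELECT id, LAST_VALUE(doc) AS doc FROM userDocs GROUP BY id")
]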
>> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <r...@remind101.com> wrote:
>>
>>> In addition to those questions, assuming that keyed streams are in order, I've come up with the following solution to compact our records and only pick the most recent one per id before sending to the ES sink.
>>>
>>> The first item in the Row is the document ID / primary key, which we want to compact records on.
>>>
>>> val userDocsStream =
>>>   userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>>> userDocsStream
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>   .aggregate(new CompactionAggregate())
>>>
>>> class CompactionAggregate
>>>     extends AggregateFunction[
>>>       (Boolean, Row),
>>>       (Boolean, Row),
>>>       (Boolean, Row)
>>>     ] {
>>>
>>>   override def createAccumulator() = (false, null)
>>>
>>>   // Just take the latest value to compact.
>>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>>     value
>>>
>>>   override def getResult(accumulator: (Boolean, Row)) =
>>>     accumulator
>>>
>>>   // This is a required function that we don't use.
>>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>>     throw new NotImplementedException()
>>> }
>>>
>>> I'm hoping that if the last record in the window is an insert, it picks that, and if it's a retract, it picks that; then when we send this to the ES sink we simply check true or false in the first element of the tuple to issue an insert or delete request to ES. Does this seem like it will work?
>>>
>>> Thanks!
>>>
>>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> This is great info, thanks!
>>>>
>>>> My question then becomes: what constitutes a random shuffle? Currently we're using the Table API with minibatch on Flink v1.11.3. Do our joins output a stream of records keyed by the join key, or is this random? I imagine they'd have to have a key for retracts and accumulates to arrive in order at the next downstream operator. Same with aggs, but on the groupBy key.
>>>>
>>>> Does this sound correct to you?
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Rex,
>>>>>
>>>>> Indeed these two statements look like they contradict each other, but they are two sides of the same coin.
>>>>> Flink simply puts records into windows in FIFO order. That is, there is no ordering on event time if there are late events. So if your elements arrive ordered, the ordering is retained; if your elements arrive unordered, the same unordered order is retained.
>>>>>
>>>>> However, note that Flink can only guarantee FIFO according to your topology. Consider a source with parallelism 2, each instance reading data from an ordered Kafka partition (k1, k2) respectively. Each partition has records with keys, such that no key appears in both partitions (the default behavior if you set keys but no partition while writing to Kafka).
>>>>>
>>>>> 1) Let's assume you do a simple transformation and write the records back into Kafka with the same key. Then you can be sure that the order of the records is retained.
>>>>>
>>>>> 2) Now you add a random shuffle before the transformation. Two successive records may be processed in parallel, and there is a race condition over which is written first into Kafka. So order is not retained.
>>>>>
>>>>> 3) You shuffle both partitions by the Kafka key (keyBy) and do some aggregation. Two successive records with the same key will always be processed by the same aggregation operator, so the order is retained for each key. (Note that this is what you usually want, and what Kafka gives you if you don't set the partition explicitly and just provide a key.)
>>>>>
>>>>> 4) You shuffle both partitions by a different key. Then two successive Kafka records could again be processed in parallel, such that there is a race condition.
>>>>>
>>>>> Note that windows are a kind of aggregation.
>>>>>
>>>>> So Flink is never going to restore an ordering that is not there (because it's too costly and there are too many unknowns). But you can infer the guarantees by analyzing your topology.
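[Note: a toy sketch of cases (2) and (3) above, untested and with made-up data, just to make the shuffle semantics concrete:

import org.apache.flink.streaming.api.scala._

object OrderingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val source = env.fromElements(("doc1", 1L), ("doc1", 2L), ("doc2", 3L))

    // Case 3: keyBy routes all records with the same key to the same
    // subtask, so per-key arrival order is preserved downstream.
    source.keyBy(_._1).sum(1).print()

    // Case 2: rebalance redistributes round-robin; two successive records
    // for the same key can land on different subtasks and race each other.
    source.rebalance.map(t => t).print()

    env.execute("ordering sketch")
  }
}
]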
>>>>>
>>>>> ---
>>>>>
>>>>> Please note that there is a common pitfall when you work with Kafka:
>>>>> - Ordering of records in Kafka is only guaranteed if you set max.in.flight.requests.per.connection to 1. [1]
>>>>> - Often you also want to set enable.idempotence and acks=all.
>>>>>
>>>>> That is true for the upstream application, and if you plan to write back to Kafka you also need to set the same options in Flink.
>>>>>
>>>>> [1] https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
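[Note: a sketch of those producer settings; the bootstrap address is illustrative. These are plain Kafka producer configs and would go on the upstream producer, and likewise on the properties passed to Flink's Kafka producer when writing back:

import java.util.Properties

val producerProps = new Properties()
producerProps.setProperty("bootstrap.servers", "localhost:9092") // illustrative
// Keep at most one in-flight request so retries cannot reorder records.
producerProps.setProperty("max.in.flight.requests.per.connection", "1")
// Idempotent writes plus full acknowledgement, as suggested above.
producerProps.setProperty("enable.idempotence", "true")
producerProps.setProperty("acks", "all")
]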
>>>>>
>>>>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I began reading https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>>>>
>>>>>> > Redistributing streams (between map() and keyBy/window, as well as between keyBy/window and sink) change the partitioning of streams. Each operator subtask sends data to different target subtasks, depending on the selected transformation. Examples are keyBy() (re-partitions by hash code), broadcast(), or rebalance() (random redistribution). In a redistributing exchange, order among elements is only preserved for each pair of sending and receiving tasks (for example, subtask[1] of map() and subtask[2] of keyBy/window).
>>>>>>
>>>>>> This makes it sound like ordering on the same partition/key is always maintained, which is exactly the ordering guarantee that I need. It seems to slightly contradict the statement "Flink provides no guarantees about the order of the elements within a window" for keyed state. So is it true that ordering _is_ guaranteed for identical keys?
>>>>>>
>>>>>> If I'm not mistaken, the state in the Table API is always considered keyed state for a join or aggregate. Or am I missing something?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>
>>>>>>> Our data arrives in order from Kafka, so we are hoping to use that same order for our processing.
>>>>>>>
>>>>>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>
>>>>>>>> Going further, if "Flink provides no guarantees about the order of the elements within a window", then with minibatch, which I assume uses a window under the hood, any aggregates that expect rows to arrive in order will fail to keep their consistency. Is this correct?
>>>>>>>>
>>>>>>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> We have a job from CDC to a large unbounded Flink plan to Elasticsearch.
>>>>>>>>>
>>>>>>>>> Currently, we have been relentlessly trying to reduce our record amplification, which, when our Elasticsearch index is near fully populated, completely bottlenecks our write performance. We recently decided to try a new job using mini-batch. At first this seemed promising, but at some point we began getting huge record amplification in a join operator. It appears that minibatch may only batch on aggregate operators?
>>>>>>>>>
>>>>>>>>> So we're now thinking that we should have a window before our ES sink which only takes the last record for any unique document id in the window, since that's all we really want to send anyway. However, when investigating turning a table into a keyed window stream for deduping, and then back into a table, I read the following:
>>>>>>>>>
>>>>>>>>> > Attention: Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last. [1]
>>>>>>>>>
>>>>>>>>> which has put a damper on our investigation.
>>>>>>>>>
>>>>>>>>> I then found the deduplication SQL doc [2], but I have a hard time parsing what the SQL does, and we've never used TemporaryViews or proctime before.
>>>>>>>>> Is this essentially what we want?
>>>>>>>>> Will just using this SQL be safe for a job that is unbounded and just wants to deduplicate a document write to whatever the most current one is (i.e. will restoring from a checkpoint maintain our unbounded consistency, and will deletes work)?
>>>>>>>>>
>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>>>>>>>
>>>>>>>>> Thanks!
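[Note: what the deduplication SQL in [2] does is number each key's rows and keep only the newest one. A sketch against an illustrative view; the names userDocs, id, doc and the proctime attribute are assumptions, not the actual schema:

// Keep only the most recent row per document id. ROW_NUMBER() orders each
// id's rows by the processing-time attribute, newest first; the outer
// filter keeps row number 1, i.e. the latest version of the document.
val deduped = tableEnv.sqlQuery(
  """
    |SELECT id, doc
    |FROM (
    |  SELECT id, doc,
    |    ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime DESC) AS row_num
    |  FROM userDocs)
    |WHERE row_num = 1
    |""".stripMargin)
]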
--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>