I think I may have been affected by some late night programming.

Slightly revised how I'm using my aggregate:

val userDocsStream =
  this.tableEnv
    .toRetractStream(userDocsTable, classOf[Row])
    .keyBy(_.f1.getField(0))
val compactedUserDocsStream = userDocsStream
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .aggregate(new CompactionAggregate())

but this now gives me the following exception:

java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
    at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:829)

Which I'm not at all sure how to interpret.
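
For what it's worth, the exception text itself names the likely cause: the window assigner is an event-time one, and the records reaching it carry no timestamp. Below is a minimal plain-Scala sketch of that check, not Flink's actual source. The names TumblingWindowModel, assignWindow and Window are hypothetical, and the window-start arithmetic is a simplification that ignores offsets.

```scala
// Simplified model of tumbling event-time window assignment.
// Illustration only; all names here are hypothetical, not Flink API.
object TumblingWindowModel {
  // Flink reports records without a timestamp as Long.MIN_VALUE.
  val NoTimestamp: Long = Long.MinValue

  final case class Window(startMs: Long, endMs: Long)

  // Returns the window [start, end) that contains `timestampMs`, or an
  // error message when the record has no timestamp to window on.
  def assignWindow(timestampMs: Long, sizeMs: Long): Either[String, Window] =
    if (timestampMs == NoTimestamp)
      Left("Record has Long.MIN_VALUE timestamp (= no timestamp marker)")
    else {
      // Round the timestamp down to the nearest multiple of the window size.
      val start = timestampMs - Math.floorMod(timestampMs, sizeMs)
      Right(Window(start, start + sizeMs))
    }
}
```

If that reading is right, the two ways out are the ones the message lists: window on processing time instead (for example with TumblingProcessingTimeWindows), or call assignTimestampsAndWatermarks on the stream before the window.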

On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <r...@remind101.com> wrote:

> Ok, that sounds like it confirms my expectations.
>
> So I tried running my code above and had to edit it slightly to use the
> Java Tuple2, because our execution environment code is all in Java.
>
> class CompactionAggregate
>     extends AggregateFunction[
>       Tuple2[java.lang.Boolean, Row],
>       Tuple2[java.lang.Boolean, Row],
>       Tuple2[java.lang.Boolean, Row]
>     ] {
>
>   override def createAccumulator() = new Tuple2(false, null)
>
>   // Just take the latest value to compact.
>   override def add(
>       value: Tuple2[java.lang.Boolean, Row],
>       accumulator: Tuple2[java.lang.Boolean, Row]
>   ) =
>     value
>
>   override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>     accumulator
>
>   // This is a required function that we don't use.
>   override def merge(
>       a: Tuple2[java.lang.Boolean, Row],
>       b: Tuple2[java.lang.Boolean, Row]
>   ) =
>     throw new NotImplementedException()
> }
>
> But when running I get the following error:
> >Caused by: java.lang.RuntimeException: Could not extract key from
> [redacted row]
> >...
> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
> kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
> supported when converting to an expression.
>
> I'm googling around and haven't found anything informative about what
> might be causing this issue. Any ideas?
>
> I'll also take a look at the SQL functions you suggested and see if I can
> use those.
>
> Thanks!
>
>
>
> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Rex,
>>
>> Whether your keyBy (and likewise a join/grouping/window) is random or
>> not depends on the relationship between the join/grouping key and your
>> Kafka partitioning key.
>>
>> Say your partitioning key is document_id. Then any join/grouping key
>> that is composed of (or is exactly) document_id will retain the order. You
>> should always ask yourself the question: can two records coming from the
>> ordered Kafka partition X be processed by two different operator instances?
>> For a join/grouping operator, the only strict guarantee is that all
>> records with the same key will be shuffled into the same operator instance.
>>
>> Your compaction in general looks good but I'm not deep into Table API.
>> I'm quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table
>> API should already do what you want. [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>>
>> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <r...@remind101.com> wrote:
>>
>>> In addition to those questions, assuming that keyed streams are in
>>> order, I've come up with the following solution to compact our records and
>>> only pick the most recent one per id before sending to the ES sink.
>>>
>>> The first item in the Row is the document ID / primary key which we want
>>> to compact records on.
>>>
>>> val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>>> userDocsStream
>>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>   .aggregate(new CompactionAggregate())
>>>
>>> class CompactionAggregate
>>>     extends AggregateFunction[
>>>       (Boolean, Row),
>>>       (Boolean, Row),
>>>       (Boolean, Row)
>>>     ] {
>>>
>>>   override def createAccumulator() = (false, null)
>>>
>>>   // Just take the latest value to compact.
>>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>>     value
>>>
>>>   override def getResult(accumulator: (Boolean, Row)) =
>>>     accumulator
>>>
>>>   // This is a required function that we don't use.
>>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>>     throw new NotImplementedException()
>>> }
>>>
>>> I'm hoping that if the last record in the window is an insert, it picks
>>> that, and if it's a retract, it picks that instead. Then, when we send
>>> this to the ES sink, we simply check the first element of the tuple (true
>>> or false) to decide between an insert and a delete request to ES. Does
>>> this seem like it will work?
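
To make the intent of the quoted paragraph concrete, here is a small plain-Scala sketch of the "last change per document id wins" idea and of the mapping from the boolean flag to an ES request. It is a model under assumptions, not the job's code: Change, EsRequest, Index, Delete, compact and toRequest are hypothetical names, and the payload is a String standing in for a Row.

```scala
// Plain-Scala model of per-window compaction; all names are hypothetical.
object CompactionModel {
  // Simplified stand-in for (Boolean, Row): true = insert, false = retract.
  final case class Change(isInsert: Boolean, docId: String, payload: String)

  sealed trait EsRequest
  final case class Index(docId: String, payload: String) extends EsRequest
  final case class Delete(docId: String) extends EsRequest

  // Keep only the most recently seen change for each document id,
  // assuming `changes` is in arrival order within one window.
  def compact(changes: Seq[Change]): Map[String, Change] =
    changes.foldLeft(Map.empty[String, Change]) { (latest, change) =>
      latest.updated(change.docId, change)
    }

  // The flag alone decides which kind of request is sent.
  def toRequest(change: Change): EsRequest =
    if (change.isInsert) Index(change.docId, change.payload)
    else Delete(change.docId)
}
```

One thing this sketch does not cover: an update arrives as a retract followed by an insert, so if a window boundary falls between the two, that window would end on the retract and emit a delete before the next window re-indexes the document.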
>>>
>>> Thanks!
>>>
>>>
>>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> This is great info, thanks!
>>>>
>>>> My question then becomes, what constitutes a random shuffle? Currently
>>>> we're using the Table API with minibatch on flink v1.11.3. Do our joins
>>>> output a keyed stream of records by join key or is this random? I imagine
>>>> that they'd have to have a key for retracts and accumulates to arrive in
>>>> order on the next downstream operator. Same with aggs but on the groupBy
>>>> key.
>>>>
>>>> Does this sound correct to you?
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Rex,
>>>>>
>>>>> indeed these two statements look like they contradict each other, but
>>>>> they are two sides of the same coin.
>>>>> Flink is simply putting records in FIFO in windows. That is, there is
>>>>> no ordering on event time if there are late events. So if your elements
>>>>> arrive ordered, the ordering is retained. If your elements arrive
>>>>> unordered, the same unordered order is retained.
>>>>>
>>>>> However, note that Flink can only guarantee FIFO according to your
>>>>> topology. Consider a source with parallelism 2, each reading data from an
>>>>> ordered kafka partition (k1, k2) respectively. Each partition has records
>>>>> with keys, such that no key appears in both partitions (default behavior
>>>>> if you set keys but no partition while writing to Kafka).
>>>>> 1) Let's assume you do a simple transformation and write them back
>>>>> into kafka with the same key. Then you can be sure that the order of the
>>>>> records is retained.
>>>>>
>>>>> 2) Now you add a random shuffle and have the transformation. Now two
>>>>> successive records may be processed in parallel and there is a race
>>>>> condition over which is written into Kafka first. So order is not
>>>>> retained.
>>>>>
>>>>> 3) You shuffle both partitions by the Kafka key (keyby) and do some
>>>>> aggregation. Two successive records with the same key will always be
>>>>> processed by the same aggregation operator. So the order is retained for
>>>>> each key (note that this is what you usually want and what Kafka gives
>>>>> you if you don't set the partition explicitly and just provide a key).
>>>>>
>>>>> 4) You shuffle both partitions by a different key. Then two successive
>>>>> Kafka records could again be processed in parallel such that there is a
>>>>> race condition.
>>>>>
>>>>> Note that windows are a kind of aggregation.
>>>>>
>>>>> So Flink is never going to restore an ordering that is not there
>>>>> (because it's too costly and there are too many unknowns). But you can
>>>>> infer the guarantees by analyzing your topology.
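
Case 3 above can be sketched in plain Scala to check the reasoning: route records to subtasks by key, keep each subtask FIFO, and the per-key order survives. This is a toy model under assumptions. Record, keyBy and perKeyOrderRetained are hypothetical names, and hashing with hashCode modulo the parallelism only stands in for Flink's real key-group assignment.

```scala
// Toy model of a keyed shuffle; names are hypothetical, not Flink API.
object KeyedOrderModel {
  // `seq` is the position of the record within its key's history.
  final case class Record(key: String, seq: Int)

  // Send every record to a subtask chosen from its key. groupBy on a Seq
  // keeps each group's elements in their original (arrival) order.
  def keyBy(records: Seq[Record], parallelism: Int): Map[Int, Seq[Record]] =
    records.groupBy(r => Math.floorMod(r.key.hashCode, parallelism))

  // True when, for every key seen by this subtask, the sequence numbers
  // appear in increasing order.
  def perKeyOrderRetained(subtask: Seq[Record]): Boolean =
    subtask.groupBy(_.key).values.forall { rs =>
      val seqs = rs.map(_.seq)
      seqs == seqs.sorted
    }
}
```

Shuffling by a different key (case 4) breaks the assumption the model relies on, namely that all records of one original key pass through a single FIFO subtask.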
>>>>>
>>>>> ---
>>>>>
>>>>> Please note that there is a common pitfall when you work with Kafka:
>>>>> - Ordering of records in Kafka is only guaranteed if you set
>>>>> *max.in.flight.requests.per.connection* to 1. [1]
>>>>> - Often you also want to set *enable.idempotence* and *acks=all*.
>>>>>
>>>>> That is true for the upstream application, and if you plan to write
>>>>> back to Kafka you also need to set that in Flink.
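
As a sketch of the three settings named above, this builds the producer properties in Scala. The property keys are the standard Kafka producer config names; the object name OrderedProducerConfig and the build method are hypothetical, and how the Properties object reaches your producer or Flink Kafka sink depends on the connector version in use.

```scala
import java.util.Properties

// Hypothetical helper collecting the ordering-related producer settings.
object OrderedProducerConfig {
  def build(): Properties = {
    val props = new Properties()
    // At most one unacknowledged request per connection, so a retried
    // batch cannot overtake a later one.
    props.setProperty("max.in.flight.requests.per.connection", "1")
    // Let the broker de-duplicate retried sends.
    props.setProperty("enable.idempotence", "true")
    // Wait for all in-sync replicas to acknowledge each write.
    props.setProperty("acks", "all")
    props
  }
}
```

Connection settings such as the bootstrap servers are left out on purpose and would need to be added to the same Properties.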
>>>>>
>>>>> [1]
>>>>> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>>>>>
>>>>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I began reading
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>>>>
>>>>>>    > *Redistributing* streams (between *map()* and *keyBy/window*,
>>>>>>    as well as between *keyBy/window* and *sink*) change the
>>>>>>    partitioning of streams. Each *operator subtask* sends data to
>>>>>>    different target subtasks, depending on the selected transformation.
>>>>>>    Examples are *keyBy()* (re-partitions by hash code), *broadcast()*,
>>>>>>    or *rebalance()* (random redistribution). In a *redistributing*
>>>>>>    exchange, order among elements is only preserved for each pair of
>>>>>>    sending- and receiving task (for example subtask[1] of *map()* and
>>>>>>    subtask[2] of *keyBy/window*).
>>>>>>
>>>>>> This makes it sound like ordering on the same partition/key is
>>>>>> always maintained, which is exactly the ordering guarantee that I need.
>>>>>> This seems to slightly contradict the statement "Flink provides no
>>>>>> guarantees about the order of the elements within a window" for keyed
>>>>>> state. So is it true that ordering _is_ guaranteed for identical keys?
>>>>>>
>>>>>> If I'm not mistaken, the state in the TableAPI is always considered
>>>>>> keyed state for a join or aggregate. Or am I missing something?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <r...@remind101.com> wrote:
>>>>>>
>>>>>>> Our data arrives in order from Kafka, so we are hoping to use that
>>>>>>> same order for our processing.
>>>>>>>
>>>>>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <r...@remind101.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Going further, if "Flink provides no guarantees about the order of
>>>>>>>> the elements within a window" then with minibatch, which I assume
>>>>>>>> uses a window under the hood, any aggregates that expect rows to
>>>>>>>> arrive in order will fail to keep their consistency. Is this correct?
>>>>>>>>
>>>>>>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <r...@remind101.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> We have a job from CDC to a large unbounded Flink plan to
>>>>>>>>> Elasticsearch.
>>>>>>>>>
>>>>>>>>> Currently, we have been relentlessly trying to reduce our record
>>>>>>>>> amplification which, when our Elasticsearch index is near fully
>>>>>>>>> populated, completely bottlenecks our write performance. We decided
>>>>>>>>> recently to try a new job using mini-batch. At first this seemed
>>>>>>>>> promising but at some point we began getting huge record
>>>>>>>>> amplification in a join operator. It appears that minibatch may
>>>>>>>>> only batch on aggregate operators?
>>>>>>>>>
>>>>>>>>> So we're now thinking that we should have a window before our ES
>>>>>>>>> sink which only takes the last record for any unique document id in
>>>>>>>>> the window, since that's all we really want to send anyway. However,
>>>>>>>>> when investigating turning a table into a keyed window stream for
>>>>>>>>> deduping, and then back into a table, I read the following:
>>>>>>>>>
>>>>>>>>> >Attention Flink provides no guarantees about the order of the
>>>>>>>>> elements within a window. This implies that although an evictor may
>>>>>>>>> remove elements from the beginning of the window, these are not
>>>>>>>>> necessarily the ones that arrive first or last. [1]
>>>>>>>>>
>>>>>>>>> which has put a damper on our investigation.
>>>>>>>>>
>>>>>>>>> I then found the deduplication SQL doc [2], but I have a hard time
>>>>>>>>> parsing what the SQL does and we've never used TemporaryViews or
>>>>>>>>> proctime before.
>>>>>>>>> Is this essentially what we want?
>>>>>>>>> Will just using this SQL be safe for a job that is unbounded and
>>>>>>>>> just wants to deduplicate a document write to whatever the most
>>>>>>>>> current one is (i.e. will restoring from a checkpoint maintain our
>>>>>>>>> unbounded consistency and will deletes work)?
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
