It looks like it wants me to call assignTimestampsAndWatermarks, but I
already have a timer on my window, so I'd expect everything entering this
stream to simply be aggregated during that window:
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
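
For context, a minimal sketch of why the window on its own is not enough. It models, in plain Scala with no Flink dependency, how a tumbling event-time assigner picks a window from each record's own timestamp rather than from a wall-clock timer. `WindowModel`, `windowStart`, and `assignWindow` are hypothetical names for illustration, and the alignment formula is an assumption about how tumbling windows are laid out, not Flink's source. Going by the exception quoted below, the records reaching the window carry Long.MIN_VALUE as their timestamp, so there is nothing to assign a window from. The two usual ways out are to call assignTimestampsAndWatermarks on the stream before the window, or to switch to TumblingProcessingTimeWindows if wall-clock windows are acceptable.

```scala
object WindowModel {
  // Sentinel meaning "no timestamp is attached to this record".
  val NoTimestamp: Long = Long.MinValue

  // Start of the tumbling window of length sizeMs that contains timestampMs.
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - java.lang.Math.floorMod(timestampMs, sizeMs)

  // Returns the [start, end) window for a record, or an error message
  // when the record has no timestamp to derive a window from.
  def assignWindow(timestampMs: Long, sizeMs: Long): Either[String, (Long, Long)] =
    if (timestampMs == NoTimestamp)
      Left("record has no timestamp; assign timestamps or use processing time")
    else {
      val start = windowStart(timestampMs, sizeMs)
      Right((start, start + sizeMs))
    }
}
```

With a one-second window, a record stamped 1500 ms falls into the window from 1000 to 2000, while a record without a timestamp yields the error branch.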

On Thu, Jan 28, 2021 at 12:59 PM Rex Fenley <r...@remind101.com> wrote:

> I think I may have been affected by some late night programming.
>
> Slightly revised how I'm using my aggregate
> val userDocsStream =
> this.tableEnv
> .toRetractStream(userDocsTable, classOf[Row])
> .keyBy(_.f1.getField(0))
> val compactedUserDocsStream = userDocsStream
> .window(TumblingEventTimeWindows.of(Time.seconds(1)))
> .aggregate(new CompactionAggregate())
> but this now gives me the following exception:
> java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no
> timestamp marker). Is the time characteristic set to 'ProcessingTime', or
> did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
>     at org.apache.flink.streaming.api.windowing.assigners.
> TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
>     at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:295)
>     at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
> .java:161)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .processElement(StreamTaskNetworkInput.java:178)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:153)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:351)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxStep(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:181)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:566)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:536)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.base/java.lang.Thread.run(Thread.java:829)
>
> Which I'm not at all sure how to interpret
>
> On Wed, Jan 27, 2021 at 11:57 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Ok, that sounds like it confirms my expectations.
>>
>> So I tried running my above code and had to edit it slightly to use Java's
>> Tuple2 because our execution environment stuff is all in Java.
>>
>> class CompactionAggregate
>> extends AggregateFunction[
>> Tuple2[java.lang.Boolean, Row],
>> Tuple2[java.lang.Boolean, Row],
>> Tuple2[java.lang.Boolean, Row]
>> ] {
>>
>> override def createAccumulator() = new Tuple2(false, null)
>>
>> // Just take the latest value to compact.
>> override def add(
>> value: Tuple2[java.lang.Boolean, Row],
>> accumulator: Tuple2[java.lang.Boolean, Row]
>> ) =
>> value
>>
>> override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
>> accumulator
>>
>> // This is a required function that we don't use.
>> override def merge(
>> a: Tuple2[java.lang.Boolean, Row],
>> b: Tuple2[java.lang.Boolean, Row]
>> ) =
>> throw new NotImplementedException()
>> }
>>
>> But when running I get the following error:
>> >Caused by: java.lang.RuntimeException: Could not extract key from
>> [redacted row]
>> >...
>> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
>> kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are
>> supported when converting to an expression.
>>
>> I'm googling around and haven't found anything informative about what
>> might be causing this issue. Any ideas?
>>
>> I'll also take a look at the SQL functions you suggested and see if I can
>> use those.
>>
>> Thanks!
>>
>>
>>
>> On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Rex,
>>>
>>> whether your keyBy (and likewise join/grouping/windowing) is random or not
>>> depends on the relationship of the join/grouping key with your Kafka
>>> partitioning key.
>>>
>>> Say your partitioning key is document_id. Then, any join/grouping key
>>> that is composed of (or is exactly) document_id will retain the order. You
>>> should always ask yourself the question: can two records coming from the
>>> ordered Kafka partition X be processed by two different operator instances?
>>> For a join/grouping operator, there is only the strict guarantee that all
>>> records with the same key will be shuffled into the same operator instance.
>>>
>>> Your compaction in general looks good but I'm not deep into Table API.
>>> I'm quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table
>>> API should already do what you want. [1]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>>>
>>> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> In addition to those questions, assuming that keyed streams are in
>>>> order, I've come up with the following solution to compact our records and
>>>> only pick the most recent one per id before sending to the ES sink.
>>>>
>>>> The first item in the Row is the document ID / primary key which we
>>>> want to compact records on.
>>>>
>>>> val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>>>> userDocsStream
>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>>   .aggregate(new CompactionAggregate())
>>>>
>>>> class CompactionAggregate
>>>>     extends AggregateFunction[
>>>>       (Boolean, Row),
>>>>       (Boolean, Row),
>>>>       (Boolean, Row)
>>>>     ] {
>>>>   override def createAccumulator() = (false, null)
>>>>
>>>>   // Just take the latest value to compact.
>>>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>>>     value
>>>>
>>>>   override def getResult(accumulator: (Boolean, Row)) = accumulator
>>>>
>>>>   // This is a required function that we don't use.
>>>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>>>     throw new NotImplementedException()
>>>> }
>>>>
>>>> I'm hoping that if the last record in the window is an insert it picks
>>>> that, and if it's a retract it picks that. Then, when we send this to the
>>>> ES sink, we will simply check true or false in the first element of the
>>>> tuple to decide between an insert or delete request to ES. Does this seem
>>>> like it will work?
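
The intended behavior can be sketched in plain Scala, without Flink, to make the "last record wins" rule concrete. Everything here is hypothetical and for illustration only: `Change` stands in for one retract-stream element with the document id pulled out, and `Index` and `Delete` stand in for the two kinds of Elasticsearch requests. The sketch assumes the records of one window are already in arrival order.

```scala
object CompactionModel {
  // One retract-stream element: isInsert is true for an accumulate
  // message and false for a retract message.
  final case class Change(isInsert: Boolean, docId: String, body: String)

  // Hypothetical stand-ins for the two kinds of Elasticsearch requests.
  sealed trait EsRequest
  final case class Index(docId: String, body: String) extends EsRequest
  final case class Delete(docId: String) extends EsRequest

  // Keep only the last change per document id, given one window's
  // records in arrival order.
  def compact(window: Seq[Change]): Map[String, Change] =
    window.foldLeft(Map.empty[String, Change]) { (latest, change) =>
      latest.updated(change.docId, change)
    }

  // The boolean flag alone decides between an index and a delete request.
  def toRequest(change: Change): EsRequest =
    if (change.isInsert) Index(change.docId, change.body)
    else Delete(change.docId)
}
```

One thing the sketch makes visible: an update arrives as a retract followed by an insert, so if a window boundary happens to fall between the two, that window would end on the retract and emit a delete, with the index request only following in the next window.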
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> This is great info, thanks!
>>>>>
>>>>> My question then becomes, what constitutes a random shuffle? Currently
>>>>> we're using the Table API with minibatch on flink v1.11.3. Do our joins
>>>>> output a keyed stream of records by join key or is this random? I imagine
>>>>> that they'd have to have a key for retracts and accumulates to arrive in
>>>>> order on the next downstream operator. Same with aggs but on the groupBy
>>>>> key.
>>>>>
>>>>> Does this sound correct to you?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi Rex,
>>>>>>
>>>>>> indeed these two statements look like they contradict each other, but
>>>>>> they are two sides of the same coin.
>>>>>> Flink is simply putting records in FIFO in windows. That is, there is
>>>>>> no ordering on event time if there are late events. So if your elements
>>>>>> arrive ordered, the ordering is retained. If your elements arrive
>>>>>> unordered, the same unordered order is retained.
>>>>>>
>>>>>> However, note that Flink can only guarantee FIFO according to your
>>>>>> topology. Consider a source with parallelism 2, each reading data from an
>>>>>> ordered Kafka partition (k1, k2) respectively. Each partition has records
>>>>>> with keys, such that no key appears in both partitions (the default behavior
>>>>>> if you set keys but no partition while writing to Kafka).
>>>>>>
>>>>>> 1) Let's assume you do a simple transformation and write them back
>>>>>> into Kafka with the same key. Then you can be sure that the order of the
>>>>>> records is retained.
>>>>>>
>>>>>> 2) Now you add a random shuffle and then do the transformation. Now two
>>>>>> successive records may be processed in parallel, and there is a race
>>>>>> condition over which one is written first into Kafka. So order is not
>>>>>> retained.
>>>>>>
>>>>>> 3) You shuffle both partitions by the Kafka key (keyby) and do some
>>>>>> aggregation. Two successive records with the same key will always be
>>>>>> processed by the same aggregation operator. So the order is retained for
>>>>>> each key (note that this is what you usually want and what Kafka gives you
>>>>>> if you don't set the partition explicitly and just provide a key).
>>>>>>
>>>>>> 4) You shuffle both partitions by a different key. Then two
>>>>>> successive Kafka records could again be processed in parallel, such that
>>>>>> there is a race condition.
>>>>>>
>>>>>> Note that windows are a kind of aggregation.
>>>>>>
>>>>>> So Flink is never going to restore an ordering that is not there
>>>>>> (because it's too costly and there are too many unknowns). But you can
>>>>>> infer the guarantees by analyzing your topology.
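
The routing side of cases 2) to 4) can be modeled in a few lines of plain Scala. This is only a sketch with no Flink involved: `ShuffleModel` and its members are hypothetical names, round-robin stands in for a shuffle that ignores the key, and the actual race between parallel subtasks is not simulated. It only shows which subtask each record is sent to.

```scala
object ShuffleModel {
  // A record read from Kafka: its key and its position in the source partition.
  final case class Rec(key: String, offset: Int)

  // keyBy-style routing: the target subtask depends only on the key.
  def byKey(records: Seq[Rec], parallelism: Int): Map[Int, Seq[Rec]] =
    records.groupBy(r => java.lang.Math.floorMod(r.key.hashCode, parallelism))

  // Round-robin routing, as a stand-in for a shuffle that ignores the key.
  def roundRobin(records: Seq[Rec], parallelism: Int): Map[Int, Seq[Rec]] =
    records.zipWithIndex
      .groupBy { case (_, index) => index % parallelism }
      .map { case (subtask, pairs) => subtask -> pairs.map(_._1) }

  // Number of distinct subtasks that receive records for the given key.
  def subtasksForKey(routing: Map[Int, Seq[Rec]], key: String): Int =
    routing.count { case (_, recs) => recs.exists(_.key == key) }
}
```

With key-based routing every record of a key reaches one subtask in source order. With round-robin the same key can be spread over several subtasks, which is where the race condition comes from.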
>>>>>>
>>>>>> ---
>>>>>>
>>>>>> Please note that there is a common pitfall when you work with Kafka:
>>>>>> - Ordering of records in Kafka is only guaranteed if you set
>>>>>>   *max.in.flight.requests.per.connection* to 1. [1]
>>>>>> - Often you also want to set *enable.idempotence* and *acks=all*.
>>>>>>
>>>>>> That is true for the upstream application, and if you plan to write
>>>>>> back to Kafka you also need to set that in Flink.
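
As a sketch, the three producer settings named above can be collected in a java.util.Properties object. `OrderedProducerConfig` and `build` are hypothetical names. Connection settings such as bootstrap.servers and the serializers are deliberately left out, so this is not a complete producer configuration, and how the properties are handed to the producer depends on the connector in use.

```scala
import java.util.Properties

object OrderedProducerConfig {
  // Producer settings from the message above; values are plain strings,
  // as they would appear in a properties file.
  def build(): Properties = {
    val props = new Properties()
    // Only one unacknowledged request at a time per connection.
    props.setProperty("max.in.flight.requests.per.connection", "1")
    props.setProperty("enable.idempotence", "true")
    props.setProperty("acks", "all")
    props
  }
}
```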
>>>>>>
>>>>>> [1]
>>>>>> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>>>>>>
>>>>>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley <r...@remind101.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I began reading
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>>>>>
>>>>>>>    - *Redistributing* streams (between *map()* and *keyBy/window*,
>>>>>>>      as well as between *keyBy/window* and *sink*) change the
>>>>>>>      partitioning of streams. Each *operator subtask* sends data to
>>>>>>>      different target subtasks, depending on the selected transformation.
>>>>>>>      Examples are *keyBy()* (re-partitions by hash code),
>>>>>>>      *broadcast()*, or *rebalance()* (random redistribution). In a
>>>>>>>      *redistributing* exchange, order among elements is only
>>>>>>>      preserved for each pair of sending- and receiving task (for example
>>>>>>>      subtask[1] of *map()* and subtask[2] of *keyBy/window*).
>>>>>>>
>>>>>>> This makes it sound like ordering on the same partition/key is
>>>>>>> always maintained. Which is exactly the ordering guarantee that I need.
>>>>>>> This seems to slightly contradict the statement "Flink provides no
>>>>>>> guarantees about the order of the elements within a window" for keyed
>>>>>>> state. So is it true that ordering _is_ guaranteed for identical keys?
>>>>>>>
>>>>>>> If I'm not mistaken, the state in the TableAPI is always considered
>>>>>>> keyed state for a join or aggregate. Or am I missing something?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley <r...@remind101.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Our data arrives in order from Kafka, so we are hoping to use that
>>>>>>>> same order for our processing.
>>>>>>>>
>>>>>>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley <r...@remind101.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Going further, if "Flink provides no guarantees about the order of
>>>>>>>>> the elements within a window" then with minibatch, which I assume uses a
>>>>>>>>> window under the hood, any aggregates that expect rows to arrive in order
>>>>>>>>> will fail to keep their consistency. Is this correct?
>>>>>>>>>
>>>>>>>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley <r...@remind101.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> We have a job from CDC to a large unbounded Flink plan to
>>>>>>>>>> Elasticsearch.
>>>>>>>>>>
>>>>>>>>>> Currently, we have been relentlessly trying to reduce our record
>>>>>>>>>> amplification which, when our Elasticsearch index is near fully populated,
>>>>>>>>>> completely bottlenecks our write performance. We decided recently to try a
>>>>>>>>>> new job using mini-batch. At first this seemed promising but at some point
>>>>>>>>>> we began getting huge record amplification in a join operator. It appears
>>>>>>>>>> that minibatch may only batch on aggregate operators?
>>>>>>>>>>
>>>>>>>>>> So we're now thinking that we should have a window before our ES
>>>>>>>>>> sink which only takes the last record for any unique document id in the
>>>>>>>>>> window, since that's all we really want to send anyway. However, when
>>>>>>>>>> investigating turning a table into a keyed window stream for deduping, and
>>>>>>>>>> then back into a table, I read the following:
>>>>>>>>>>
>>>>>>>>>> >Attention Flink provides no guarantees about the order of the
>>>>>>>>>> elements within a window. This implies that although an evictor may remove
>>>>>>>>>> elements from the beginning of the window, these are not necessarily the
>>>>>>>>>> ones that arrive first or last. [1]
>>>>>>>>>>
>>>>>>>>>> which has put a damper on our investigation.
>>>>>>>>>>
>>>>>>>>>> I then found the deduplication SQL doc [2], but I have a hard
>>>>>>>>>> time parsing what the SQL does and we've never used TemporaryViews or
>>>>>>>>>> proctime before.
>>>>>>>>>> Is this essentially what we want?
>>>>>>>>>> Will just using this SQL be safe for a job that is unbounded and
>>>>>>>>>> just wants to deduplicate a document write to whatever the most current one
>>>>>>>>>> is (i.e. will restoring from a checkpoint maintain our unbounded
>>>>>>>>>> consistency and will deletes work)?
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
>>>>>>>>>> [2]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>>>>>> <https://www.facebook.com/remindhq>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>
>
>


