rocksdb block cache usage

2021-01-27 Thread 曾祥才
hi, all
   I've enabled the state.backend.rocksdb.metrics.block-pinned-usage metric,
and the
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
metric is exposed.
   I'm confused whether the total memory used for pinned blocks in the block cache is the
sum of
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
across all operators, or just the value of a single
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
(for block cache usage the metric seems to be per slot)?







 

Re: Difference between table.exec.source.idle-timeout and setIdleStateRetentionTime ?

2021-01-27 Thread Dawid Wysakowicz
Hey,

As for the MATCH_RECOGNIZE clause, I highly recommend applying a time
constraint[1]. The idle state retention time does not apply to the
MATCH_RECOGNIZE, but you can think of the time constraint as something
similar, but it is closer to the actual query logic.

If you are hitting FLINK-15160 unfortunately I don't have a good
solution for it. The only thing that comes to my mind is adding a
heartbeat event to the event stream to prune the partial matches, but I
understand it is quite invasive.

If you would be willing to help fix the problem in Flink, I could
also help review it and give pointers on how it could be done.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/match_recognize.html#time-constraint
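
For reference, a minimal sketch (Flink 1.12 Table API; the environment setup and the concrete values are assumptions, not from the thread) of where the two settings discussed in the quoted thread below live:

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleSettingsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // table.exec.source.idle-timeout: a partition that has produced no records for 30 s
        // is ignored when computing the watermark, so it cannot hold the watermark back.
        tEnv.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "30 s");

        // Idle state retention: state for keys that have not been seen for 10-20 minutes
        // becomes eligible for cleanup.
        tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(10), Time.minutes(20));
    }
}
```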

On 26/01/2021 17:39, Dcosta, Agnelo (HBO) wrote:
>
> Hi Dawid, thanks for the clarification and it helps a lot.
> Reply to couple of points :
>
> what is causing the state to grow?
> We are using Flink SQL and have 5 pattern match queries, 3 group by
> tumble windows. State growth over time is primarily coming from
> pattern match queries.
>
> Is it ever growing keyspace?
> Yes. By design our keyspace is ever growing. The expectation is that
> messages for one key will come in for couple of hours, then stop
> coming in. We would never see messages from that key again. New keys
> are constantly coming in.
>
> Is it that a watermark does not progress?
> Watermark on the subtask level is constantly updating and is in sync
> with other subtasks. We have not seen any issues with watermark
> updating as such.
>
> Looking through mailing list archive, our problem seems similar to
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-in-CEP-queries-keep-increasing-td31045.html
> https://issues.apache.org/jira/browse/FLINK-15160 : Clean up is not
> applied if there are no incoming events for a key.
>
> By design we can have partial matched states/matches in pattern match
> queries. And key space is such that no new event comes in for those
> partial matches.
>
> thanks.
>
>  
>
> *From: *Dawid Wysakowicz 
> *Date: *Tuesday, January 26, 2021 at 3:14 AM
> *To: *Dcosta, Agnelo (HBO) ,
> user@flink.apache.org 
> *Subject: *Re: Difference between table.exec.source.idle-timeout and
> setIdleStateRetentionTime ?
>
> **External Email received from: dwysakow...@apache.org **
>
>  
>
> Hi,
>
> The difference is that *table.exec.source.idle-timeout* is used
> for dealing with source idleness [1]. The problem is that a watermark
> cannot advance if some of the partitions become idle (do not produce
> any records). The watermark is always the minimum of the watermarks of all
> input partitions. The setting makes Flink ignore such partitions in
> the calculation after the time threshold is reached.
>
> The IdleStateRetention is Table API specific. As described in the link
> you provided it removes entries from a state for keys that were not
> seen for a given time threshold.
>
> As for your issue, I'd recommend first investigating what is causing
> the state to grow. Is it ever growing keyspace? Is it that a watermark
> does not progress (this should manifest in results as well). Or is it
> something else.
>
> Best,
>
> Dawid
>
>  
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
>
> On 25/01/2021 20:12, Dcosta, Agnelo (HBO) wrote:
>
> Hi,
>
> What is the difference between *table.exec.source.idle-timeout* and
> *setIdleStateRetentionTime* ?
>
> table.exec.source.idle-timeout:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html#table-exec-source-idle-timeout
>
>  
>
> setIdleStateRetentionTime:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
>  
>
> Some context:
> Hi we are using flink 1.12.
> Our checkpoint size is constantly increasing once app is deployed.
> After performing a restart, checkpoint size goes back to expected size.
> Looking at actual checkpoint files generated, it seems our app is
> holding on to state/events since the time app started up.
> Based on our sql, the maximum time we would need to hold state is 10
> minutes.
>
>  
>
> This e-mail is intended only for the use of the addressees. Any
> copying, forwarding, printing or other use of this e-mail by persons
> other than the addressees is not authorized. This e-mail may contain
> information that is privileged, confidential and exempt from
> disclosure. If you are not the intended recipient, please notify us
> immediately by return e-mail (including the original message in your
> reply) and then delete and discard all copies of the e-mail. Thank you.
> HB75
>




Re: Initializing broadcast state

2021-01-27 Thread Wei Jiang
Hi guys,
I hit the same question, but I use a different way to initialize it:
```
val list = ...   // I use JDBC to fetch the initial data
val dimensionInitStream = env.fromCollection(list)
// the main stream and `dimensionStream` are streams from Flink CDC
val dimension =
dimensionStream.union(dimensionInitStream).broadcast(descriptor)
...
```
Then the main stream can connect to the broadcast stream...
I don't know why it works; what do you think about it?


Guowei Ma wrote
> Hi, Nick
>   You might need to handle it yourself if you have to process an element
> only after you get the broadcast state.
>   For example, you could “cache” the element in state and handle it
> when the elements from the broadcast side have arrived. Especially if
> you are using the `KeyedBroadcastProcessFunction`, you could use
> `applyToKeyedState` to access the elements you cached before.
> 
> Best,
> Guowei
> 
> 
> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <

> buggie89@

> > wrote:
> 
>> Hi guys,
>> What is the way to initialize broadcast state(say with default values)
>> before the first element shows up in the broadcasting stream? I do a
>> lookup
>> on the broadcast state to process transactions which come from another
>> stream. The problem is the broadcast state is empty until the first
>> element
>> shows up.
>>
>>
>> Best,
>> Nick.
>>
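
A minimal sketch of the buffering approach Guowei describes above, with placeholder types (plain String elements keyed by themselves on the main stream, (key, value) pairs on the broadcast side); not production code:

```java
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BufferUntilBroadcastFn
        extends KeyedBroadcastProcessFunction<String, String, Tuple2<String, String>, String> {

    static final MapStateDescriptor<String, String> RULES_DESC =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    private final ListStateDescriptor<String> bufferDesc =
            new ListStateDescriptor<>("buffered", Types.STRING);

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        if (ctx.getBroadcastState(RULES_DESC).immutableEntries().iterator().hasNext()) {
            out.collect("processed: " + value);                       // broadcast state is populated
        } else {
            getRuntimeContext().getListState(bufferDesc).add(value);  // cache until it arrives
        }
    }

    @Override
    public void processBroadcastElement(Tuple2<String, String> rule, Context ctx, Collector<String> out)
            throws Exception {
        ctx.getBroadcastState(RULES_DESC).put(rule.f0, rule.f1);
        // re-process everything that was cached before the first broadcast element arrived
        ctx.applyToKeyedState(bufferDesc, (key, buffered) -> {
            Iterable<String> cached = buffered.get();
            if (cached != null) {
                for (String element : cached) {
                    out.collect("processed: " + element);
                }
            }
            buffered.clear();
        });
    }
}
```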





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Seeing Rocks Native Metrics in Data Dog

2021-01-27 Thread Chesnay Schepler
AFAIK all IDs (and in fact all variables except <host>) are exposed as
tags. (The <host> is transmitted separately and I would've thought
Datadog automatically provides similar functionality for it.)


On 1/27/2021 2:11 AM, Rex Fenley wrote:
Oddly, I'm seeing them now. I'm not sure what has changed. Fwiw, we
have modified the scopes per
https://docs.datadoghq.com/integrations/flink/#metric-collection
but their modifications drop the ids as tags. We do need to modify them according
to that documentation - "*Note*: The system scopes must be remapped
for your Flink metrics to be supported, otherwise they are submitted
as custom metrics." Could we instead add host and ids as tags to our
metrics?


Thanks for your help!

On Tue, Jan 26, 2021 at 2:49 PM Chesnay Schepler wrote:


It is good to know that something from the task executors arrives
at datadog.

Do you see any metrics for a specific job, like the numRestarts
metric of the JobManager?

Are you using the default scope formats,
or have you modified them?
Could you try these instead and report back? (I replaced all
job/task/operator names with their IDs, in case some special
character is messing with datadog)

metrics.scope.jm: <host>.jobmanager
metrics.scope.jm.job: <host>.jobmanager.<job_id>
metrics.scope.tm: <host>.taskmanager.<tm_id>
metrics.scope.tm.job: <host>.taskmanager.<tm_id>.<job_id>
metrics.scope.task: <host>.taskmanager.<tm_id>.<job_id>.<task_id>.<subtask_index>
metrics.scope.operator: <host>.taskmanager.<tm_id>.<job_id>.<operator_id>.<subtask_index>


On 1/26/2021 9:28 PM, Rex Fenley wrote:

All taskmanager and jobmanager logs show up. Anything specific to
an operator does not.
For example, flink.taskmanager.Status.JVM.Memory.Heap.Used shows
up, but I can't see stats on an individual operator.

I mostly followed a combination of
https://docs.datadoghq.com/integrations/flink/#metric-collection

and

https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter


since datadog's documentation was slightly out of date.

Thanks

On Tue, Jan 26, 2021 at 10:28 AM Chesnay Schepler wrote:

Anything metric that is shown in the Flink UI should also
appear in DataDog.
If this is not the case then something goes wrong within the
reporter.

Is there anything suspicious in the Flink logs?

Can you give some example of metrics that /do/ show up in
DataDog?

On 1/26/2021 6:32 PM, Rex Fenley wrote:

Hi,

I need to get a deeper dive into how rocks is performing so
I turned on Rocks Native Metrics. However, I don't see any
of the metrics in DataDog (though I have other Flink metrics
in DataDog). I only see rocks metrics in the operator
metrics in Flink UI, and unfortunately I can't really zoom
in or out of those metrics or compare against multiple
operators at a time which makes it really difficult to get
an overview of how rocks is doing.

Is this there any way to get the Rocks Native Metrics
forwarded over to DataDog?

Thanks!





Re: Datadog reporter timeout & OOM issue

2021-01-27 Thread Chesnay Schepler

Yes, I could see how the memory issue can occur.

However, it should be limited to buffering 64 requests; this is the 
default limit that okhttp imposes on concurrent calls.

Maybe lowering this value already does the trick.
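
(Not currently settable through the Flink Datadog reporter, as noted in the follow-up message, but for reference this is roughly how the okhttp limit itself is configured on a Dispatcher; the values below are only illustrative.)

```java
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;

public class LimitedOkHttpClient {
    public static OkHttpClient build() {
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.setMaxRequests(16);          // okhttp's default for concurrently running async calls is 64
        dispatcher.setMaxRequestsPerHost(16);
        return new OkHttpClient.Builder()
                .dispatcher(dispatcher)
                .build();
    }
}
```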

On 1/27/2021 5:52 AM, Xingcan Cui wrote:

Hi all,

Recently, I tried to use the Datadog reporter to collect some 
user-defined metrics. Sometimes when reaching traffic peaks (which are 
also peaks for metrics), the HTTP client will throw the following 
exception:


```
[OkHttp https://app.datadoghq.com/.. .] 
WARN  org.apache.flink.metrics.datadog.DatadogHttpClient  - Failed 
sending request to Datadog

java.net.SocketTimeoutException: timeout
at 
okhttp3.internal.http2.Http2Stream$StreamTimeout.newTimeoutException(Http2Stream.java:593)
at 
okhttp3.internal.http2.Http2Stream$StreamTimeout.exitAndThrowIfTimedOut(Http2Stream.java:601)
at 
okhttp3.internal.http2.Http2Stream.takeResponseHeaders(Http2Stream.java:146)
at 
okhttp3.internal.http2.Http2Codec.readResponseHeaders(Http2Codec.java:120)
at 
okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at 
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at 
okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)
```

I guess this may be caused by the rate limit of the Datadog server 
since too many HTTP requests look like a kind of "attack". The real 
problem is that after throwing the above exceptions, the JVM heap size 
of the taskmanager starts to increase and finally causes OOM. I'm 
curious if this may be caused by metrics accumulation, i.e., for some 
reason, the client can't reconnect to the Datadog server and send the 
metrics so that the metrics data is buffered in memory and causes OOM.


I'm running Flink 1.11.2 on EMR-6.2.0 with 
flink-metrics-datadog-1.11.2.jar.


Thanks,
Xingcan





Re: Datadog reporter timeout & OOM issue

2021-01-27 Thread Chesnay Schepler
(setting this field is currently not possible from a Flink user 
perspective; it is something I will investigate)










Overhead when using map state

2021-01-27 Thread Lasse Nedergaard
Hi

We use RocksDB for storing state and run on Flink 1.10.
We have followed best practices and used map state instead of a map in value
state. We have seen problems with OOM exceptions and investigated them by
creating a job with n keyBy keys where each key had a map stored either
in map state or in value state. The job reads and updates random values in the
maps.
It turns out that the same map stored in map state consumes 3-4 times the memory
compared with storing it in value state.
1. Can anyone explain why the overhead is so big?

At the same time we also see that the throughput drops compared with value state. If
we iterated over all keys in the state it would make sense, but in our test we access
random individual keys. We had huge pressure on RocksDB and that could be the
cause.
2. Can anyone explain why the pressure on RocksDB is higher using map state
compared to value state with a map?
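
For reference, a minimal sketch (hypothetical names and types, not the actual test job) of the two layouts being compared; with RocksDB, MapState stores one key/value pair per map entry while ValueState stores the whole map as a single serialized blob:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class MapVsValueState extends RichFlatMapFunction<String, String> {
    private transient MapState<String, Long> mapState;            // (a) MapState
    private transient ValueState<Map<String, Long>> valueState;   // (b) ValueState holding a Map

    @Override
    public void open(Configuration parameters) {
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("per-entry", Types.STRING, Types.LONG));
        valueState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("whole-map", Types.MAP(Types.STRING, Types.LONG)));
    }

    @Override
    public void flatMap(String field, Collector<String> out) throws Exception {
        // (a) a random single-entry update touches only that RocksDB key/value pair
        mapState.put(field, System.currentTimeMillis());

        // (b) a random single-entry update deserializes and re-serializes the entire map
        Map<String, Long> m = valueState.value();
        if (m == null) {
            m = new HashMap<>();
        }
        m.put(field, System.currentTimeMillis());
        valueState.update(m);

        out.collect(field);
    }
}
```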



Med venlig hilsen / Best regards
Lasse Nedergaard



Re: Flink upgrade to Flink-1.12

2021-01-27 Thread Aljoscha Krettek
I'm afraid I also don't know more than that. But I agree with Ufuk that 
it should just work.


I think the best way would be to try it in a test environment and then 
go forward with upgrading the production jobs/cluster.


Best,
Aljoscha

On 2021/01/25 18:59, Ufuk Celebi wrote:

Thanks for reaching out. Semi-asynchronous does *not* refer to incremental 
checkpoints and Savepoints are always triggered as full snapshots (not 
incremental).

Earlier versions of the RocksDb state backend supported two snapshotting modes, 
fully and semi-asynchronous snapshots. Semi-asynchronous state snapshots for 
RocksDb have been removed a long time ago by Aljoscha in 
https://github.com/apache/flink/pull/2345 (FLINK-4340). The notes you are 
referencing were added around that time and I'm afraid they might have become 
mostly obsolete.

I'm pulling in Aljoscha who should be able to give a definitive answer here.

To make a long story short, it should simply work for you to upgrade from 1.11 
to 1.12 via a Savepoint.

Cheers,

Ufuk

On Wed, Jan 20, 2021, at 3:58 AM, 耿延杰 wrote:

Hi all,

As flink doc says:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/upgrading.html#preconditions


We do not support migration for state in RocksDB that was checkpointed using 
`semi-asynchronous` mode. In case your old job was using this mode, you can 
still change your job to use `fully-asynchronous` mode before taking the 
savepoint that is used as the basis for the migration.


So, my first question:
Is "semi-asynchronous" means "incremental checkpoint"?

And second question:
If so, assume I'm using flink-1.11 and RocksDB with incremental asynchronous 
checkpoint as state backend.
I should:
1. take a savepoint for old version(flink-1.11),
2. and change job to use "full asynchronous checkpoint" ,
3. restart old version(flink-1.11) job with new config (full asynchronous 
checkpoint),
4. then, take a savepoint
5. and finally, stop old version(flink-1.11) and upgrade to flink-1.12

Whether I understand correctly?

Best regards


Re: JobManager seems to be leaking temporary jar files

2021-01-27 Thread Chesnay Schepler
The problem of submitted jar files not being closed is a known one: 
https://issues.apache.org/jira/browse/FLINK-9844

IIRC it's not exactly trivial to fix since class-loading is involved.
It's not strictly related to the REST API; it also occurs in the CLI but 
is less noticeable since jars are usually not deleted.


As for the issue with deleteExtractedLibraries, Maciek is generally on a 
good track.
The explicit delete call is indeed missing. The best place to put it is
probably JarRunHandler#handleRequest, within the handler after the job was run.

A similar issue also exists in the JarPlanHandler.

I've opened https://issues.apache.org/jira/browse/FLINK-21164 to fix 
this issue.
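
A rough sketch (not the actual handler code; jarFile, entryClass and args are placeholders) of the cleanup the CLI path already performs and that appears to be missing on the REST path:

```java
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;

import java.io.File;

public final class ExtractedLibCleanup {
    public static void runAndCleanUp(File jarFile, String entryClass, String... args)
            throws ProgramInvocationException {
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(jarFile)
                .setEntryPointClassName(entryClass)
                .setArguments(args)
                .build();                       // extracts jars embedded in lib/ into temp files
        try {
            // ... build the JobGraph and submit it, as the handler already does ...
        } finally {
            program.deleteExtractedLibraries(); // the explicit delete the CLI path performs
        }
    }
}
```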


On 1/26/2021 12:21 PM, Maciek Próchniak wrote:


Hi Matthias,

I think the problem lies somewhere in JarRunHandler, as this is the 
place where the files are created.


I think these are not the files that are managed via the BlobService, as
they are not stored in the BlobService folders (I experimented with changing
the default BlobServer folders).


It seems to me that CliFrontend deletes those files explicitly:

https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L250

whereas I couldn't find such an invocation in JarRunHandler (not deleting
those files does not fully explain the heap leak though...)



thanks,

maciek

On 26.01.2021 11:16, Matthias Pohl wrote:

Hi Maciek,
my understanding is that the jars in the JobManager should be cleaned 
up after the job is terminated (I assume that your jobs successfully 
finished). The jars are managed by the BlobService. The dispatcher 
will trigger the jobCleanup in [1] after job termination. Are there 
any suspicious log messages that might indicate an issue?

I'm adding Chesnay to this thread as he might have more insights here.

[1] 
https://github.com/apache/flink/blob/2c4e0ab921ccfaf003073ee50faeae4d4e4f4c93/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L797 



On Mon, Jan 25, 2021 at 8:37 PM Maciek Próchniak wrote:


Hello,

in our setup we have:

- Flink 1.11.2

- job submission via REST API (first we upload jar, then we submit
multiple jobs with it)

- additional jars embedded in lib directory of main jar (this is
crucial
part)

When we submit jobs this way, Flink creates new temp jar files via
PackagedProgram.extractContainedLibraries method.

We observe that they are not removed after job finishes - it
seems that
PackagedProgram.deleteExtractedLibraries is not invoked when
using REST
API.

What's more, it seems that those jars remain open in JobManager
process.
We observe that when we delete them manually via scripts, the
disk space
is not reclaimed until process is restarted, we also see via heap
dump
inspection that java.util.zip.ZipFile$Source  objects remain,
pointing
to those files. This is quite a problem for us, as we submit
quite a few
jobs, and after a while we ran out of either heap or disk space on
JobManager process/host. Unfortunately, I cannot so far find
where this
leak would happen...

Does anybody have some pointers where we can search? Or how to
fix this
behaviour?


thanks,

maciek





Timers not firing until stream end

2021-01-27 Thread Pilgrim Beart
A newbie question:

I've created a basic Flink DataStream job for an IoT use-case, with file
source and sink for testing.
I key by device ID, then in a ProcessFunction set an EventTime Timer to
fire if a device falls silent, i.e. a timeout, which I cancel if another
message arrives from that device within the timeout.
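
A rough sketch of that pattern (assuming (deviceId, eventTimestamp) tuples rather than the actual message type; the timeout value is illustrative):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DeviceTimeoutFunction
        extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    private static final long TIMEOUT_MS = 1000;
    private transient ValueState<Long> pendingTimer;

    @Override
    public void open(Configuration parameters) {
        pendingTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending-timer", Types.LONG));
    }

    @Override
    public void processElement(Tuple2<String, Long> msg, Context ctx, Collector<String> out)
            throws Exception {
        Long previous = pendingTimer.value();
        if (previous != null) {
            ctx.timerService().deleteEventTimeTimer(previous);   // device spoke again in time
        }
        long fireAt = msg.f1 + TIMEOUT_MS;
        ctx.timerService().registerEventTimeTimer(fireAt);       // fires once the watermark passes fireAt
        pendingTimer.update(fireAt);
        out.collect(msg.f0 + " online at " + msg.f1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // no message within TIMEOUT_MS of the last one: report the device as offline
        out.collect(ctx.getCurrentKey() + " offline at " + timestamp);
    }
}
```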

My test source generates 3 devices, one of which falls silent for more than
the timeout period during the stream, then resumes again. So I expect the
Timer to fire for that device during the stream, and then for all the
Timers to fire after the end of the stream.

The timers do indeed fire at the end of the stream (e.g. with a timeout of
1000, the timers all fire 1000 after the end of the stream, which is
correct). But no timer fires for the device which falls silent during the
stream (even though other devices are still talking, advancing event time).
I've verified that I am keying correctly by ID.

I suspect this is something to do with Watermarks. I'm using
forBoundedOutOfOrderness watermarking with a duration of 0.

All suggestions welcome, thanks.

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot  +44 7961 125282
See our latest features and book me for a video call.


Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler

Based on your description you aren't doing anything obviously wrong.

Would it be possible for you to share the code with us?







Re: rocksdb block cache usage

2021-01-27 Thread Chesnay Schepler
I don't quite understand the question; all 3 metrics you listed are the 
same one?


On 1/27/2021 9:23 AM, 曾祥才 wrote:

hi, all
   I've enabled the state.backend.rocksdb.metrics.block-pinned-usage metric,
and the
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
metric is exposed.
   I'm confused whether the total memory used for pinned blocks in the block cache is the
sum of
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
across all operators, or just the value of a single
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
(for block cache usage the metric seems to be per slot)?







Re: Flink Job Manager & Task Manager heap size

2021-01-27 Thread Chesnay Schepler

Generally I see 2 options:

a) There's a memory leak somewhere. It would be good to know how the 
baseline heap usage during idleness evolves over time. Are the same 20 
jobs running continuously or are they (or others) periodically re-submitted?


b) The JVM just doesn't feel like running garbage collection. This 
doesn't seem that unreasonable given that there's plenty of memory to go 
around.


Overall, unless you run into OutOfMemoryErrors or the usage during 
idleness keeps steadily rising I wouldn't worry about it too much at 
this time.


On 1/27/2021 8:12 AM, Daniel Peled wrote:

Hi,

We have a flink cluster with 1 JM and 7 TM running about 20 jobs.
We have noticed that both JM & TM are consuming a huge amount of 
memory (several GB) *_although the jobs are doing nothing_* meaning no 
records are passing through the pipeline.
Checkpoints are enabled and the interval between checkpoints is 10
seconds (but again, no records are coming in).


Attached are screenshots of metrics of both JM and one of the TM

Is that normal ?
Any tips for debugging this issue ?

BR,
Danny





Flink sql problem

2021-01-27 Thread Jiazhi
Hi all


          After grouping by users, message A
arrives. If message B also arrives later, and the time of message B is earlier
than that of message A by no more than 10 minutes, mark the field in message A with Tag
= True. How can this be achieved?


Thanks
Jiazhi

Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler

You were right that it is an issue with the watermarks; apart from when
the job was stopped they were never emitted downstream, so no timer
was ever triggered.


It appears that you need to set the setAutoWatermarkInterval in the 
ExecutionConfig via


env.getConfig().setAutoWatermarkInterval(Duration.ofMillis(500).toMillis());


to have them periodically emitted. Alternatively, you could override
BoundedOutOfOrdernessWatermarks#onEvent to also emit a watermark for every
event (for example, by calling #onPeriodicEmit).


Put another way, if you use any of the built-in WatermarkGenerators and 
use event-time, then it appears that you *must* set this interval.


This behavior is...less than ideal I must admit, and it does not appear 
to be properly documented.










Re: Datadog reporter timeout & OOM issue

2021-01-27 Thread Xingcan Cui
Hi Juha and Chesnay,

I do appreciate your prompt responses! I'll also continue to investigate
this issue.

Best,
Xingcan



Re: rocksdb block cache usage

2021-01-27 Thread Yun Tang
Hi,

If you have enabled managed memory, all RocksDB instances within one slot share the
same block cache, so every
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
metric in the same slot would report the same value.


Best
Yun Tang

From: Chesnay Schepler 
Sent: Wednesday, January 27, 2021 20:59
To: 曾祥才 ; User-Flink 
Subject: Re: rocksdb block cache usage

I don't quite understand the question; all 3 metrics you listed are the same 
one?

On 1/27/2021 9:23 AM, 曾祥才 wrote:
hi, all
   I've enabled the state.backend.rocksdb.metrics.block-pinned-usage metric,
 and the
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
 metric is exposed.
 I'm confused whether the total memory used for pinned blocks in the block cache is the sum of
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
 across all operators, or just the value of a single
flink_taskmanager_job_task_operator_window_contents_rocksdb_block_cache_pinned_usage
 (for block cache usage the metric seems to be per slot)?






Stateful Functions - accessing the state aside of normal processing

2021-01-27 Thread Stephan Pelikan
Hi,

We are trying to use Statefuns for our tool and it seems to be a good fit. I 
already adopted it and it works quite well. However, we have millions of 
different states (all the same FunctionType but different ids) and each state 
consists of several @Persisted values (values and tables). We want to build an 
administration tool for examining the crowd of states (count, histogram, etc.) 
and each state in detail (the persisted-tables and -values).

Additionally we need some kind of dig-down functionality for finding those 
individual states. For example some of those persisted values can be used to 
categorize the crowd of states.

My question now is how to achieve this. Is there a way to browse and examine 
statefuns in a read-only fashion (their ids, their persisted values)? How can 
one achieve this without duplicating status in e.g. a relational database?

Thanks,
Stephan

PS: I have another questions but I will send them in separate mails to avoid 
mixing up topics.


Re: Timers not firing until stream end

2021-01-27 Thread Pilgrim Beart
Chesnay,
Thanks for this - I've made the change you suggested
(setAutoWatermarkInterval) but it hasn't changed the behaviour - timers
still get processed only on stream end.
I have pushed a new version, with this change, and also emitting some
information in a .log field.
If you search for "!!!" in Ingest.java and DPTimeoutFunction.java you'll
see the relevant changes.

In DPTimeoutFunction you'll see that if I add code to say "cancel the timer
only if it wouldn't have gone off" then the output is now correct -
individual devices do timeout. However, this output only appears at the end
of the stream (i.e. time jumps backwards as all the timers are processed)
so I still appear not to be seeing timer processing at the correct event
time. If there was no end of stream, I would never get any timeouts.

Below is the output I get when I run. This output is correct but:
a) only because I am manually cancelling timers in DPTimeoutFunction
(search for "!!!")
b) the timer events are timestamped correctly, but are not emitted into the
stream at the right time - and if the stream didn't end then no timeouts
would ever occur (which in particular means that devices that never come
back online will never get marked as offline).

Perhaps I do need to implement an onPeriodicEmit function? Does that
require a custom watermark strategy? I can see how to define a custom
watermark at the link below, but it's unclear how to install it:
https://stackoverflow.com/questions/64369613/how-to-add-a-custom-watermarkgenerator-to-a-watermarkstrategy

{"ts":0,"id":"0","value":0.01,"is_online":true,"log":"new state. "}
{"ts":0,"id":"1","value":0.02,"is_online":true,"log":"new state. "}
{"ts":0,"id":"2","value":0.03,"is_online":true,"log":"new state. "}
{"ts":1000,"id":"0","value":0.04,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 1000 Cancelling previous timer. "}
{"ts":1000,"id":"1","value":0.05,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 1000 Cancelling previous timer. "}
{"ts":2000,"id":"0","value":0.06,"is_online":true,"log":"prevMsg.ts 1000
msg_in.ts 2000 Cancelling previous timer. "}
{"ts":2000,"id":"1","value":0.07,"is_online":true,"log":"prevMsg.ts 1000
msg_in.ts 2000 Cancelling previous timer. "}
{"ts":3000,"id":"0","value":0.08,"is_online":true,"log":"prevMsg.ts 2000
msg_in.ts 3000 Cancelling previous timer. "}
{"ts":3000,"id":"1","value":0.09,"is_online":true,"log":"prevMsg.ts 2000
msg_in.ts 3000 Cancelling previous timer. "}
{"ts":4000,"id":"0","value":0.1,"is_online":true,"log":"prevMsg.ts 3000
msg_in.ts 4000 Cancelling previous timer. "}
{"ts":4000,"id":"1","value":0.11,"is_online":true,"log":"prevMsg.ts 3000
msg_in.ts 4000 Cancelling previous timer. "}
{"ts":5000,"id":"0","value":0.12,"is_online":true,"log":"prevMsg.ts 4000
msg_in.ts 5000 Cancelling previous timer. "}
{"ts":5000,"id":"2","value":0.13,"is_online":true,"log":"prevMsg.ts 0
msg_in.ts 5000 "}
{"ts":6000,"id":"0","value":0.14,"is_online":true,"log":"prevMsg.ts 5000
msg_in.ts 6000 Cancelling previous timer. "}
{"ts":6000,"id":"2","value":0.15,"is_online":true,"log":"prevMsg.ts 5000
msg_in.ts 6000 Cancelling previous timer. "}
{"ts":7000,"id":"0","value":0.16,"is_online":true,"log":"prevMsg.ts 6000
msg_in.ts 7000 Cancelling previous timer. "}
{"ts":7000,"id":"1","value":0.17,"is_online":true,"log":"prevMsg.ts 4000
msg_in.ts 7000 "}
{"ts":7000,"id":"2","value":0.18,"is_online":true,"log":"prevMsg.ts 6000
msg_in.ts 7000 Cancelling previous timer. "}
{"ts":8000,"id":"0","value":0.19,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"1","value":0.2,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelling previous timer. "}
{"ts":8000,"id":"2","value":0.21,"is_online":true,"log":"prevMsg.ts 7000
msg_in.ts 8000 Cancelling previous timer. "}
{"ts":9000,"id":"0","value":0.22,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"1","value":0.23,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelling previous timer. "}
{"ts":9000,"id":"2","value":0.24,"is_online":true,"log":"prevMsg.ts 8000
msg_in.ts 9000 Cancelling previous timer. "}
{"ts":1,"id":"0","value":0.25,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 1 Cancelling previous timer. "}
{"ts":1,"id":"1","value":0.26,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 1 Cancelling previous timer. "}
{"ts":1,"id":"2","value":0.27,"is_online":true,"log":"prevMsg.ts 9000
msg_in.ts 1 Cancelling previous timer. "}
{"ts":1001,"id":"2","is_online":false} // These are the "going offline"
events that we want to see. But they are emitted only once the stream has
ended.
{"ts":5001,"id":"1","is_online":false}
{"ts":11001,"id":"1","is_online":false}
{"ts":11001,"id":"0","is_online":false}
{"ts":11001,"id":"2","is_online":false}

Thanks,

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot


Re: Timers not firing until stream end

2021-01-27 Thread Aljoscha Krettek

On 2021/01/27 15:09, Chesnay Schepler wrote:
Put another way, if you use any of the built-in WatermarkGenerators and 
use event-time, then it appears that you *must* set this interval.


This behavior is...less than ideal I must admit, and it does not 
appear to be properly documented.


Setting the watermark interval is done when calling
`env.setStreamTimeCharacteristic()`. It's the first thing we documented
for working with event time [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_time.html


To me it was always a usability problem that we didn't have event time 
enabled by default. We didn't have this because of "performance 
considerations". This changed in Flink 1.12 [2].


[2] https://issues.apache.org/jira/browse/FLINK-19317

@Pilgrim: Which version of Flink are you using?


Proctime consistency

2021-01-27 Thread Rex Fenley
Hello,

I'm looking at ways to deduplicate data and found [1], but does proctime
get committed with operators? How does this work against clock skew on
different machines?

Thanks

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG  |  FOLLOW US  |  LIKE US



Re: Timers not firing until stream end

2021-01-27 Thread Pilgrim Beart
I am calling
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
I am using Flink 11.1 (because I need to run on AWS Kinesis Data Analytics).

-Pilgrim
--
Learn more at https://devicepilot.com @devicepilot  +44 7961 125282
See our latest features and book me for a video call.





Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler
My bad, I was still using the custom WatermarkStrategy that emits a 
watermark for each event.


.assignTimestampsAndWatermarks(
    new WatermarkStrategy<T>() {
        @Override
        public WatermarkGenerator<T> createWatermarkGenerator(
                WatermarkGeneratorSupplier.Context context) {
            return new BoundedOutOfOrdernessWatermarks<T>(Duration.ofSeconds(1)) {
                @Override
                public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                    super.onEvent(event, eventTimestamp, output);
                    super.onPeriodicEmit(output);
                }
            };
        }
    }
    .withTimestampAssigner(...))



@Aljoscha This is about Flink 1.11. Since the periodic watermarks are
dependent on processing time, am I assuming correctly that, if the job
finishes quickly, watermarks may never be emitted (except for those at the
end of the job)? Is there any way to emit periodic watermarks based on event
time?
Is there any way to enable punctuated watermarks for the existing 
watermark strategies without having to implement a custom one?



Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler
Note that while this does fix the issue of timers not firing while the 
job is running, it seems to be firing too many timers...



Re: Timers not firing until stream end

2021-01-27 Thread Chesnay Schepler

Actually, if the parallelism is 1 then it works as it should. sigh


TaskManager crash. Zookeeper timeout

2021-01-27 Thread Colletta, Edward
Using Flink 1.11.2 on Java 11, session cluster with 16 jobs running on AWS ECS 
instances.  The cluster has 3 JMs and 3 TMs; a separate ZooKeeper cluster has 3 nodes.

One of our taskmanagers crashed today with what seems to be rooted in a 
ZooKeeper timeout.  We are wondering if there is any tuning that might address 
this timeout.  Any help will be greatly appreciated.
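
One knob that is often raised in this situation is the ZooKeeper client session
timeout. The following flink-conf.yaml values are only an illustrative starting point,
assuming the defaults (60s session / 15s connection timeout) are currently in use; long
GC pauses on the TaskManager are a common cause of such gaps, so checking GC logs is
also worthwhile.

high-availability.zookeeper.client.session-timeout: 120000
high-availability.zookeeper.client.connection-timeout: 30000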

The first sign of trouble in the log is the following:

2021-01-27 11:16:39,795 WARN  
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Client 
session timed out, have not heard from server in 34951ms for sessionid 
0x140c01570036
2021-01-27 11:16:39,795 INFO  
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Client 
session timed out, have not heard from server in 34951ms for sessionid 
0x140c01570036, closing socket connection and attempting reconnect
2021-01-27 11:16:39,897 INFO  
org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
 [] - State change: SUSPENDED
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 7613291aea3f4892a0deed0e7036e229 with leader id 
8959b1fb00fdd4e3d28daade48204e1f lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 3230dacf7fa0b8b8f9fe1c77ebdde2bb with leader id 
bccda87aa8ab14f23e98a4b6d2bf4081 lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 8f2ee940006ebb6d8f6d12e3db917da3 with leader id 
b72d64c2ec112d96cc3b93697d85478d lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job aaec26e3924e81c12bd5a6d71f6c0d77 with leader id 
8d91fefd14539d11d60a16e0e5cd45b1 lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 2d5f912867ff70a58638aff51c7f6f33 with leader id 
b24724d3e03bee3486fdc5dc616b4a9c lost leadership.
2021-01-27 11:16:39,969 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,969 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 29eb631a7a07aa6b2c0224972b9937bb with leader id 
8479de79b7eda73fca6593da93c04027 lost leadership.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job bc7688332e73f330f08c95428630b99e with leader id 
a541d5eb3b60d29afc3a16cab2f742e7 lost leadership.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job a70b0023b705c39fa66f47f1a666b65d with leader id 
a0bfc94c9ff40689a7143396cafe4ac7 lost leadership.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.
2021-01-27 11:16:39,970 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - JobManager 
for job 4c929f573971b8520a76ee1dfe5c3e35 with leader id 
922675f382f87225300696bae21841cc lost leadership.
2021-01-27 11:16:39,970 WARN  
org.apache.flink.runtime.leaderret

How to maintain output order of events by execution initiation time.

2021-01-27 Thread narasimha
Hi,

Below is my dataflow

DataStream stream = ...

stream.process(new ProcessFunction())
      .addSink(...)


class ProcessFunction ... {
    MapState time ...;

    processElement(...) {
        // add the element to the MapState keyed by event time
        // register a timer at eventtime + 60 seconds
    }

    // Reason for maintaining onTimer in the process function is to update
    // the timer period.
    onTimer(timestamp, ...) {
        // Iterate over the last n elements added to business objects at a
        // given timestamp.
        // Emit to output if the condition passes
    }
}


class BusinessObject {
    List entities;

    addEntity(BusinessEntities b);
}


This all looks good in code, but when processing elements at a rate of 2k,
the order in which results are emitted changes, i.e.
results for time T+t are getting emitted before results for T.

Can someone give suggestions on how this can be handled, so that the
order of emitted results is guaranteed?

Would popping them out and emitting them using a sliding window of 1 second
solve this?


Thanks,
Narasimha

-- 
A.Narasimha Swamy


Flink running in k8s pod - pod is able to access S3 bucket, flink does not

2021-01-27 Thread Oran Shuster
So I'm really stumped on this; I've been at it for a couple of days now.
Some general info - 
Flink version 1.12.1, using the k8s HA service. The k8s cluster is self-managed on AWS.
Our checkpoints and savepoints are on S3; I created a new bucket just for them 
and set the proper permissions on the k8s node.

The job manager is working, I can access the UI and upload a job. Looking at 
the startup logs I can see the bucket I set, with no errors.

2021-01-27 14:46:38,740 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore 
   [] - Creating highly available BLOB storage directory at 
s3://ha-storage/default/blob

(while there is no error, I can't find that directory in the bucket)


However, once I submit the job I get an exception. Looking at the job manager 
logs I'm getting an S3 access denied error:

2021-01-27 14:28:08,628 ERROR 
org.apache.flink.runtime.blob.BlobServerConnection   [] - PUT operation 
failed
java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Access 
Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request 
ID: 8W0N0T2R4P8P7YBT; S3 Extended Request ID: 
B6zBzIoBmzNoJ4bWQE9Ydt65+IN8pyHeJQuTc28AscyG0dSEM3G7WZHutOT2scJ/6WCoOuRi27A=; 
Proxy: null), S3 Extended Request ID: 
B6zBzIoBmzNoJ4bWQE9Ydt65+IN8pyHeJQuTc28AscyG0dSEM3G7WZHutOT2scJ/6WCoOuRi27A=

So I created a new image based on the Flink image with the AWS CLI installed 
and tried doing some S3 actions as the flink user through the shell:

flink@flink-jobmanager-1-12-f6cf4b5b6-xmkvb:~$ aws s3 ls s3://
flink@flink-jobmanager-1-12-f6cf4b5b6-xmkvb:~$ touch oran.txt
flink@flink-jobmanager-1-12-f6cf4b5b6-xmkvb:~$ aws s3 cp oran.txt 
s3:///oran.txt
upload: ./oran.txt to s3://houzz-flink-1-12-session-cluster/oran.txt

Some more information - we already have an older version of Flink running on 
the same cluster/namespace (version 1.9.1) and it also uses S3 (a different 
bucket) and it's working. We used a homebrewed image for that version, but it is 
closely based on how the original Flink image is created (no funny business).

Also, the S3 plugin I'm using is flink-s3-fs-presto-1.12.1.jar, enabled via the 
ENABLE_BUILT_IN_PLUGINS env variable. I tried using the Hadoop one but got an 
error message that it's missing; not sure what's up with that.

So S3 access from inside the pod is totally working... and yet here I'm stuck. This 
makes zero sense to me, so I thought I should ask on the mailing list.
Thanks for all the help




Re: Deduplicating record amplification

2021-01-27 Thread Arvid Heise
Hi Rex,

indeed these two statements look like they contradict each other, but they
are looking at both sides from the same coin.
Flink simply puts records into windows in FIFO order. That is, there is no
ordering on event time if there are late events. So if your elements arrive
ordered, the ordering is retained. If your elements arrive unordered, the
same unordered order is retained.

However, note that Flink can only guarantee FIFO according to your
topology. Consider a source with parallelism 2, each reading data from an
ordered kafka partition (k1, k2) respectively. Each partition has records
with keys, such that no key appears in both partitions (default behavior if
you set keys but no partition while writing to Kafka).
1) Let's assume you do a simple transformation and write them back into
kafka with the same key. Then you can be sure that the order of the records
is retained.

2) Now you add a random shuffle and have the transformation. Now two
successive records may be processed in parallel and there is a race
condition who is written first into Kafka. So order is not retained.

3) You shuffle both partitions by the Kafka key (keyby) and do some
aggregation. Two successive records with the same key will always be
processed by the same aggregation operator. So the order is retained for
each key (note that this is what you usually want and what Kafka gives you
if you don't set the partition explicitly and just provide a key)

4) You shuffle both partitions by a different key. Then two successive
Kafka records could be again calculated in parallel such that there is a
race condition.

Note that windows are a kind of aggregation.

So Flink is never going to restore an ordering that is not there (because
it's too costly and there are too many unknowns). But you can infer the
guarantees by analyzing your topology.

---

Please note that there is a common pitfall when you work with Kafka:
- Ordering of records in Kafka is only guaranteed if you set
*max.in.flight.requests.per.connection* to 1. [1]
- Often you also want to set *enable.idempotence* and *acks=all*

That is true for the upstream application and if you plan back to write to
Kafka you also need to set that in Flink.

[1]
https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
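
As a minimal, untested sketch of the producer settings mentioned above (broker address,
topic name and the surrounding stream are placeholders, and the exact FlinkKafkaProducer
constructor variant may differ depending on your connector version):

// FlinkKafkaProducer comes from the flink-connector-kafka dependency
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "kafka-broker:9092");
producerProps.setProperty("max.in.flight.requests.per.connection", "1");
producerProps.setProperty("enable.idempotence", "true");
producerProps.setProperty("acks", "all");

// when writing back to Kafka from Flink, pass the same properties to the sink
FlinkKafkaProducer<String> sink =
    new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), producerProps);
stream.addSink(sink);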

On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley  wrote:

> Hello,
>
> I began reading
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>
>-
>
>*> Redistributing* streams (between *map()* and *keyBy/window*, as
>well as between *keyBy/window* and *sink*) change the partitioning of
>streams. Each *operator subtask* sends data to different target
>subtasks, depending on the selected transformation. Examples are
>*keyBy()* (re-partitions by hash code), *broadcast()*, or *rebalance()*
>(random redistribution). In a *redistributing* exchange, order among
>elements is only preserved for each pair of sending- and receiving task
>(for example subtask[1] of *map()* and subtask[2] of *keyBy/window*).
>
> This makes it sounds like ordering on the same partition/key is always
> maintained. Which is exactly the ordering guarantee that I need. This seems
> to slightly contradict the statement "Flink provides no guarantees about
> the order of the elements within a window" for keyed state. So is it true
> that ordering _is_ guaranteed for identical keys?
>
> If I'm not mistaken, the state in the TableAPI is always considered keyed
> state for a join or aggregate. Or am I missing something?
>
> Thanks!
>
> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley  wrote:
>
>> Our data arrives in order from Kafka, so we are hoping to use that same
>> order for our processing.
>>
>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley  wrote:
>>
>>> Going further, if "Flink provides no guarantees about the order of the
>>> elements within a window" then with minibatch, which I assume uses a window
>>> under the hood, any aggregates that expect rows to arrive in order will
>>> fail to keep their consistency. Is this correct?
>>>
>>> On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley  wrote:
>>>
 Hello,

 We have a job from CDC to a large unbounded Flink plan to Elasticsearch.

 Currently, we have been relentlessly trying to reduce our record
 amplification which, when our Elasticsearch index is near fully populated,
 completely bottlenecks our write performance. We decided recently to try a
 new job using mini-batch. At first this seemed promising but at some point
 we began getting huge record amplification in a join operator. It appears
 that minibatch may only batch on aggregate operators?

 So we're now thinking that we should have a window before our ES sink
 which only takes the last record for any unique document id in the window,
 since that's all we really want to send anyway. However, when investigating
 turning a table, to a keyed

Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
This is great info, thanks!

My question then becomes, what constitutes a random shuffle? Currently
we're using the Table API with minibatch on flink v1.11.3. Do our joins
output a keyed stream of records by join key or is this random? I imagine
that they'd have to have a key for retracts and accumulates to arrive in
order on the next downstream operator. Same with aggs but on the groupBy
key.

Does this sound correct to you?

Thanks!

On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise  wrote:

> Hi Rex,
>
> indeed these two statements look like they contradict each other, but they
> are looking at both sides from the same coin.
> Flink is simply putting records in FIFO in windows. That is, there is no
> ordering on event time if there are late events. So if your elements arrive
> ordered, the ordering is retained. If your elements arrive unordered, the
> same unordered order is retained.
>
> However, note that Flink can only guarantee FIFO according to your
> topology. Consider a source with parallelism 2, each reading data from an
> ordered kafka partition (k1, k2) respectively. Each partition has records
> with keys, such that no key appears in both partitions (default behavior if
> you set keys but no partition while writing to Kafka).
> 1) Let's assume you do a simple transformation and write them back into
> kafka with the same key. Then you can be sure that the order of the records
> is retained.
>
> 2) Now you add a random shuffle and have the transformation. Now two
> successive records may be processed in parallel and there is a race
> condition who is written first into Kafka. So order is not retained.
>
> 3) You shuffle both partitions by the Kafka key (keyby) and do some
> aggregation. Two successive records with the same key will always be
> processed by the same aggregation operator. So the order is retained for
> each key (note that this is what you usually want and want Kafka gives you
> if you don't set the partition explicitly and just provide a key)
>
> 4) You shuffle both partitions by a different key. Then two successive
> Kafka records could be again calculated in parallel such that there is a
> race condition.
>
> Note that windows are a kind of aggregation.
>
> So Flink is never going to restore an ordering that is not there (because
> it's too costly and there are too many unknowns). But you can infer the
> guarantees by analyzing your topology.
>
> ---
>
> Please note that there is a common pitfall when you work with Kafka:
> - Ordering of records in Kafka is only guaranteed if you set
> *max.in.flight.requests.per.connection* to 1. [1]
> - Often you also want to set *enable.idempotence* and *acks=all*
>
> That is true for the upstream application and if you plan back to write to
> Kafka you also need to set that in Flink.
>
> [1]
> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>
> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> I began reading
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>
>>-
>>
>>*> Redistributing* streams (between *map()* and *keyBy/window*, as
>>well as between *keyBy/window* and *sink*) change the partitioning of
>>streams. Each *operator subtask* sends data to different target
>>subtasks, depending on the selected transformation. Examples are
>>*keyBy()* (re-partitions by hash code), *broadcast()*, or
>>*rebalance()* (random redistribution). In a *redistributing*
>>exchange, order among elements is only preserved for each pair of sending-
>>and receiving task (for example subtask[1] of *map()* and subtask[2]
>>of *keyBy/window*).
>>
>> This makes it sounds like ordering on the same partition/key is always
>> maintained. Which is exactly the ordering guarantee that I need. This seems
>> to slightly contradict the statement "Flink provides no guarantees about
>> the order of the elements within a window" for keyed state. So is it true
>> that ordering _is_ guaranteed for identical keys?
>>
>> If I'm not mistaken, the state in the TableAPI is always considered keyed
>> state for a join or aggregate. Or am I missing something?
>>
>> Thanks!
>>
>> On Tue, Jan 26, 2021 at 8:53 PM Rex Fenley  wrote:
>>
>>> Our data arrives in order from Kafka, so we are hoping to use that same
>>> order for our processing.
>>>
>>> On Tue, Jan 26, 2021 at 8:40 PM Rex Fenley  wrote:
>>>
 Going further, if "Flink provides no guarantees about the order of the
 elements within a window" then with minibatch, which I assume uses a window
 under the hood, any aggregates that expect rows to arrive in order will
 fail to keep their consistency. Is this correct?

 On Tue, Jan 26, 2021 at 5:36 PM Rex Fenley  wrote:

> Hello,
>
> We have a job from CDC to a large unbounded Flink plan to
> Elasticsearch.
>
> Currently, we have been relentlessly trying to reduce our record
>

Integration with Apache AirFlow

2021-01-27 Thread Flavio Pompermaier
Hello everybody,
is there any suggested way/pointer to schedule Flink jobs using Apache
AirFlow?
What I'd like to achieve is the submission (using the REST API of AirFlow)
of 2 jobs, where the second one can be executed only if the first one
succeed.

Thanks in advance
Flavio


Flink 1.11.2 test cases fail with Scala 2.12.12

2021-01-27 Thread Sourabh Mokhasi
Hi,

We have several flink applications written with Flink 1.9.1 and
Scala 2.11.12 and we are in the process of upgrading to Flink 1.11.2 and
Scala 2.12.12. We are using maven to manage our application dependencies.

After updating the pom.xml file to use the upgraded versions of Scala and
Flink as mentioned above, all the unit tests written with Scalatest
3.0.5(We are using flatspec style) fail with the following exception.

 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ExecutionError:
java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9
  at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
  at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
  at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
  at
org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
  at
org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
  at
org.apache.flink.api.scala.typeutils.TraversableSerializer.(TraversableSerializer.scala:41)

  Cause: java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$9
  at
scala.tools.nsc.transform.LambdaLift$LambdaLifter.(LambdaLift.scala:67)
  at
scala.tools.nsc.transform.LambdaLift.newTransformer(LambdaLift.scala:49)
  at scala.tools.nsc.transform.Transform$Phase.apply(Transform.scala:30)
  at scala.tools.nsc.Global$GlobalPhase.applyPhase(Global.scala:441)
  at scala.tools.nsc.Global$GlobalPhase.run(Global.scala:392)
  at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1467)
  at scala.tools.nsc.Global$Run.compileUnits(Global.scala:1451)
  at
scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.wrapInPackageAndCompile(ToolBoxFactory.scala:201)
  at
scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:256)
  at
scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433)
  ...
  Cause: java.lang.ClassNotFoundException: scala.math.Ordering$$anon$9
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  at
scala.tools.nsc.transform.LambdaLift$LambdaLifter.(LambdaLift.scala:67)
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService - Stop job
leader service.
  at
scala.tools.nsc.transform.LambdaLift.newTransformer(LambdaLift.scala:49)
[flink-akka.actor.default-dispatcher-3] INFO
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager -
Shutting down TaskExecutorLocalStateStoresManager.
  at scala.tools.nsc.transform.Transform$Phase.apply(Transform.scala:30)
  at scala.tools.nsc.Global$GlobalPhase.applyPhase(Global.scala:441)
  at scala.tools.nsc.Global$GlobalPhase.run(Global.scala:392)
  at scala.tools.nsc.Global$Run.compileUnitsInternal(Global.scala:1467)


The app itself compiles without issues, which can be verified by running
`mvn clean package -DskipTests`.

In order to troubleshoot/narrow down the issue, I upgraded the flink
package from 1.9.1 to to 1.11.2 while keeping the scala version the same
i.e 2.11.12 instead of 2.12.12 and this seems to have resolved the issue.
The app compiles and the test cases pass as well.

Is this a known compatibility issue between Flink 1.11.2 and Scala 2.12.12?

Thanks,
Sourabh


stopping with save points

2021-01-27 Thread Marco Villalobos
When I try to stop with a savepoint, I usually get the error below. I have
not been able to create a single savepoint. Please advise.

I am using Flink 1.11.0

Draining job "ed51084378323a7d9fb1c4c97c2657df" with a savepoint.


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job
"ed51084378323a7d9fb1c4c97c2657df".
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:495)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:864)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:487)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:931)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.util.concurrent.TimeoutException
at
java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
at
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)
... 6 more


Flink and Amazon EMR

2021-01-27 Thread Marco Villalobos
Just curious, has anybody had success with Amazon EMR with RocksDB and
checkpointing in S3?

That's the configuration I am trying to setup, but my system is running
more slowly than expected.


Re: A few questions about minibatch

2021-01-27 Thread Jark Wu
Hi Rex,

Could you share your query here? It would be helpful to identify the root
cause if we have the query.

1) watermark
The framework automatically adds a node (the MiniBatchAssigner) to generate
watermark events as the mini-batch id to broadcast and trigger mini-batch
in the pipeline.

2) MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]
It generates a new mini-batch id in an interval of 1000ms in system time.
The mini-batch id is represented by the watermark event.

3) TWO_PHASE optimization
If users want the TWO_PHASE optimization, it requires that the aggregate
functions all support the merge() method and that mini-batch is enabled.
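
As a small illustration, these settings can be applied through the table configuration
(a sketch only; tableEnv is an assumed TableEnvironment and the interval/size values
are just examples):

Configuration conf = tableEnv.getConfig().getConfiguration();
conf.setString("table.exec.mini-batch.enabled", "true");
conf.setString("table.exec.mini-batch.allow-latency", "1 s");
conf.setString("table.exec.mini-batch.size", "5000");
conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");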

Best,
Jark




On Tue, 26 Jan 2021 at 19:01, Dawid Wysakowicz 
wrote:

> I am pulling Jark and Godfrey who are more familiar with the planner
> internals.
>
> Best,
>
> Dawid
> On 22/01/2021 20:11, Rex Fenley wrote:
>
> Hello,
>
> Does anyone have any more information here?
>
> Thanks!
>
> On Wed, Jan 20, 2021 at 9:13 PM Rex Fenley  wrote:
>
>> Hi,
>>
>> Our job was experiencing high write amplification on aggregates so we
>> decided to give mini-batch a go. There's a few things I've noticed that are
>> different from our previous job and I would like some clarification.
>>
>> 1) Our operators now say they have Watermarks. We never explicitly added
>> watermarks, and our state is essentially unbounded across all time since it
>> consumes from Debezium and reshapes our database data into another store.
>> Why does it say we have Watermarks then?
>>
>> 2) In our sources I see MiniBatchAssigner(interval=[1000ms],
>> mode=[ProcTime], what does that do?
>>
>> 3) I don't really see anything else different yet in the shape of our
>> plan even though we've turned on
>> configuration.setString(
>> "table.optimizer.agg-phase-strategy",
>> "TWO_PHASE"
>> )
>> is there a way to check that this optimization is on? We use user defined
>> aggregate functions, does it work for UDAF?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>
>


importing types doesn't fix “could not find implicit value for evidence parameter of type …TypeInformation”

2021-01-27 Thread Devin Bost
I posted this problem on Stack Overflow here:
https://stackoverflow.com/questions/65930023/flink-importing-types-doesnt-fix-could-not-find-implicit-value-for-evidence

Basically, I can't even get a basic map to work like this:

object AmplitudeExample {
  def main(args: Array[String]) {
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = 
env.readTextFile("/Users/dbost/src/amplitude-flink/example-data.json")

val partitionedEvents = text
  .map(t => t)

partitionedEvents.print()
  }
}

I get:

could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[String] .map(t =>
t)

I'm using Flink 1.12.1 and Scala 2.12. What am I missing here?

Devin G. Bost


Re: Stateful Functions - accessing the state aside of normal processing

2021-01-27 Thread Tzu-Li (Gordon) Tai
Hi Stephan,

Great to hear about your experience with StateFun so far!

I think what you are looking for is a way to read StateFun checkpoints,
which are basically an immutable consistent point-in-time snapshot of all
the states across all your functions, and run some computation or simply to
explore the state values.
StateFun checkpoints are essentially adopted from Flink, so you can find
more detail about that here [1].

Currently, StateFun does provide a means for state "bootstrapping": running
a batch offline job to write and compose a StateFun checkpoint [2].
What is still missing is the "reading / analysis" side of things, to do
exactly what you described: running a separate batch offline job for
reading and processing an existing StateFun checkpoint.

Before we dive into the details of how that may look, do you think that is
what you would need?

Although I don't think we would be able to support such a feature yet, since
we're currently focused on reworking the SDKs and the request-reply protocol,
it would in any case be interesting to discuss whether this feature is
already important for multiple users.

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/stateful-stream-processing.html#checkpointing
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/deployment-and-operations/state-bootstrap.html

On Wed, Jan 27, 2021 at 11:41 PM Stephan Pelikan 
wrote:

> Hi,
>
>
>
> We are trying to use Statefuns for our tool and it seems to be a good fit.
> I already adopted it and it works quite well. However, we have millions of
> different states (all the same FunctionType but different ids) and each
> state consists of several @Persisted values (values and tables). We want to
> build an administration tool for examining the crowd of states (count,
> histogram, etc.) and each state in detail (the persisted-tables and
> -values).
>
>
>
> Additionally we need some kind of dig-down functionality for finding those
> individual states. For example some of those persisted values can be used
> to categorize the crowd of states.
>
>
>
> My question now is how to achieve this. Is there a way to browse and
> examine statefuns in a read-only fashion (their ids, their persisted
> values)? How can one achieve this without duplicating status in e.g. a
> relational database?
>
>
>
> Thanks,
>
> Stephan
>
>
>
> PS: I have another questions but I will send them in separate mails to
> avoid mixing up topics.
>


Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-01-27 Thread Jark Wu
Hi Sebastián,

I think Dawid is right.

Could you share the pom file? I also tried to
package flink-connector-postgres-cdc with ServicesResourceTransformer, and
the Factory file contains

com.alibaba.ververica.cdc.connectors.postgres.table.PostgreSQLTableFactory
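
For reference, a rough sketch of the shade-plugin fragment being discussed (plugin
version and the rest of the build section omitted; adjust to your existing pom):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <transformers>
      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    </transformers>
  </configuration>
</plugin>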


Best,
Jark


On Tue, 26 Jan 2021 at 21:17, Sebastián Magrí  wrote:

> Thanks a lot for looking into it Dawid,
>
> In the
> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> file I only see
>
> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
>
> Even after applying the ServicesResourceTransformer.
>
>
> On Tue, 26 Jan 2021 at 11:58, Dawid Wysakowicz 
> wrote:
>
>> Hi,
>>
>> Unfortunately I am not familiar with the packaging of
>> flink-connector-postgres-cdc. Maybe @Jark could help here?
>>
>> However, I think the problem that you cannot find the connector is caused
>> because of lack of entry in the resulting Manifest file. If there are
>> overlapping classes maven does not exclude whole dependencies, but rather
>> picks the overlapping class from one of the two. Could you check if you see
>> entries for all tables in
>> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.
>>
>> If not, you could try applying the ServicesResourceTransformer[1]
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
>> On 26/01/2021 12:29, Sebastián Magrí wrote:
>>
>> Hi!
>>
>> I've reported an issue with the postgresql-cdc connector apparently
>> caused by the maven shade plugin excluding either the JDBC connector or the
>> cdc connector due to overlapping classes. The issue for reference is here:
>>
>> https://github.com/ververica/flink-cdc-connectors/issues/90
>>
>> In the meantime, however, I've been trying to figure out if I can set up
>> an exclusion rule to fix this in my pom.xml file, without success.
>>
>> The `org.postgresql:postgresql` dependency is being added manually by me
>> to have a sink on a postgresql table and injected by the cdc connector
>> seemingly via its debezium connector dependency.
>>
>> Any guidance or hints I could follow would be really appreciated.
>>
>> --
>> Sebastián Ramírez Magrí
>>
>>
>
> --
> Sebastián Ramírez Magrí
>


Re: Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-27 Thread Jark Wu
Hi Rex,

Currently, it is not state compatible, because we will add a new node
called MiniBatchAssigner after the source, which changes the JobGraph and
thus the operator uid is different.

Best,
Jark

On Tue, 26 Jan 2021 at 18:33, Dawid Wysakowicz 
wrote:

> I am pulling in Jark and Godfrey who are more familiar with the internals
> of the planner.
> On 21/01/2021 01:43, Rex Fenley wrote:
>
> Just tested this and I couldn't restore from a savepoint. If I do a new
> job from scratch, can I tune the minibatch parameters and restore from a
> savepoint without having to make yet another brand new job?
>
> Thanks
>
>
> On Wed, Jan 20, 2021 at 12:43 PM Rex Fenley  wrote:
>
>> Hello,
>>
>> Is it safe to convert a non-mini-batch job to a mini-batch job when
>> restoring from a checkpoint or a savepoint?
>>
>> Thanks
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>
>


Re: A few questions about minibatch

2021-01-27 Thread Rex Fenley
Thanks, that all makes sense!

On Wed, Jan 27, 2021 at 7:00 PM Jark Wu  wrote:

> Hi Rex,
>
> Could you share your query here? It would be helpful to identify the root
> cause if we have the query.
>
> 1) watermark
> The framework automatically adds a node (the MiniBatchAssigner) to
> generate watermark events as the mini-batch id to broadcast and trigger
> mini-batch in the pipeline.
>
> 2) MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]
> It generates a new mini-batch id in an interval of 1000ms in system time.
> The mini-batch id is represented by the watermark event.
>
> 3) TWO_PHASE optimization
> If users want to have TWO_PHASE optimization, it requires the aggregate
> functions all support the merge() method and the mini-batch is enabled.
>
> Best,
> Jark
>
>
>
>
> On Tue, 26 Jan 2021 at 19:01, Dawid Wysakowicz 
> wrote:
>
>> I am pulling Jark and Godfrey who are more familiar with the planner
>> internals.
>>
>> Best,
>>
>> Dawid
>> On 22/01/2021 20:11, Rex Fenley wrote:
>>
>> Hello,
>>
>> Does anyone have any more information here?
>>
>> Thanks!
>>
>> On Wed, Jan 20, 2021 at 9:13 PM Rex Fenley  wrote:
>>
>>> Hi,
>>>
>>> Our job was experiencing high write amplification on aggregates so we
>>> decided to give mini-batch a go. There's a few things I've noticed that are
>>> different from our previous job and I would like some clarification.
>>>
>>> 1) Our operators now say they have Watermarks. We never explicitly added
>>> watermarks, and our state is essentially unbounded across all time since it
>>> consumes from Debezium and reshapes our database data into another store.
>>> Why does it say we have Watermarks then?
>>>
>>> 2) In our sources I see MiniBatchAssigner(interval=[1000ms],
>>> mode=[ProcTime], what does that do?
>>>
>>> 3) I don't really see anything else different yet in the shape of our
>>> plan even though we've turned on
>>> configuration.setString(
>>> "table.optimizer.agg-phase-strategy",
>>> "TWO_PHASE"
>>> )
>>> is there a way to check that this optimization is on? We use user
>>> defined aggregate functions, does it work for UDAF?
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>>
>>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend





Re: Converting non-mini-batch to mini-batch from checkpoint or savepoint

2021-01-27 Thread Rex Fenley
Thanks for the clarification.

On Wed, Jan 27, 2021 at 7:24 PM Jark Wu  wrote:

> Hi Rex,
>
> Currently, it is not state compatible, because we will add a new node
> called MiniBatchAssigner after the source which changes the JobGraph , thus
> uid is different.
>
> Best,
> Jark
>
> On Tue, 26 Jan 2021 at 18:33, Dawid Wysakowicz 
> wrote:
>
>> I am pulling in Jark and Godfrey who are more familiar with the internals
>> of the planner.
>> On 21/01/2021 01:43, Rex Fenley wrote:
>>
>> Just tested this and I couldn't restore from a savepoint. If I do a new
>> job from scratch, can I tune the minibatch parameters and restore from a
>> savepoint without having to make yet another brand new job?
>>
>> Thanks
>>
>>
>> On Wed, Jan 20, 2021 at 12:43 PM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> Is it safe to convert a non-mini-batch job to a mini-batch job when
>>> restoring from a checkpoint or a savepoint?
>>>
>>> Thanks
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>>
>>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend





Is Flink able to parse strings into dynamic JSON?

2021-01-27 Thread Devin Bost
I want to know whether it's possible in Flink to parse strings into a
dynamic JSON object that doesn't require me to know the primitive type
details at compile time.
We have over 300 event types to process, and I need a way to load the types
at runtime. I only need to know whether certain fields exist on the incoming
objects, and the object schemas are all different except for certain
fields.
Every example I can find shows Flink users specifying the full type
information at compile time, but there's no way this will scale.

It's possible for us to look up primitive type details at runtime from JSON,
but I'll still need a way to process that JSON in Flink to extract the
metadata if it's required. So, that brings me back to the original issue.

How can I do this in Flink?
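
(One possible approach, offered here only as an untested sketch: keep the records as
raw strings and parse them lazily with Jackson, so no per-event-type POJO is needed at
compile time. The field names "eventType" and "userId" below are made up for
illustration.)

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ParseJson extends RichMapFunction<String, JsonNode> {
    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        // create the mapper once per task instead of per record
        mapper = new ObjectMapper();
    }

    @Override
    public JsonNode map(String raw) throws Exception {
        JsonNode node = mapper.readTree(raw);
        // fields can then be inspected dynamically, e.g.:
        // node.has("eventType"), node.path("userId").asText()
        return node;
    }
}

// usage: DataStream<JsonNode> parsed = rawStrings.map(new ParseJson());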

--
Devin G. Bost


Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
In addition to those questions, assuming that keyed streams are in order,
I've come up with the following solution to compact our records and only
pick the most recent one per id before sending to the ES sink.

The first item in the Row is the document ID / primary key which we want to
compact records on.

val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))

userDocsStream
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .aggregate(new CompactionAggregate())

class CompactionAggregate
  extends AggregateFunction[
    (Boolean, Row),
    (Boolean, Row),
    (Boolean, Row)
  ] {

  override def createAccumulator() = (false, null)

  // Just take the latest value to compact.
  override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) = value

  override def getResult(accumulator: (Boolean, Row)) = accumulator

  // This is a required function that we don't use.
  override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
    throw new NotImplementedException()
}

I'm hoping that if the last record in the window is an insert it picks that,
and if it's a retract then it picks that; then, when we send this to the ES
sink, we simply check true or false in the first element of the tuple to issue
an insert or delete request to ES. Does this seem like it will work?

Thanks!


On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley  wrote:

> This is great info, thanks!
>
> My question then becomes, what constitutes a random shuffle? Currently
> we're using the Table API with minibatch on flink v1.11.3. Do our joins
> output a keyed stream of records by join key or is this random? I imagine
> that they'd have to have a key for retracts and accumulates to arrive in
> order on the next downstream operator. Same with aggs but on the groupBy
> key.
>
> Does this sound correct to you?
>
> Thanks!
>
> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise  wrote:
>
>> Hi Rex,
>>
>> indeed these two statements look like they contradict each other, but
>> they are looking at both sides from the same coin.
>> Flink is simply putting records in FIFO in windows. That is, there is no
>> ordering on event time if there are late events. So if your elements arrive
>> ordered, the ordering is retained. If your elements arrive unordered, the
>> same unordered order is retained.
>>
>> However, note that Flink can only guarantee FIFO according to your
>> topology. Consider a source with parallelism 2, each reading data from an
>> ordered kafka partition (k1, k2) respectively. Each partition has records
>> with keys, such that no key appears in both partitions (default behavior if
>> you set keys but no partition while writing to Kafka).
>> 1) Let's assume you do a simple transformation and write them back into
>> kafka with the same key. Then you can be sure that the order of the records
>> is retained.
>>
>> 2) Now you add a random shuffle and have the transformation. Now two
>> successive records may be processed in parallel and there is a race
>> condition who is written first into Kafka. So order is not retained.
>>
>> 3) You shuffle both partitions by the Kafka key (keyby) and do some
>> aggregation. Two successive records with the same key will always be
>> processed by the same aggregation operator. So the order is retained for
>> each key (note that this is what you usually want and want Kafka gives you
>> if you don't set the partition explicitly and just provide a key)
>>
>> 4) You shuffle both partitions by a different key. Then two successive
>> Kafka records could be again calculated in parallel such that there is a
>> race condition.
>>
>> Note that windows are a kind of aggregation.
>>
>> So Flink is never going to restore an ordering that is not there (because
>> it's too costly and there are too many unknowns). But you can infer the
>> guarantees by analyzing your topology.
>>
>> ---
>>
>> Please note that there is a common pitfall when you work with Kafka:
>> - Ordering of records in Kafka is only guaranteed if you set
>> *max.in.flight.requests.per.connection* to 1. [1]
>> - Often you also want to set *enable.idempotence* and *acks=all*
>>
>> That is true for the upstream application and if you plan back to write
>> to Kafka you also need to set that in Flink.
>>
>> [1]
>> https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
>>
>> On Wed, Jan 27, 2021 at 6:08 AM Rex Fenley  wrote:
>>
>>> Hello,
>>>
>>> I began reading
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/concepts/concepts.html#parallel-dataflows
>>>
>>>-
>>>
>>>*> Redistributing* streams (between *map()* and *keyBy/window*, as
>>>well as between *keyBy/window* and *sink*) change the partitioning
>>>of streams. Each *operator subtask* sends data to different target
>>>subtasks, depending on the selected transformation. Examples are
>>>*keyBy()* (re-partitions by hash code), *broadcast()*, or
>>>*rebalance()* (random redistribution).

Re: JobManager seems to be leaking temporary jar files

2021-01-27 Thread Maciek Próchniak

Hi Chesnay,

thanks for the reply. I wonder if FLINK-21164 will help without FLINK-9844 - 
if the jar file is not closed, won't it fail to be deleted?


As for FLINK-9844 - I understand that having code like

if (userClassLoader instanceof Closeable) {
    ((Closeable) userClassLoader).close();
}


is too much of a "dirty trick" to be considered?


thanks,

maciek


On 27.01.2021 13:00, Chesnay Schepler wrote:
The problem of submitted jar files not being closed is a known one: 
https://issues.apache.org/jira/browse/FLINK-9844

IIRC it's not exactly trivial to fix since class-loading is involved.
It's not strictly related to the REST API; it also occurs in the CLI 
but is less noticeable since jars are usually not deleted.


As for the issue with deleteExtractedLibraries, Maciek is generally on 
the right track.
The explicit delete call is indeed missing. The best place to put it is 
probably JarRunHandler#handleRequest, within handle, after the job was run.

A similar issue also exists in the JarPlanHandler.

I've opened https://issues.apache.org/jira/browse/FLINK-21164 to fix 
this issue.
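
As a purely illustrative sketch of that kind of cleanup - not the actual FLINK-21164
change - assuming the handler still holds references named packagedProgram and
jobSubmissionFuture:

// delete the jars extracted from the uploaded fat jar once submission has completed
jobSubmissionFuture.whenComplete(
    (jobId, error) -> packagedProgram.deleteExtractedLibraries());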


On 1/26/2021 12:21 PM, Maciek Próchniak wrote:


Hi Matthias,

I think the problem lies somewhere in JarRunHandler, as this is the 
place where the files are created.


I think these are not the files that are managed via the BlobService, as 
they are not stored in BlobService folders (I experimented with 
changing the default BlobServer folders).


It seems to me that CliFrontend deletes those files explicitly:

https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java#L250

whereas I couldn't find such an invocation in JarRunHandler (not 
deleting those files does not fully explain the heap leak though...)



thanks,

maciek

On 26.01.2021 11:16, Matthias Pohl wrote:

Hi Maciek,
my understanding is that the jars in the JobManager should be 
cleaned up after the job is terminated (I assume that your jobs 
successfully finished). The jars are managed by the BlobService. The 
dispatcher will trigger the jobCleanup in [1] after job termination. 
Are there any suspicious log messages that might indicate an issue?

I'm adding Chesnay to this thread as he might have more insights here.

[1] 
https://github.com/apache/flink/blob/2c4e0ab921ccfaf003073ee50faeae4d4e4f4c93/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L797 



On Mon, Jan 25, 2021 at 8:37 PM Maciek Próchniak > wrote:


Hello,

in our setup we have:

- Flink 1.11.2

- job submission via REST API (first we upload jar, then we submit
multiple jobs with it)

- additional jars embedded in lib directory of main jar (this is
crucial
part)

When we submit jobs this way, Flink creates new temp jar files via the
PackagedProgram.extractContainedLibraries method.

We observe that they are not removed after the job finishes - it seems
that PackagedProgram.deleteExtractedLibraries is not invoked when using
the REST API.

What's more, it seems that those jars remain open in the JobManager
process. We observe that when we delete them manually via scripts, the
disk space is not reclaimed until the process is restarted; we also see
via heap dump inspection that java.util.zip.ZipFile$Source objects
remain, pointing to those files. This is quite a problem for us, as we
submit quite a few jobs, and after a while we run out of either heap or
disk space on the JobManager process/host. Unfortunately, I cannot so
far find where this leak happens...

Does anybody have some pointers on where we can search? Or how to fix
this behaviour?


thanks,

maciek





Re: Difference between table.exec.source.idle-timeout and setIdleStateRetentionTime ?

2021-01-27 Thread Dcosta, Agnelo (HBO)
Hi Dawid,
Thanks for the tip on the time constraint. We are using WITHIN in our 
MATCH_RECOGNIZE clause; it is set to 3 minutes.
The problem of the increasing checkpoint size still persists.

Thanks for adding comments to FLINK-15160. I will take a look at the changes you 
suggested.

P.S.:
I initially meant to ask what the difference is between
table.exec.state.ttl 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html#table-exec-state-ttl

and
setIdleStateRetentionTime: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time
and whether table.exec.state.ttl makes any difference to MATCH_RECOGNIZE state?

From: Dawid Wysakowicz 
Date: Wednesday, January 27, 2021 at 12:41 AM
To: Dcosta, Agnelo (HBO) , user@flink.apache.org 

Subject: Re: Difference between table.exec.source.idle-timeout and 
setIdleStateRetentionTime ?
**External Email received from: dwysakow...@apache.org **


Hey,

As for the MATCH_RECOGNIZE clause, I highly recommend applying a time 
constraint[1]. The idle state retention time does not apply to the 
MATCH_RECOGNIZE, but you can think of the time constraint as something similar, 
but it is closer to the actual query logic.

If you are hitting FLINK-15160 unfortunately I don't have a good solution for 
it. The only thing that comes to my mind is adding a heartbeat event to the 
event stream to prune the partial matches, but I understand it is quite 
invasive.

If you would be willing to help fixing the problem in FLINK, I could also help 
review it and give pointers how it could be done.

Best,

Dawid

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/match_recognize.html#time-constraint
On 26/01/2021 17:39, Dcosta, Agnelo (HBO) wrote:
Hi Dawid, thanks for the clarification and it helps a lot.
Reply to couple of points :

what is causing the state to grow?
We are using flink SQL and have 5 pattern match queries , 3 group by tumble 
windows. State growth over time is primarily coming from pattern match queries.

Is it ever growing keyspace?
Yes. By design our keyspace is ever growing. The expectation is that messages 
for one key will come in for couple of hours, then stop coming in. We would 
never see messages from that key again. New keys are constantly coming in.

Is it that a watermark does not progress?
Watermark on the subtask level is constantly updating and is in sync with other 
subtasks. We have not seen any issues with watermark updating as such.

Looking through mailing list archive, our problem seems similar to
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-in-CEP-queries-keep-increasing-td31045.html
https://issues.apache.org/jira/browse/FLINK-15160 : Clean up is not applied if 
there are no incoming events for a key.

By design we can have partial matched states/matches in pattern match queries. 
And key space is such that no new event comes in for those partial matches.

thanks.

From: Dawid Wysakowicz 
Date: Tuesday, January 26, 2021 at 3:14 AM
To: Dcosta, Agnelo (HBO) , 
user@flink.apache.org 

Subject: Re: Difference between table.exec.source.idle-timeout and 
setIdleStateRetentionTime ?
**External Email received from: 
dwysakow...@apache.org **


Hi,

The difference is that the table.exec.source.idle-timeout is used for dealing 
with source idleness[1]. It is a problem that a watermark cannot advance if 
some of the partition become idle (do not produce any records). Watermark is 
always the minimum of watermarks of all input partitions. The setting makes 
flink ignore certain partitions in the calculation after the time threshold is 
reached.

The IdleStateRetention is Table API specific. As described in the link you 
provided it removes entries from a state for keys that were not seen for a 
given time threshold.

As for your issue, I'd recommend first investigating what is causing the state 
to grow. Is it ever growing keyspace? Is it that a watermark does not progress 
(this should manifest in results as well). Or is it something else.

Best,

Dawid



[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
On 25/01/2021 20:12, Dcosta, Agnelo (HBO) wrote:

Hi,

What is the difference between table.exec.source.idle-timeout and 
setIdleStateRetentionTime ?

table.exec.source.idle-timeout: 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html#table-exec-source-idle-timeout



setIdleStateRetentionTime: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time



Some context:
Hi we are using flink 1.12.
Our checkpoint size is constantly increasing once app is deployed.
After performing a restart, checkpoint size 

Re: memory tuning

2021-01-27 Thread Matthias Pohl
Thanks for sharing the logs. The configuration looks fine. Have you
analyzed the memory usage?

On Tue, Jan 26, 2021 at 5:02 PM Marco Villalobos 
wrote:

> Yes, I will do that.
>
> PRODUCTION
>
> 2021-01-26 04:03:50,804 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] -
>
> 2021-01-26 04:03:50,807 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] -  Starting YARN TaskExecutor runner (Version: 1.11.0, Scala: 2.12, Rev:d04872d, Date:2020-06-29T16:13:14+02:00)
> 2021-01-26 04:03:50,807 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] -  OS current user: yarn
> 2021-01-26 04:03:50,937 WARN  org.apache.hadoop.util.NativeCodeLoader [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] -  Current Hadoop/Kerberos user: hadoop
> 2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] -  JVM: OpenJDK 64-Bit Server VM - Amazon.com Inc. - 1.8/25.252-b09
> 2021-01-26 04:03:50,987 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] -  Maximum heap size: 3289 MiBytes
> 2021-01-26 04:03:50,988 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] -  JAVA_HOME: /etc/alternatives/jre
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] -  Hadoop version: 3.2.1
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] -  JVM Options:
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -Xmx3597035049
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -Xms3597035049
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -XX:MaxDirectMemorySize=880468305
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -XX:MaxMetaspaceSize=268435456
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -Dlog.file=/var/log/hadoop-yarn/containers/application_1611280261341_0015/container_1611280261341_0015_01_04/taskmanager.log
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -Dlog4j.configuration=file:./log4j.properties
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -Dlog4j.configurationFile=file:./log4j.properties
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] -  Program Arguments:
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -D
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - taskmanager.memory.framework.off-heap.size=134217728b
> 2021-01-26 04:03:50,989 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - taskmanager.memory.network.max=746250577b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - taskmanager.memory.network.min=746250577b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - taskmanager.memory.framework.heap.size=134217728b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - taskmanager.memory.managed.size=2985002310b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - taskmanager.cpu.cores=1.0
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - taskmanager.memory.task.heap.size=3462817321b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - -D
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner [] - taskmanager.memory.task.off-heap.size=0b
> 2021-01-26 04:03:50,990 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner
>

Re: Deduplicating record amplification

2021-01-27 Thread Arvid Heise
Hi Rex,

whether your keyBy (and, with it, any join/grouping/windowing) is random or not
depends on the relationship of the join/grouping key to your Kafka partitioning
key.

Say your partitioning key is document_id. Then any join/grouping key that
is composed of (or is exactly) document_id will retain the order. You
should always ask yourself the question: can two records coming from the
ordered Kafka partition X be processed by two different operator instances?
For a join/grouping operator, there is only the strict guarantee that all
records with the same key will be shuffled into the same operator instance.
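
To make that rule of thumb concrete, here is a minimal, self-contained sketch
(the topic name, broker address, and Event type are made up for illustration
and are not taken from this thread):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

case class Event(documentId: String, payload: String)

object KeyedOrderingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "ordering-sketch")

    env
      .addSource(new FlinkKafkaConsumer[String]("documents", new SimpleStringSchema(), props))
      .map { line =>
        // Assume "documentId,payload" records where documentId was also the Kafka key.
        val Array(id, payload) = line.split(",", 2)
        Event(id, payload)
      }
      // Keying by the same field that partitioned the topic sends all records of
      // one document to the same operator instance, so their relative order from
      // the ordered Kafka partition is retained.
      .keyBy(_.documentId)
      .map(e => s"${e.documentId} -> ${e.payload}")
      .print()

    env.execute("keyed-ordering-sketch")
  }
}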

Your compaction in general looks good but I'm not deep into Table API. I'm
quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table API
should already do what you want. [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
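
As a rough illustration of that suggestion, here is a hedged sketch of the
LAST_VALUE route in SQL (the user_docs table, its columns, and the 1-second
window are assumptions made for the example, not the thread's actual schema):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object LastValueSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // A table "user_docs" with columns (doc_id, payload) and a processing-time
    // attribute proc_time is assumed to be registered by the upstream pipeline.
    val compacted = tableEnv.sqlQuery(
      """
        |SELECT doc_id, LAST_VALUE(payload) AS payload
        |FROM user_docs
        |GROUP BY doc_id, TUMBLE(proc_time, INTERVAL '1' SECOND)
        |""".stripMargin)

    // Emit the compacted result: one row per doc_id and window.
    compacted.execute().print()
  }
}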

On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley  wrote:

> In addition to those questions, assuming that keyed streams are in order,
> I've come up with the following solution to compact our records and only
> pick the most recent one per id before sending to the ES sink.
>
> The first item in the Row is the document ID / primary key which we want
> to compact records on.
>
> val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
> userDocsStream
>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>   .aggregate(new CompactionAggregate())
>
> class CompactionAggregate
>     extends AggregateFunction[
>       (Boolean, Row),
>       (Boolean, Row),
>       (Boolean, Row)
>     ] {
>
>   override def createAccumulator() = (false, null)
>
>   // Just take the latest value to compact.
>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>     value
>
>   override def getResult(accumulator: (Boolean, Row)) = accumulator
>
>   // This is a required function that we don't use.
>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>     throw new NotImplementedException()
> }
>
> I'm hoping that if the last record in the window is an insert it picks that,
> and if it's a retract then it picks that instead; then, when we send this to
> the ES sink, we will simply check true or false in the first element of the
> tuple to issue an insert or delete request to ES. Does this seem like it will
> work?
>
> Thanks!
>
>
> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley  wrote:
>
>> This is great info, thanks!
>>
>> My question then becomes, what constitutes a random shuffle? Currently
>> we're using the Table API with minibatch on flink v1.11.3. Do our joins
>> output a keyed stream of records by join key or is this random? I imagine
>> that they'd have to have a key for retracts and accumulates to arrive in
>> order on the next downstream operator. Same with aggs but on the groupBy
>> key.
>>
>> Does this sound correct to you?
>>
>> Thanks!
>>
>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise  wrote:
>>
>>> Hi Rex,
>>>
>>> indeed these two statements look like they contradict each other, but
>>> they are looking at both sides from the same coin.
>>> Flink simply puts records into windows in FIFO order. That is, there is no
>>> reordering by event time if there are late events. So if your elements arrive
>>> ordered, the ordering is retained. If your elements arrive unordered, the
>>> same unordered order is retained.
>>>
>>> However, note that Flink can only guarantee FIFO according to your
>>> topology. Consider a source with parallelism 2, each reading data from an
>>> ordered kafka partition (k1, k2) respectively. Each partition has records
>>> with keys, such that no key appears in both partitions (default behavior if
>>> you set keys but no partition while writing to Kafka).
>>> 1) Let's assume you do a simple transformation and write them back into
>>> kafka with the same key. Then you can be sure that the order of the records
>>> is retained.
>>>
>>> 2) Now you add a random shuffle and have the transformation. Now two
>>> successive records may be processed in parallel and there is a race
>>> condition who is written first into Kafka. So order is not retained.
>>>
>>> 3) You shuffle both partitions by the Kafka key (keyby) and do some
>>> aggregation. Two successive records with the same key will always be
>>> processed by the same aggregation operator. So the order is retained for
>>> each key (note that this is what you usually want and what Kafka gives you
>>> if you don't set the partition explicitly and just provide a key).
>>>
>>> 4) You shuffle both partitions by a different key. Then two successive
>>> Kafka records could be again calculated in parallel such that there is a
>>> race condition.
>>>
>>> Note that windows are a kind of aggregation.
>>>
>>> So Flink is never going to restore an ordering that is not there
>>> (because it's too costly and there are too many unknowns). But you can
>>> infer the guarantees by analyzing your topology.
>>>
>>> ---
>>>
>>> Please note that there is a

Re: Deduplicating record amplification

2021-01-27 Thread Rex Fenley
Ok, that sounds like it confirms my expectations.

So I tried running my above code, but I had to edit it slightly to use Java
Tuple2 because our execution environment stuff is all in Java.

class CompactionAggregate
    extends AggregateFunction[
      Tuple2[java.lang.Boolean, Row],
      Tuple2[java.lang.Boolean, Row],
      Tuple2[java.lang.Boolean, Row]
    ] {

  override def createAccumulator() = new Tuple2(false, null)

  // Just take the latest value to compact.
  override def add(
      value: Tuple2[java.lang.Boolean, Row],
      accumulator: Tuple2[java.lang.Boolean, Row]
  ) =
    value

  override def getResult(accumulator: Tuple2[java.lang.Boolean, Row]) =
    accumulator

  // This is a required function that we don't use.
  override def merge(
      a: Tuple2[java.lang.Boolean, Row],
      b: Tuple2[java.lang.Boolean, Row]
  ) =
    throw new NotImplementedException()
}

But when running I get the following error:
> Caused by: java.lang.RuntimeException: Could not extract key from [redacted row]
> ...
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported kind 'DELETE' of a row [redacted row]. Only rows with 'INSERT' kind are supported when converting to an expression.

I'm googling around and haven't found anything informative about what might
be causing this issue. Any ideas?

I'll also take a look at the SQL functions you suggested and see if I can
use those.

Thanks!



On Wed, Jan 27, 2021 at 11:48 PM Arvid Heise  wrote:

> Hi Rex,
>
> whether your keyBy (and, with it, any join/grouping/windowing) is random or not
> depends on the relationship of the join/grouping key to your Kafka partitioning
> key.
>
> Say your partitioning key is document_id. Then any join/grouping key that
> is composed of (or is exactly) document_id will retain the order. You
> should always ask yourself the question: can two records coming from the
> ordered Kafka partition X be processed by two different operator instances?
> For a join/grouping operator, there is only the strict guarantee that all
> records with the same key will be shuffled into the same operator instance.
>
> Your compaction in general looks good but I'm not deep into Table API. I'm
> quite sure that *FIRST_VALUE* and *LAST_VALUE* functions in Table API
> should already do what you want. [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#aggregate-functions
>
> On Thu, Jan 28, 2021 at 6:45 AM Rex Fenley  wrote:
>
>> In addition to those questions, assuming that keyed streams are in order,
>> I've come up with the following solution to compact our records and only
>> pick the most recent one per id before sending to the ES sink.
>>
>> The first item in the Row is the document ID / primary key which we want
>> to compact records on.
>>
>> val userDocsStream = userDocsTable.toRetractStream[Row].keyBy(_._2.get(0))
>> userDocsStream
>>   .window(TumblingEventTimeWindows.of(Time.seconds(1)))
>>   .aggregate(new CompactionAggregate())
>>
>> class CompactionAggregate
>>     extends AggregateFunction[
>>       (Boolean, Row),
>>       (Boolean, Row),
>>       (Boolean, Row)
>>     ] {
>>
>>   override def createAccumulator() = (false, null)
>>
>>   // Just take the latest value to compact.
>>   override def add(value: (Boolean, Row), accumulator: (Boolean, Row)) =
>>     value
>>
>>   override def getResult(accumulator: (Boolean, Row)) = accumulator
>>
>>   // This is a required function that we don't use.
>>   override def merge(a: (Boolean, Row), b: (Boolean, Row)) =
>>     throw new NotImplementedException()
>> }
>>
>> I'm hoping that if the last record in the window is an insert it picks that,
>> and if it's a retract then it picks that instead; then, when we send this to
>> the ES sink, we will simply check true or false in the first element of the
>> tuple to issue an insert or delete request to ES. Does this seem like it will
>> work?
>>
>> Thanks!
>>
>>
>> On Wed, Jan 27, 2021 at 12:45 PM Rex Fenley  wrote:
>>
>>> This is great info, thanks!
>>>
>>> My question then becomes, what constitutes a random shuffle? Currently
>>> we're using the Table API with minibatch on flink v1.11.3. Do our joins
>>> output a keyed stream of records by join key or is this random? I imagine
>>> that they'd have to have a key for retracts and accumulates to arrive in
>>> order on the next downstream operator. Same with aggs but on the groupBy
>>> key.
>>>
>>> Does this sound correct to you?
>>>
>>> Thanks!
>>>
>>> On Wed, Jan 27, 2021 at 11:39 AM Arvid Heise  wrote:
>>>
 Hi Rex,

 indeed these two statements look like they contradict each other, but
 they are looking at both sides from the same coin.
 Flink simply puts records into windows in FIFO order. That is, there is
 no reordering by event time if there are late events. So if your elements
 arrive ordered, the ordering is retained. If your elements arrive
 unordered, the same unordered order is retained.

 However, note that Flink can only guarantee FIFO according to your
 topology. Consider a source with par