Re: Is outputting from components other than sinks or side outputs a no-no ?

2020-07-27 Thread Stephen Connolly
I am not 100% certain that David is talking about the same pattern of usage 
that you are, Tom.

David, the pattern Tom is talking about is something like this...

try {
  do something with record
} catch (SomeException e) {
  push record to DLQ
}

My concern is that if we have a different failure, or even a restart from 
checkpoint because, say, the task manager OOM'd or was killed... now the record 
is replayed... and this time the "do something with record" succeeds... but 
it's still on the DLQ from last time.

If the DLQ is a Flink-native output that pushes to an exactly-once sink then 
you do not have that issue. When you roll your own side output behind Flink's 
back... then you have to take all those possibilities into account, which 
significantly complicates the code.
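
For what it's worth, a minimal sketch of what I mean by keeping the DLQ
inside Flink: route failures to a side output and attach a sink to it. Here
input, doSomethingWith, SomeException, mainSink and dlqSink are placeholders
carried over from the pseudocode above:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// side-output tag for records whose processing failed
final OutputTag<String> dlqTag = new OutputTag<String>("dlq") {};

SingleOutputStreamOperator<String> processed = input.process(
        new ProcessFunction<String, String>() {
            @Override
            public void processElement(String record, Context ctx, Collector<String> out) {
                try {
                    out.collect(doSomethingWith(record));
                } catch (SomeException e) {
                    // stays inside Flink, so checkpoint/replay semantics still apply
                    ctx.output(dlqTag, record);
                }
            }
        });

processed.addSink(mainSink);
// ideally an exactly-once / transactional sink
processed.getSideOutput(dlqTag).addSink(dlqSink);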

On 2020/07/27 07:45:27, Tom Fennelly  wrote: 
> Thank you David.
> 
> In the case we have in mind it should only happen literally on the very
> rare Exception i.e. in some cases if somehow an uncaught exception occurs,
> we want to send the record to a DLQ and handle the retry manually Vs
> checkpointing and restarting.
> 
> Regards,
> 
> Tom.
> 
> 
> On Sun, Jul 26, 2020 at 1:14 PM David Anderson 
> wrote:
> 
> > Every job is required to have a sink, but there's no requirement that all
> > output be done via sinks. It's not uncommon, and doesn't have to cause
> > problems, to have other operators that do I/O.
> >
> > What can be problematic, however, is doing blocking I/O. While your user
> > function is blocked, the function will exert back pressure, and checkpoint
> > barriers will be unable to make any progress. This sometimes leads to
> > checkpoint timeouts and job failures. So it's recommended to make any I/O
> > you do asynchronous, using an AsyncFunction [1] or something similar.
> >
> > Note that the asynchronous i/o function stores the records for in-flight
> > asynchronous requests in checkpoints, and restores/re-triggers the requests
> > when recovering from a failure. This might lead to duplicate results if you
> > are using it to do non-idempotent database writes. If you need
> > transactions, use a sink that offers them.
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
> > 
> >
> > Best,
> > David
> >
> > On Sun, Jul 26, 2020 at 11:08 AM Tom Fennelly 
> > wrote:
> >
> >> Hi.
> >>
> >> What are the negative side effects of (for example) a filter function
> >> occasionally making a call out to a DB ? Is this a big no-no and should all
> >> outputs be done through sinks and side outputs, no exceptions ?
> >>
> >> Regards,
> >>
> >> Tom.
> >>
> >
> 
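
For completeness, a minimal sketch of the async I/O pattern David describes,
in the shape I'd use when the underlying client is blocking (the lookup body
below is just a placeholder):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncLookupFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        // small dedicated pool so the blocking call never runs on the task thread
        executor = Executors.newFixedThreadPool(4);
    }

    @Override
    public void asyncInvoke(String record, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> lookup(record), executor)
                .whenComplete((result, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(Collections.singleton(result));
                    }
                });
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdown();
        }
    }

    // placeholder for whatever blocking lookup / write you need to do
    private String lookup(String record) {
        return record;
    }
}

Wired in with something like AsyncDataStream.unorderedWait(stream, new
AsyncLookupFunction(), 30, TimeUnit.SECONDS, 100), so checkpoint barriers are
not held up by the I/O.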


Re: Running Apache Flink on the GraalVM as a Native Image

2020-06-28 Thread Stephen Connolly
On Sun 28 Jun 2020 at 01:34, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

>
>
> On Thu 25 Jun 2020 at 12:48, ivo.kn...@t-online.de 
> wrote:
>
>> What's up guys,
>>
>>
>>
>> I'm trying to run an Apache Flink Application with the GraalVM Native
>> Image but I get the following error: (check attached file)
>>
>>
>>
>> I suppose this happens, because Flink uses a lot of low-level-code and is
>> highly optimized.
>>
>
> Actually I suspect the reason is that Flink uses dynamic classloading.
>
> GraalVM requires all the code available in order to produce a native image.
>
> You’d need to pre-bind the topology you want Flink to run into the native
> image.
>
> More fun, you’ll actually need two images, one for the job manager and one
> for the task manager.
>
> And you’ll need to convince GraalVM that the entry point (your topology)
> needs reflection support enabled... plus whatever other classes use
> reflection in Flink.
>
> Sounds rather complex to me. If native images are what is important to
> you, there seemed to be a strong contender in the Rust language community;
> it didn’t provide as strong management as Flink, and you’d probably have
> more work managing things like checkpointing, but if native code is
> important that’s where I’d be looking. Sadly I cannot remember the name and
> my google-foo is weak tonight.
>

I think it might have been
https://github.com/grippy/tempest but that looks less actively developed
than the one I thought I saw...

I’d also check out frameworks for Go even if I dislike Go... if you want
native code it’s either Rust or Go in my book


>
>>
>> When I googled the combination of GraalVM Native Image and Apache Flink I
>> get no results.
>>
>>
>>
>> Did anyone ever succeed in making it work, and how?
>>
>>
>>
>> Best regards,
>>
>>
>>
>> Ivo
>> 
>>
> --
> Sent from my phone
>
-- 
Sent from my phone


Re: Running Apache Flink on the GraalVM as a Native Image

2020-06-27 Thread Stephen Connolly
On Thu 25 Jun 2020 at 12:48, ivo.kn...@t-online.de 
wrote:

> What's up guys,
>
>
>
> I'm trying to run an Apache Flink Application with the GraalVM Native
> Image but I get the following error: (check attached file)
>
>
>
> I suppose this happens, because Flink uses a lot of low-level-code and is
> highly optimized.
>

Actually I suspect the reason is that Flink uses dynamic classloading.

GraalVM requires all the code available in order to produce a native image.

You’d need to pre-bind the topology you want Flink to run into the native
image.

More fun, you’ll actually need two images, one for the job manager and one
for the task manager.

And you’ll need to convince GraalVM that the entry point (your topology)
needs reflection support enabled... plus whatever other classes use
reflection in Flink.

Sounds rather complex to me. If native images are what is important to you,
there seemed to be a strong contender in the Rust language community; it
didn’t provide as strong management as Flink, and you’d probably have more
work managing things like checkpointing, but if native code is important
that’s where I’d be looking. Sadly I cannot remember the name and my
google-foo is weak tonight.


>
> When I googled the combination of GraalVM Native Image and Apache Flink I
> get no results.
>
>
>
> Did anyone ever succeed in making it work, and how?
>
>
>
> Best regards,
>
>
>
> Ivo
> 
>
-- 
Sent from my phone


Two questions about Async

2020-04-21 Thread Stephen Connolly
1. On Flink 1.10, when I look at the topology overview, the AsyncFunctions
show non-zero values for Bytes Received, Records Received, and Bytes Sent, but
Records Sent is always 0... yet the next step in the topology shows approximately
the same Bytes Received as the async function sent (modulo minor delays) and a
non-zero Records Received. Is the “Records Sent of an AsyncFunction is
always displayed as zero” a bug?

2. Is there an Async Sink? Or do I just rewrite my Sink as an AsyncFunction
followed by a dummy sink? What’s the recommendation if the sink performing
blocking I/O is proven to be the root cause of back pressure?
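
For context on question 2, the shape I have in mind is roughly the sketch
below; AsyncWriterFunction, WriteResult and the timeout/capacity values are
placeholders, and DiscardingSink is
org.apache.flink.streaming.api.functions.sink.DiscardingSink:

DataStream<WriteResult> written = AsyncDataStream.unorderedWait(
        records,
        new AsyncWriterFunction(),   // a RichAsyncFunction that does the actual write
        30, TimeUnit.SECONDS,        // timeout per in-flight request
        100);                        // max in-flight requests

// terminate the stream; the real output already happened in the async function
written.addSink(new DiscardingSink<>());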

Thanks in advance for your help
-- 
Sent from my phone


Upgrading Flink

2020-04-06 Thread Stephen Connolly
Quick questions on upgrading Flink.

All our jobs are compiled against Flink 1.8.x

We are planning to upgrade to 1.10.x

1. Is the recommended path to upgrade one minor version at a time, i.e. 1.8.x ->
1.9.x and then 1.9.x -> 1.10.x as a second step, or is the big jump
supported, i.e. 1.8.x -> 1.10.x in one change?

2. Do we need to recompile the jobs against the newer Flink version before
upgrading? Coordinating multiple teams can be tricky, so - short of
spinning up a second Flink cluster - our continuous deployment
infrastructure will try to deploy the topologies compiled against 1.8.x for
an hour or two after we have upgraded the cluster.


How to debug checkpoints failing to complete

2020-03-23 Thread Stephen Connolly
We have a topology and the checkpoints fail to complete a *lot* of the time.

Typically it is just one subtask that fails.

We have a parallelism of 2 on this topology at present, and the other
subtask will complete in 3ms, though the end-to-end duration on the rare
occasions when the checkpointing completes is around 4m30s.

How can I start debugging this? When I run locally on my development
cluster I have no issues; the issues only seem to show in production.


Re: Rescaling a running topology

2020-02-07 Thread Stephen Connolly
Ooooh more fun... If I rescale down a job, the job's config at
jobs/{jobid}/config does not reflect the new parallelism (there may not
even be any way to detect such a parallelism change)... but more critically
the job is now unstoppable and seems to end up stuck in the CANCELLING
state for some time (I gave up waiting)

On Fri, 7 Feb 2020 at 11:54, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> And now the job is stuck in a suspended state and I seem to have no way to
> get it out of that state again!
>
> On Fri, 7 Feb 2020 at 11:50, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> The plot thickens... I was able to rescale down... just not back up
>> again!!!
>>
>> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink list -m
>> localhost:8081
>> Waiting for response...
>> -- Running/Restarting Jobs ---
>> 07.02.2020 11:26:33 : ebc20a700c334f61ea03ecdf3d8939ca : Test topology
>> (RUNNING)
>> --
>> No scheduled jobs.
>> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m
>> localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 1
>> Modify job ebc20a700c334f61ea03ecdf3d8939ca.
>> Rescaled job ebc20a700c334f61ea03ecdf3d8939ca. Its new parallelism is 1.
>> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m
>> localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 2
>> Modify job ebc20a700c334f61ea03ecdf3d8939ca.
>>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.util.FlinkException: Could not rescale job
>> ebc20a700c334f61ea03ecdf3d8939ca.
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
>> at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
>> at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>> at
>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: java.util.concurrent.CompletionException:
>> java.lang.IllegalStateException: Suspend needs to happen atomically
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>> at
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
>> at
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>> at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>> at akka.actor.Actor.aroundReceive(Actor.scala:502)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:500)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>> at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>> at
>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>> Caused by: java.lang.IllegalStateException: Suspend needs to happen
>> atomically
>> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>> at
>> org.apache.flink.runtime.executiongraph.ExecutionGrap

Re: Rescaling a running topology

2020-02-07 Thread Stephen Connolly
And now the job is stuck in a suspended state and I seem to have no way to
get it out of that state again!

On Fri, 7 Feb 2020 at 11:50, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> The plot thickens... I was able to rescale down... just not back up
> again!!!
>
> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink list -m
> localhost:8081
> Waiting for response...
> -- Running/Restarting Jobs ---
> 07.02.2020 11:26:33 : ebc20a700c334f61ea03ecdf3d8939ca : Test topology
> (RUNNING)
> --
> No scheduled jobs.
> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m
> localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 1
> Modify job ebc20a700c334f61ea03ecdf3d8939ca.
> Rescaled job ebc20a700c334f61ea03ecdf3d8939ca. Its new parallelism is 1.
> root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m
> localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 2
> Modify job ebc20a700c334f61ea03ecdf3d8939ca.
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Could not rescale job
> ebc20a700c334f61ea03ecdf3d8939ca.
> at
> org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
> at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.util.concurrent.CompletionException:
> java.lang.IllegalStateException: Suspend needs to happen atomically
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor.aroundReceive(Actor.scala:502)
> at akka.actor.Actor.aroundReceive$(Actor.scala:500)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: java.lang.IllegalStateException: Suspend needs to happen
> atomically
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at
> org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
> ... 20 more
>
> On Fri, 7 Feb 2020 at 11:40, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> So I am looking at the Flink Management REST API... and, as I see it,
>> there are two paths to rescale a running topology:
>>
>> 1. Stop the topology with a savepoint and then start it up with the new
>> savepoint; or
>> 2. Use the /jobs/:jobid

Re: Rescaling a running topology

2020-02-07 Thread Stephen Connolly
The plot thickens... I was able to rescale down... just not back up again!!!

root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink list -m
localhost:8081
Waiting for response...
-- Running/Restarting Jobs ---
07.02.2020 11:26:33 : ebc20a700c334f61ea03ecdf3d8939ca : Test topology
(RUNNING)
--
No scheduled jobs.
root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m
localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 1
Modify job ebc20a700c334f61ea03ecdf3d8939ca.
Rescaled job ebc20a700c334f61ea03ecdf3d8939ca. Its new parallelism is 1.
root@flink-jobmanager-64bcfdf799-jhs7p:/opt/flink# bin/flink modify -m
localhost:8081 ebc20a700c334f61ea03ecdf3d8939ca -p 2
Modify job ebc20a700c334f61ea03ecdf3d8939ca.


 The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not rescale job
ebc20a700c334f61ea03ecdf3d8939ca.
at
org.apache.flink.client.cli.CliFrontend.lambda$modify$10(CliFrontend.java:799)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:790)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1068)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.CompletionException:
java.lang.IllegalStateException: Suspend needs to happen atomically
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:392)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:185)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor.aroundReceive(Actor.scala:502)
at akka.actor.Actor.aroundReceive$(Actor.scala:500)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.IllegalStateException: Suspend needs to happen
atomically
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.suspend(ExecutionGraph.java:1172)
at
org.apache.flink.runtime.jobmaster.JobMaster.suspendExecutionGraph(JobMaster.java:1221)
at
org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$5(JobMaster.java:465)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
... 20 more

On Fri, 7 Feb 2020 at 11:40, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> So I am looking at the Flink Management REST API... and, as I see it,
> there are two paths to rescale a running topology:
>
> 1. Stop the topology with a savepoint and then start it up with the new
> savepoint; or
> 2. Use the /jobs/:jobid/rescaling
> <https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#jobs-jobid-rescaling>
> endpoint
>
> The first one seems to work just fine.
>
> The second one seems to just blow up every time I try to use it... I'll
> get things like:
>
>
> https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-log-txt
>
> The above was for the topology
> https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-streamingjob-java
> running with options:
>
> --source parallel
>
> Things are even worse with --source iterator as that has no checkpoint
> state to recover from
>

Rescaling a running topology

2020-02-07 Thread Stephen Connolly
So I am looking at the Flink Management REST API... and, as I see it, there
are two paths to rescale a running topology:

1. Stop the topology with a savepoint and then start it up with the new
savepoint; or
2. Use the /jobs/:jobid/rescaling
<https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#jobs-jobid-rescaling>
endpoint

The first one seems to work just fine.

The second one seems to just blow up every time I try to use it... I'll get
things like:

https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-log-txt

The above was for the topology
https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-streamingjob-java
running with options:

--source parallel

Things are even worse with --source iterator as that has no checkpoint
state to recover from

Right now I am trying to discover what preconditions must be met in order to
safely call the rescaling endpoint and actually have it work... I should
note that I currently have not managed to get it to work at all!!!

One of the things we are trying to do is add some automation to enable
scale-up / down as we see surges in processing load. We want to have an
automated system that can respond to those situations automatically for low
deltas and trigger an on-call engineer for persistent excess load. In that
regard I'd like to know what the automation should check to know whether it
can do rescaling via the dedicated end-point or if it should use the
reliable (but presumably slower) path of stop with savepoint & start from
savepoint.

The
https://gist.github.com/stephenc/0bbc08391ddce5a781242900e4b33a5d#file-streamingjob-java
job I have been using is just a quick job to let me test the automation on
a local cluster. It is designed to output a strictly increasing sequence of
numbers without missing any... optionally double them and then print them
out. The different sources are me experimenting with different types of
operators to see what kinds of topology can work with the rescaling endpoint.

Thanks in advance


POJO serialization vs immutability

2019-10-02 Thread Stephen Connolly
I notice
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#rules-for-pojo-types
says that all non-transient fields need a setter.

That means that the fields cannot be final.

That means that the hashCode() should probably just return a constant value
(otherwise an object could be mutated and then lost from a hash-based
collection).

Is it really the case that we have to either register a serializer or
abandon immutability and consequently force hashCode to be a constant value?

What are the recommended implementation patterns for the POJOs used in a
topology?
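
For reference, the shape that the documented rules seem to force is roughly
the sketch below: mutable, with a public no-argument constructor and a
getter/setter pair per non-transient field.

public class Event {
    private long timestamp;
    private String name;

    // the POJO rules want a public no-argument constructor
    public Event() {}

    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

The alternative, as far as I can tell, is to keep the fields final and
register a custom serializer for the type (e.g. via
ExecutionConfig#registerTypeWithKryoSerializer), which is exactly the
trade-off I am asking about.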

Thanks

-Stephen


Re: Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-24 Thread Stephen Connolly
I have created https://issues.apache.org/jira/browse/FLINK-14184 as a
proposal to improve Flink in this specific area.

On Tue, 24 Sep 2019 at 03:23, Zhu Zhu  wrote:

> Hi Stephen,
>
> I think disposing static components in the closing stage of a task is
> required.
> This is because your code(operators/UDFs) is part of the task, namely that
> it can only be executed when the task is not disposed.
>
> Thanks,
> Zhu Zhu
>
> Stephen Connolly  wrote on Tue, 24 Sep 2019 at 02:13:
>
>> Currently the best I can see is to make *everything* a Rich... and hook
>> into the open and close methods... but feels very ugly.
>>
>>
>>
>> On Mon 23 Sep 2019 at 15:45, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> We are using a 3rd party library that allocates some resources in one of
>>> our topologies.
>>>
>>> Is there a listener or something that gets notified when the topology
>>> starts / stops running in the Task Manager's JVM?
>>>
>>> The 3rd party library uses a singleton, so I need to initialize the
>>> singleton when the first task is started on the task manager and clear out
>>> the singleton when the last task is stopped in order to allow the topology
>>> classloader to be unloadable.
>>>
>>> I had thought it could all be done from the Topology's main method, but
>>> after much head-banging we were able to identify that *when run on a
>>> distributed cluster* the main method is not invoked to start the topology
>>> for each task manager.
>>>
>> --
>> Sent from my phone
>>
>


Re: Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-23 Thread Stephen Connolly
Currently the best I can see is to make *everything* a Rich... and hook
into the open and close methods... but feels very ugly.
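
For the record, the least ugly version of that I have come up with is
reference counting in open/close, so the singleton is initialised by the
first task and torn down by the last one on each task manager. ThirdPartyLib
below is a stand-in for the real library:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ThirdPartyAwareMapper extends RichMapFunction<String, String> {

    // shared per classloader, i.e. per job per task manager JVM
    private static final Object LOCK = new Object();
    private static int refCount = 0;

    @Override
    public void open(Configuration parameters) {
        synchronized (LOCK) {
            if (refCount++ == 0) {
                ThirdPartyLib.initialize(); // stand-in for the singleton set-up
            }
        }
    }

    @Override
    public void close() {
        synchronized (LOCK) {
            if (--refCount == 0) {
                ThirdPartyLib.shutdown(); // stand-in for the singleton tear-down
            }
        }
    }

    @Override
    public String map(String value) {
        return ThirdPartyLib.process(value); // stand-in for the real work
    }
}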



On Mon 23 Sep 2019 at 15:45, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> We are using a 3rd party library that allocates some resources in one of
> our topologies.
>
> Is there a listener or something that gets notified when the topology
> starts / stops running in the Task Manager's JVM?
>
> The 3rd party library uses a singleton, so I need to initialize the
> singleton when the first task is started on the task manager and clear out
> the singleton when the last task is stopped in order to allow the topology
> classloader to be unloadable.
>
> I had thought it could all be done from the Topology's main method, but
> after much head-banging we were able to identify that *when run on a
> distributed cluster* the main method is not invoked to start the topology
> for each task manager.
>
-- 
Sent from my phone


Is there a lifecycle listener that gets notified when a topology starts/stops on a task manager

2019-09-23 Thread Stephen Connolly
We are using a 3rd party library that allocates some resources in one of
our topologies.

Is there a listener or something that gets notified when the topology
starts / stops running in the Task Manager's JVM?

The 3rd party library uses a singleton, so I need to initialize the
singleton when the first task is started on the task manager and clear out
the singleton when the last task is stopped in order to allow the topology
classloader to be unloadable.

I had thought it could all be done from the Topology's main method, but
after much head-banging we were able to identify that *when run on a
distributed cluster* the main method is not invoked to start the topology
for each task manager.


Re: Logback on AWS EMR

2019-09-05 Thread Stephen Connolly
Answering our own question.

From what we can see, all you need to do is tweak the /usr/lib/flink/conf
and /usr/lib/flink/lib directories so that you remove the log4j.properties
and have your logback.xml in conf and the required libraries in the lib
directory (replacing the log4j backend with logback). Then when you
launch your Flink jobs they will be cloned with the correct files and be happy
as Larry!

Haven't figured out how to handle ephemeral EMR clusters... but we
aren't using them, so :shrug:

On Wed, 4 Sep 2019 at 22:17, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> Has anyone configured AWS EMR’s flavour of Flink to use Logback (more
> specifically with the logstash encoder, which would require additional jars
> on the classpath)
>
> Or is there an alternative way people are using to send the logs to a
> service like Datadog
>
> Thanks in advance
>
> Stephen
> --
> Sent from my phone
>


Logback on AWS EMR

2019-09-04 Thread Stephen Connolly
Has anyone configured AWS EMR’s flavour of Flink to use Logback (more
specifically with the logstash encoder, which would require additional jars
on the classpath)

Or is there an alternative way people are using to send the logs to a
service like Datadog

Thanks in advance

Stephen
-- 
Sent from my phone


Re: How to handle JDBC connections in a topology

2019-07-24 Thread Stephen Connolly
Oh and I'd also need some way to clean up the per-node transient state if
the topology stops running on a specific node.

On Wed, 24 Jul 2019 at 08:18, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> Hi,
>
> So we have a number of nodes in our topology that need to do things like
> checking a database, e.g.
>
> * We need a filter step to drop events on the floor from systems we are no
> longer interested in
> * We need a step that outputs on a side-channel if the event is for an
> object where the parent is not currently known to the database.
>
> Right now we are grabbing a JDBC connection for each node in the topology
> that needs to talk to the database and storing the connection in a
> transient field (to exclude it from the serialized state)
>
> What I'd really like to do is have a JDBC connection pool shared across
> the entire topology as that way we could have the pool check for stale
> connections, etc.
>
> Does anyone have any tips for doing this kind of thing?
>
> (My current idea is to maintain a `static final
> WeakHashMap` in the main class... but that
> feels very much like a hack)
>
> What I'm really looking for is some form of Node Transient State... are
> there any examples of this type of thing?
>
> Flink 1.8.x
>
> Thanks,
>
> -Stephen
>


How to handle JDBC connections in a topology

2019-07-24 Thread Stephen Connolly
Hi,

So we have a number of nodes in our topology that need to do things like
checking a database, e.g.

* We need a filter step to drop events on the floor from systems we are no
longer interested in
* We need a step that outputs on a side-channel if the event is for an
object where the parent is not currently known to the database.

Right now we are grabbing a JDBC connection for each node in the topology
that needs to talk to the database and storing the connection in a
transient field (to exclude it from the serialized state)

What I'd really like to do is have a JDBC connection pool shared across the
entire topology as that way we could have the pool check for stale
connections, etc.

Does anyone have any tips for doing this kind of thing?

(My current idea is to maintain a `static final
WeakHashMap` in the main class... but that
feels very much like a hack)

What I'm really looking for is some form of Node Transient State... are
there any examples of this type of thing?
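
To make the question concrete, the kind of shape I'm imagining is a lazily
initialised pool shared per task manager JVM, along the lines of the sketch
below (HikariCP is just an example pooling library here):

import javax.sql.DataSource;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public final class SharedPool {

    // one pool per classloader, i.e. per job per task manager JVM
    private static volatile HikariDataSource pool;

    private SharedPool() {}

    public static DataSource get(String jdbcUrl, String user, String password) {
        if (pool == null) {
            synchronized (SharedPool.class) {
                if (pool == null) {
                    HikariConfig config = new HikariConfig();
                    config.setJdbcUrl(jdbcUrl);
                    config.setUsername(user);
                    config.setPassword(password);
                    pool = new HikariDataSource(config);
                }
            }
        }
        return pool;
    }
}

Each Rich function would call SharedPool.get(...) from open() and borrow
connections as needed; the remaining problem is still who closes the pool
when the last task stops, which is really the same lifecycle question.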

Flink 1.8.x

Thanks,

-Stephen


Re: Does flink configuration support configed by environment variables?

2019-04-01 Thread Stephen Connolly
I don't think it does. I ended up writing a small CLI tool to enable
templating the file from environment variables. There are loads of such
tools, but mine is https://github.com/stephenc/envsub

I have the dockerfile like so:

ARG FLINK_VERSION=1.7.2-alpine
FROM flink:${FLINK_VERSION}
ARG ENVSUB=0.1.0::SHA::b10600c03236bbf0711476e11a1dff9ae285a50a48568bfd0bf6c6014fc69f0c
RUN apk add --no-cache tini curl \
#
# Get envsub
#
\
&& curl -fsSL "https://github.com/stephenc/envsub/releases/download/${ENVSUB%%::SHA::*}/envsub" \
-o /usr/local/bin/envsub \
&& if [ "${ENVSUB##*::SHA::}" = "${ENVSUB}" ] ; then \
echo "/usr/local/bin/envsub: Unverified" >&2 ; \
else \
echo "${ENVSUB##*::SHA::}  /usr/local/bin/envsub" | sha256sum -c - ; \
fi \
&& chmod +x /usr/local/bin/envsub

COPY rootfs/ /

ENTRYPOINT ["/sbin/tini", "-g", "--", "/docker-entrypoint2.sh"]

CMD []


Then docker-entrypoint2.sh looks like

#!/usr/bin/env bash

if [[ "$#" -eq 0 ]]; then
    if [[ -z "${FLINK_ROLE}" ]]; then
        echo "Please set environment variable FLINK_ROLE to either jobmanager or taskmanager"
        exit 1
    fi
    envsub < /opt/flink/conf/flink-conf.template.yaml > /opt/flink/conf/flink-conf.yaml
    exec /docker-entrypoint.sh "${FLINK_ROLE}"
fi

exec /docker-entrypoint.sh "${@}"


and /opt/flink/conf/flink-conf.template.yaml has the environment variable
substitution like so:

fs.s3a.endpoint: ${FLINK_S3_ENDPOINT}
fs.s3a.path.style.access: true
fs.s3a.connection.ssl.enabled: false
fs.s3a.access.key: ${AWS_ACCESS_KEY_ID}
fs.s3a.secret.key: ${AWS_SECRET_KEY}


Hope that is sufficient for you to derive your own solution


On Fri, 29 Mar 2019 at 03:09, Lifei Chen  wrote:

> Hi guys,
>
> I am using flink 1.7.2 deployed by kubernetes,  and I want to change the
> configurations about flink,  for example customize
> `taskmanager.heap.size`.
>
> Does flink support using environment variables to override configurations
> in `conf/flink-conf.yaml` ?
>


Re: REST API question GET /jars/:jarid/plan

2019-03-07 Thread Stephen Connolly
Yep that was it. I have created
https://issues.apache.org/jira/browse/FLINK-11853 so that it is easier for
others to work around if they have restrictions on the HTTP client library
choice

On Thu, 7 Mar 2019 at 11:47, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

>
>
> On Thu, 7 Mar 2019 at 11:33, Chesnay Schepler  wrote:
>
>> I've heard of cases where client libraries are automatically changing
>> the HTTP method when provided with a body.
>>
>
> Hmmm thanks for that... I'll dig into it
>
>
>>
>> To figure out what exactly is received by Flink, enable TRACE logging,
>> try again and look for logging messages from
>> "org.apache.flink.runtime.rest.handler.router.RouterHandler"
>>
>> On 07.03.2019 11:25, Stephen Connolly wrote:
>> > In the documentation for the /jars/:jarid/plan endpoint
>> >
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-plan
>> >
>> > It says:
>> >
>> > > Program arguments can be passed both via the JSON request
>> > (recommended) or query parameters.
>> >
>> > Has anyone got sample code that sends the JSON request?
>> >
>> > I have got the end-point working with query parameters, but I need to
>> > support more than 2083 GET URL length limit.
>> >
>> > When I have my code like this:
>> >
>> > UriTemplate template = UriTemplate.fromTemplate(apiUrl)
>> > .literal("/v1/jars")
>> > .path("jarId")
>> > .literal("/plan")
>> > .query("entryClass", "programArg*", "parallelism")
>> > .build()
>> > .set("jarId", jarId);
>> > if (requestBody.getEntryClass() != null) {
>> > // TODO find a way to have this as entry-class even if the
>> > spec says no
>> > template.set("entryClass", requestBody.getEntryClass());
>> > }
>> > if (!requestBody.getProgramArgs().isEmpty()) {
>> > template.set("programArg",
>> > requestBody.getProgramArgs().toArray(new String[0]));
>> > }
>> > if (requestBody.getParallelism() > 0) {
>> > template.set("parallelism", requestBody.getParallelism());
>> > }
>> > return get(
>> > template,
>> > null,
>> > null,
>> > JsonNode.class,
>> > MEDIA_TYPE_JSON
>> > );
>> >
>> > Then I get the plan returned.
>> >
>> > When I change to this
>> >
>> > UriTemplate template = UriTemplate.fromTemplate(apiUrl)
>> > .literal("/v1/jars")
>> > .path("jarId")
>> > .literal("/plan")
>> > .build()
>> > .set("jarId", jarId);
>> > return get(
>> > template,
>> > requestBody,
>> > MEDIA_TYPE_JSON,
>> > JsonNode.class,
>> > MEDIA_TYPE_JSON
>> > );
>> >
>> > I get a 404.
>> >
>> > Basically, adding the request body makes the URL go 404... For fun I
>> > tried having both query parameters and request body and that gets a
>> > 404 also.
>> >
>> > So does anyone have a working example using the JSON request body (and
>> > let's not get started on how a request body is a really bad idea for
>> > GET requests)
>>
>>
>>


Re: REST API question GET /jars/:jarid/plan

2019-03-07 Thread Stephen Connolly
On Thu, 7 Mar 2019 at 11:33, Chesnay Schepler  wrote:

> I've heard of cases where client libraries are automatically changing
> the HTTP method when provided with a body.
>

Hmmm thanks for that... I'll dig into it


>
> To figure out what exactly is received by Flink, enable TRACE logging,
> try again and look for logging messages from
> "org.apache.flink.runtime.rest.handler.router.RouterHandler"
>
> On 07.03.2019 11:25, Stephen Connolly wrote:
> > In the documentation for the /jars/:jarid/plan endpoint
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-plan
> >
> > It says:
> >
> > > Program arguments can be passed both via the JSON request
> > (recommended) or query parameters.
> >
> > Has anyone got sample code that sends the JSON request?
> >
> > I have got the end-point working with query parameters, but I need to
> > support more than 2083 GET URL length limit.
> >
> > When I have my code like this:
> >
> > UriTemplate template = UriTemplate.fromTemplate(apiUrl)
> > .literal("/v1/jars")
> > .path("jarId")
> > .literal("/plan")
> > .query("entryClass", "programArg*", "parallelism")
> > .build()
> > .set("jarId", jarId);
> > if (requestBody.getEntryClass() != null) {
> > // TODO find a way to have this as entry-class even if the
> > spec says no
> > template.set("entryClass", requestBody.getEntryClass());
> > }
> > if (!requestBody.getProgramArgs().isEmpty()) {
> > template.set("programArg",
> > requestBody.getProgramArgs().toArray(new String[0]));
> > }
> > if (requestBody.getParallelism() > 0) {
> > template.set("parallelism", requestBody.getParallelism());
> > }
> > return get(
> > template,
> > null,
> > null,
> > JsonNode.class,
> > MEDIA_TYPE_JSON
> > );
> >
> > Then I get the plan returned.
> >
> > When I change to this
> >
> > UriTemplate template = UriTemplate.fromTemplate(apiUrl)
> > .literal("/v1/jars")
> > .path("jarId")
> > .literal("/plan")
> > .build()
> > .set("jarId", jarId);
> > return get(
> > template,
> > requestBody,
> > MEDIA_TYPE_JSON,
> > JsonNode.class,
> > MEDIA_TYPE_JSON
> > );
> >
> > I get a 404.
> >
> > Basically, adding the request body makes the URL go 404... For fun I
> > tried having both query parameters and request body and that gets a
> > 404 also.
> >
> > So does anyone have a working example using the JSON request body (and
> > let's not get started on how a request body is a really bad idea for
> > GET requests)
>
>
>


Re: DataStream EventTime last data cannot be output?

2019-03-07 Thread Stephen Connolly
I had this issue myself.

Your timestamp assigner will only advance the watermark as it receives data,
thus when you reach the end of the data there will be data which is newer
than the last window that fired, and it will never be output.

One solution is to have the source flag that there will be no more data. If
you can do this then that is the best solution.

Another solution is to mix event time and wall clock time when advancing the
watermark, thus the window will eventually move past and output the data. Note
that if you use this approach and you are reprocessing the data, because
the wall clock will be different, your data may be grouped differently and
you could see different results depending on what kind of computation you
are using.
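
A sketch of that second approach for the 1.7-era AssignerWithPeriodicWatermarks
interface; Event#getTimestamp() stands in for however you pull the event-time
field out of your records, and the idle allowance is arbitrary:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class WallClockBoundedAssigner implements AssignerWithPeriodicWatermarks<Event> {

    private static final long MAX_OUT_OF_ORDERNESS = 2_000L; // 2 seconds of event-time slack
    private static final long MAX_IDLE_TIME = 10_000L;       // fall back to wall clock after 10s idle

    private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        long timestamp = element.getTimestamp();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // whichever is further along: event time minus slack, or wall clock minus the idle allowance
        long eventTimeWatermark = currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
        long wallClockWatermark = System.currentTimeMillis() - MAX_IDLE_TIME;
        return new Watermark(Math.max(eventTimeWatermark, wallClockWatermark));
    }
}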

The next gotcha that I hit was parallelism: if you are assigning timestamps
in a parallel task (say after a keyBy) then each of the parallel tasks will
have its own timestamp assigner (and thus its own watermark). If your data is
poorly distributed for your key function then you might end up with one of
those parallel timestamp assigners getting only one or zero data points, and
thus all data output is blocked forever!

This is all hinted at on
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
but it could be more explicit.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#idling-sources
is describing your exact issue...

HTH

On Wed, 6 Mar 2019 at 14:51, 刘 文  wrote:

> DataStream EventTime last data cannot be output ?
>
>
> In the verification of EventTime plus watermark processing, I found that
> the data sent to the socket cannot be output in time or output.
> ). The verification found that only the timestamp of the current send data
> of getCurrentWatermark() > TimeWindow + maxOutOfOrderness will trigger the
> end of the last window
> ). But the latest record can not be processed in time, or can not be
> processed
> ). How can I deal with this problem?
>
>
>
> The following is the Flink program ,Flink 1.7.2
> ---
>
>
>
> package 
> com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime
>
> import java.util.{Date, Properties}
>
> import com.alibaba.fastjson.JSON
> import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.util.Collector
>
>
> object SockWordCountRun {
>
>
>
>   def main(args: Array[String]): Unit = {
>
>
> // get the execution environment
>// val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
>
>
> val configuration : Configuration = 
> ConfigurationUtil.getConfiguration(true)
>
> val env:StreamExecutionEnvironment = 
> StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
>
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>
>
> import org.apache.flink.streaming.api.scala._
> val dataStream = env.socketTextStream("localhost", 1234, '\n')
>
>  // .setParallelism(3)
>
>
> dataStream.assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[String] {
>
> val maxOutOfOrderness =  2 * 1000L // 2 seconds
> var currentMaxTimestamp: Long = _
> var currentTimestamp: Long = _
>
> override def getCurrentWatermark: Watermark =  new 
> Watermark(currentMaxTimestamp - maxOutOfOrderness)
>
> override def extractTimestamp(element: String, 
> previousElementTimestamp: Long): Long = {
>   val jsonObject = JSON.parseObject(element)
>
>   val timestamp = jsonObject.getLongValue("extract_data_time")
>   currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
>   currentTimestamp = timestamp
>
> /*  println("===watermark begin===")
>   println()
>   println(new Date(currentMaxTimestamp - 20 * 1000))
>   println(jsonObject)
>   println("===watermark end===")
>   println()*/
>   timestamp
> }
>
>   })
>   .timeWindowAll(Time.seconds(3))
>
>   .process(new ProcessAllWindowFunction[String,String,TimeWindow]() {
>   override def process(context: Context, elements: Iterable[String], out: 
> Collector[String]): Unit = {
>
>
> println()
> println("开始提交window")
> println(new 

REST API question GET /jars/:jarid/plan

2019-03-07 Thread Stephen Connolly
In the documentation for the /jars/:jarid/plan endpoint
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-jarid-plan

It says:

> Program arguments can be passed both via the JSON request (recommended)
or query parameters.

Has anyone got sample code that sends the JSON request?

I have got the end-point working with query parameters, but I need to
support more than 2083 GET URL length limit.

When I have my code like this:

UriTemplate template = UriTemplate.fromTemplate(apiUrl)
        .literal("/v1/jars")
        .path("jarId")
        .literal("/plan")
        .query("entryClass", "programArg*", "parallelism")
        .build()
        .set("jarId", jarId);
if (requestBody.getEntryClass() != null) {
    // TODO find a way to have this as entry-class even if the spec says no
    template.set("entryClass", requestBody.getEntryClass());
}
if (!requestBody.getProgramArgs().isEmpty()) {
    template.set("programArg",
            requestBody.getProgramArgs().toArray(new String[0]));
}
if (requestBody.getParallelism() > 0) {
    template.set("parallelism", requestBody.getParallelism());
}
return get(
        template,
        null,
        null,
        JsonNode.class,
        MEDIA_TYPE_JSON
);

Then I get the plan returned.

When I change to this

UriTemplate template = UriTemplate.fromTemplate(apiUrl)
        .literal("/v1/jars")
        .path("jarId")
        .literal("/plan")
        .build()
        .set("jarId", jarId);
return get(
        template,
        requestBody,
        MEDIA_TYPE_JSON,
        JsonNode.class,
        MEDIA_TYPE_JSON
);

I get a 404.

Basically, adding the request body makes the URL go 404... For fun I tried
having both query parameters and request body and that gets a 404 also.

So does anyone have a working example using the JSON request body (and
let's not get started on how a request body is a really bad idea for GET
requests)
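
For what it's worth, one way to send a GET with a JSON body without the
client library silently switching the method is Java 11's HttpClient, as in
the sketch below; the field names in the body are illustrative only, so check
the REST docs generated for your Flink version:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PlanRequest {
    public static void main(String[] args) throws Exception {
        String apiUrl = "http://localhost:8081"; // adjust to your cluster
        String jarId = args[0];

        // illustrative request body; verify the field names against your Flink version's REST docs
        String body = "{\"programArgsList\": [\"--source\", \"parallel\"], \"parallelism\": 2}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(apiUrl + "/v1/jars/" + jarId + "/plan"))
                .header("Content-Type", "application/json")
                // method() lets us attach a body to a GET without it being turned into a POST
                .method("GET", HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}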


Re: Checkpoints and catch-up burst (heavy back pressure)

2019-03-05 Thread Stephen Connolly
On Tue, 5 Mar 2019 at 12:48, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

>
>
> On Fri, 1 Mar 2019 at 13:05, LINZ, Arnaud 
> wrote:
>
>> Hi,
>>
>>
>>
>> I think I should go into more details to explain my use case.
>>
>> I have one non parallel source (parallelism = 1) that list binary files
>> in a HDFS directory. DataSet emitted by the source is a data set of file
>> names, not file content. These filenames are rebalanced, and sent to
>> workers (parallelism = 15) that will use a flatmapper that open the file,
>> read it, decode it, and send records (forward mode) to the sinks (with a
>> few 1-to-1 mapping in-between). So the flatmap operation is a
>> time-consuming one as the files are more than 200Mb large each; the
>> flatmapper will emit millions of record to the sink given one source record
>> (filename).
>>
>>
>>
>> The rebalancing, occurring at the file name level, does not use much I/O
>> and I cannot use one-to-one mode at that point if I want some parallelims
>> since I have only one source.
>>
>>
>>
>> I did not put file decoding directly in the sources because I have no
>> good way to distribute files to sources without a controller (input
>> directory is unique, filenames are random and cannot be “attributed” to one
>> particular source instance easily).
>>
>
> Crazy idea: If you know the task number and the number of tasks, you can
> hash the filename using a shared algorithm (e.g. md5 or sha1 or crc32) and
> then just check modulo number of tasks == task number.
>
> That would let you run the list files in parallel without sharing state.
> which would allow file decoding directly in the sources
>

if you extend RichParallelSourceFunction you will have:

int index = getRuntimeContext().getIndexOfThisSubtask();
int count = getRuntimeContext().getNumberOfParallelSubtasks();

then a hash function like:

private static int hash(String string) {
    int result = 0;
    for (byte b : DigestUtils.sha1(string)) {
        result = result * 31 + b;
    }
    return result;
}

and just compare the filename like so:

for (String filename : listFiles()) {
    if (Math.floorMod(hash(filename), count) != index) {
        continue;
    }
    // this is our file
    ...
}

Note: if you know the file name patterns, you should tune the hash function
to distribute them evenly. The SHA1 with prime reduction of the bytes is ok
for general levelling... but may be poor over 15 buckets with your typical
data set of filenames


>
>
>> Alternatively, I could have used a dispatcher daemon separated from the
>> streaming app that distribute files to various directories, each directory
>> being associated with a flink source instance, and put the file reading &
>> decoding directly in the source, but that seemed more complex to code and
>> exploit than the filename source. Would it have been better from the
>> checkpointing perspective?
>>
>>
>>
>> About the ungraceful source sleep(), is there a way, programmatically, to
>> know the “load” of the app, or to determine if checkpointing takes too much
>> time, so that I can do it only on purpose?
>>
>>
>>
>> Thanks,
>>
>> Arnaud
>>
>>
>>
>> *De :* zhijiang 
>> *Envoyé :* vendredi 1 mars 2019 04:59
>> *À :* user ; LINZ, Arnaud <
>> al...@bouyguestelecom.fr>
>> *Objet :* Re: Checkpoints and catch-up burst (heavy back pressure)
>>
>>
>>
>> Hi Arnaud,
>>
>>
>>
>> Thanks for the further feedbacks!
>>
>>
>>
> >> For option1: 40min still does not make sense, which indicates it might
>> take more time to finish checkpoint in your case. I also experienced some
>> scenarios of catching up data to take several hours to finish one
>> checkpoint. If the current checkpoint expires because of timeout, the next
>> new triggered checkpoint might still be failed for timeout. So it seems
>> better to wait the current checkpoint until finishes, not expires it,
>> unless we can not bear this long time for some reasons such as wondering
>> failover to restore more data during this time.
>>
>>
>>
> >> For option2: The default network setting should make sense. The lower
>> values might cause performance regression and the higher values would
>> increase the inflighing buffers and checkpoint delay more seriously.
>>
>>
>>
>> For option3: If the resource is limited, it is still not working on your
>> side.
>>
>>
>>
>> It is an option and might work in your c

Re: Checkpoints and catch-up burst (heavy back pressure)

2019-03-05 Thread Stephen Connolly
On Fri, 1 Mar 2019 at 13:05, LINZ, Arnaud  wrote:

> Hi,
>
>
>
> I think I should go into more details to explain my use case.
>
> I have one non parallel source (parallelism = 1) that list binary files in
> a HDFS directory. DataSet emitted by the source is a data set of file
> names, not file content. These filenames are rebalanced, and sent to
> workers (parallelism = 15) that will use a flatmapper that open the file,
> read it, decode it, and send records (forward mode) to the sinks (with a
> few 1-to-1 mapping in-between). So the flatmap operation is a
> time-consuming one as the files are more than 200Mb large each; the
> flatmapper will emit millions of record to the sink given one source record
> (filename).
>
>
>
> The rebalancing, occurring at the file name level, does not use much I/O
> and I cannot use one-to-one mode at that point if I want some parallelims
> since I have only one source.
>
>
>
> I did not put file decoding directly in the sources because I have no good
> way to distribute files to sources without a controller (input directory is
> unique, filenames are random and cannot be “attributed” to one particular
> source instance easily).
>

Crazy idea: If you know the task number and the number of tasks, you can
hash the filename using a shared algorithm (e.g. md5 or sha1 or crc32) and
then just check modulo number of tasks == task number.

That would let you run the list files in parallel without sharing state.
which would allow file decoding directly in the sources


> Alternatively, I could have used a dispatcher daemon separated from the
> streaming app that distribute files to various directories, each directory
> being associated with a flink source instance, and put the file reading &
> decoding directly in the source, but that seemed more complex to code and
> exploit than the filename source. Would it have been better from the
> checkpointing perspective?
>
>
>
> About the ungraceful source sleep(), is there a way, programmatically, to
> know the “load” of the app, or to determine if checkpointing takes too much
> time, so that I can do it only on purpose?
>
>
>
> Thanks,
>
> Arnaud
>
>
>
> *De :* zhijiang 
> *Envoyé :* vendredi 1 mars 2019 04:59
> *À :* user ; LINZ, Arnaud  >
> *Objet :* Re: Checkpoints and catch-up burst (heavy back pressure)
>
>
>
> Hi Arnaud,
>
>
>
> Thanks for the further feedbacks!
>
>
>
> For option1: 40min still does not make sense, which indicates it might
> take more time to finish checkpoint in your case. I also experienced some
> scenarios of catching up data to take several hours to finish one
> checkpoint. If the current checkpoint expires because of timeout, the next
> new triggered checkpoint might still be failed for timeout. So it seems
> better to wait the current checkpoint until finishes, not expires it,
> unless we can not bear this long time for some reasons such as wondering
> failover to restore more data during this time.
>
>
>
> For option2: The default network setting should make sense. The lower
> values might cause performance regression and the higher values would
> increase the inflighing buffers and checkpoint delay more seriously.
>
>
>
> For option3: If the resource is limited, it is still not working on your
> side.
>
>
>
> It is an option and might work in your case for sleeping some time in
> source as you mentioned, although it seems not a graceful way.
>
>
>
> I think there are no data skew in your case to cause backpressure, because
> you used the rebalance mode as mentioned. Another option might use the
> forward mode which would be better than rebalance mode if possible in your
> case. Because the source and downstream task is one-to-one in forward mode,
> so the total flighting buffers are 2+2+8 for one single downstream task
> before barrier. If in rebalance mode, the total flighting buffer would be
> (a*2+a*2+8) for one single downstream task (`a` is the parallelism of
> source vertex), because it is all-to-all connection. The barrier alignment
> takes more time in rebalance mode than forward mode.
>
>
>
> Best,
>
> Zhijiang
>
> --
>
> From:LINZ, Arnaud 
>
> Send Time:2019年3月1日(星期五) 00:46
>
> To:zhijiang ; user 
>
> Subject:RE: Checkpoints and catch-up burst (heavy back pressure)
>
>
>
> Update :
>
> Option  1 does not work. It still fails at the end of the timeout, no
> matter its value.
>
> Should I implement a “bandwidth” management system by using artificial
> Thread.sleep in the source depending on the back pressure ?
>
>
>
> *De :* LINZ, Arnaud
> *Envoyé :* jeudi 28 février 2019 15:47
> *À :* 'zhijiang' ; user  >
> *Objet :* RE: Checkpoints and catch-up burst (heavy back pressure)
>
>
>
> Hi Zhihiang,
>
>
>
> Thanks for your feedback.
>
>- I’ll try option 1 ; time out is 4min for now, I’ll switch it to
>40min and will let you know. Setting it higher than 40 min does not make
>much sense since after 40 min the pending output is 

Re: Why don't Tuple types implement Comparable?

2019-02-22 Thread Stephen Connolly
On Fri, 22 Feb 2019 at 10:38, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

>
>
> On Fri, 22 Feb 2019 at 10:16, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> On Thu, 21 Feb 2019 at 18:29, Frank Grimes 
>> wrote:
>>
>>> Hi,
>>>
>>> I've recently started to evaluate Flink and have found it odd that its
>>> Tuple types, while Serializable, don't implement java.lang.Comparable.
>>> This means that I either need to provide a KeySelector for many
>>> operations or subtype the Tuple types and provide my own implementation of
>>> compareTo for each.
>>>
>>> Is there a technical reason why this was omitted?
>>>
>>
>> A Tuple1 would need to have an implementation of the comparable
>> contract in order to be comparable. The only way I can see of writing that
>> would be if we had:
>>
>> Tuple1<T0 extends Comparable<? super T0>>
>>
>> Similarly you'd have
>>
>> Tuple25<T0 extends Comparable<? super T0>, T1 extends Comparable<? super T1>, T2 extends Comparable<? super T2>, ..., T24 extends Comparable<? super T24>>
>>
>> Which is a lot less screen friendly than say
>>
>> Tuple25<T0, T1, T2, ..., T24>
>>
>> So from a purely code readability PoV you would need to at least replace
>> the  Comparable with Comparable with the immediate reduction
>> of utility... or perhaps you would remove the generics entirely and go for
>> just Comparable and have the implementation just follow the guarantee...
>> except javac will error out for the erasure.
>>
>> Now jOOQ just ignores the comparability contract on the generic types,
>> e.g.
>> https://github.com/jOOQ/jOOL/blob/master/jOOL/src/main/java/org/jooq/lambda/tuple/Tuple1.java#L35
>>
>> The result is that when asked to compare you get a cast that will work or
>> fail:
>> https://github.com/jOOQ/jOOL/blob/master/jOOL/src/main/java/org/jooq/lambda/tuple/Tuples.java#L31
>>
>> So jOOQ is following the "fail at runtime" mode... you can make tuples of
>> non-comparable objects and only at runtime if you try to compare them will
>> it blow up.
>>
>> Flink is following the "fail at compile time" mode... if you try to
>> compare a tuple, you need to provide a way to compare them.
>>
>> Flink does a lot of work - from what I can see - to fail early and fast.
>> For example, if type inference fails on the dataflow plan, it will error
>> out fast. I would hate to have a Tuple3<A, B, Y> where A and B are both
>> Comparable and Y is not... my tests happened to have a small subset of A
>> and B values that never resulted in a comparison of Y values... so the
>> tests didn't fail... but now on the production environment... at runtime...
>> 1 month later in the middle of my vacation I get a call because the
>> topology is stuck and failing on (a,b,y1).compareTo((a,b,y2))
>>
>> So my analysis would say it is not a technical reason but rather a
>> principal reason of "fail fast"
>>
>
> Idea: you could fake comparable for things that are not comparable.
>
> We know the two objects are serializable, so we could just do a byte by
> byte comparison of their serialized representations if they do not
> implement Comparable... would be slower, would probably not be the
> comparison result that people expect... but it would be consistent and not
> error out.
>
> public int compare(Serializable a,Serializable b) {
>   byte[] b1;
>   try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
> ObjectOutputStream oos = new ObjectOutputStream(bos)) {
> oos.writeObject(a);
> b1 = bos.toByteArray();
>   }
>   byte[] b2;
>   try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
> ObjectOutputStream oos = new ObjectOutputStream(bos)) {
> oos.writeObject(b);
> b2 = bos.toByteArray();
>   }
>   for (int i = 0; i < Math.min(b1.length,b2.length); i++) {
> int r = Byte.compare(b1[i],b2[i]);
> if (r != 0) {
>   return r;
> }
>   }
>   return Integer.compare(b1.length,b2.length);
> }
>

Hmmm there is nothing stopping you from having the above as the basis for a
utility universal comparison method that you would just pass by method
handle... or if you want the jOOQ behaviour just go with

public class TupleComparator implements Comparator<Tuple> {
  ...
  // do the class cast here and let it blow up at runtime
}
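
Filled in, that could look something like this: a sketch of mine (not Flink API and not
jOOQ's code) that leans on Flink's Tuple.getArity()/getField() and compares field by
field, letting the ClassCastException surface at runtime when a field is not Comparable.

import java.util.Comparator;
import org.apache.flink.api.java.tuple.Tuple;

public class TupleComparator implements Comparator<Tuple> {
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public int compare(Tuple left, Tuple right) {
        int arity = Math.min(left.getArity(), right.getArity());
        for (int i = 0; i < arity; i++) {
            // blows up with a ClassCastException at runtime if the field is not Comparable
            Comparable l = left.getField(i);
            Comparable r = right.getField(i);
            int c = l.compareTo(r);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(left.getArity(), right.getArity());
    }
}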


>
> is an example of the kind of algorithm. The idea being that all you really
> want is a consistent comparison because they do not implement Comparable...
> if somebody had a useful comparison contract for objects of that type then
> presumably they would already have impleme

Re: Why don't Tuple types implement Comparable?

2019-02-22 Thread Stephen Connolly
On Fri, 22 Feb 2019 at 10:16, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> On Thu, 21 Feb 2019 at 18:29, Frank Grimes 
> wrote:
>
>> Hi,
>>
>> I've recently started to evaluate Flink and have found it odd that its
>> Tuple types, while Serializable, don't implement java.lang.Comparable.
>> This means that I either need to provide a KeySelector for many
>> operations or subtype the Tuple types and provide my own implementation of
>> compareTo for each.
>>
>> Is there a technical reason why this was omitted?
>>
>
> A Tuple1<T0> would need to have an implementation of the comparable
> contract in order to be comparable. The only way I can see of writing that
> would be if we had:
>
> Tuple1<T0 extends Comparable<T0>>
>
> Similarly you'd have
>
> Tuple25<T0 extends Comparable<T0>, T1 extends Comparable<T1>, T2 extends Comparable<T2>, ..., T24 extends Comparable<T24>>
>
> Which is a lot less screen friendly than say
>
> Tuple25<T0, T1, ..., T24>
>
> So from a purely code readability PoV you would need to at least replace
> the Comparable<T0> with Comparable<?> with the immediate reduction
> of utility... or perhaps you would remove the generics entirely and go for
> just Comparable and have the implementation just follow the guarantee...
> except javac will error out for the erasure.
>
> Now jOOQ just ignores the comparability contract on the generic types,
> e.g.
> https://github.com/jOOQ/jOOL/blob/master/jOOL/src/main/java/org/jooq/lambda/tuple/Tuple1.java#L35
>
> The result is that when asked to compare you get a cast that will work or
> fail:
> https://github.com/jOOQ/jOOL/blob/master/jOOL/src/main/java/org/jooq/lambda/tuple/Tuples.java#L31
>
> So jOOQ is following the "fail at runtime" mode... you can make tuples of
> non-comparable objects and only at runtime if you try to compare them will
> it blow up.
>
> Flink is following the "fail at compile time" mode... if you try to
> compare a tuple, you need to provide a way to compare them.
>
> Flink does a lot of work - from what I can see - to fail early and fast.
> For example, if type inference fails on the dataflow plan, it will error
> out fast. I would hate to have a Tuple3<A, B, Y> where A and B are both
> Comparable and Y is not... my tests happened to have a small subset of A
> and B values that never resulted in a comparison of Y values... so the
> tests didn't fail... but now on the production environment... at runtime...
> 1 month later in the middle of my vacation I get a call because the
> topology is stuck and failing on (a,b,y1).compareTo((a,b,y2))
>
> So my analysis would say it is not a technical reason but rather a
> principal reason of "fail fast"
>

Idea: you could fake comparable for things that are not comparable.

We know the two objects are serializable, so we could just do a byte by
byte comparison of their serialized representations if they do not
implement Comparable... would be slower, would probably not be the
comparison result that people expect... but it would be consistent and not
error out.

public int compare(Serializable a,Serializable b) {
  byte[] b1;
  try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(a);
b1 = bos.toByteArray();
  }
  byte[] b2;
  try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(b);
b2 = bos.toByteArray();
  }
  for (int i = 0; i < Math.min(b1.length,b2.length); i++) {
int r = Byte.compare(b1[i],b2[i]);
if (r != 0) {
  return r;
}
  }
  return Integer.compare(b1.length,b2.length);
}

is an example of the kind of algorithm. The idea being that all you really
want is a consistent comparison because they do not implement Comparable...
if somebody had a useful comparison contract for objects of that type then
presumably they would already have implemented the comparability contract.

Another approach that could work would be to use Tuple as a builder for
ComparableTuple by giving each one a comparable method

public class Tuple2<T0, T1> ... {
  ...
  public ComparableTuple2<T0, T1> comparable(Comparator<? super T0> c0,
      Comparator<? super T1> c1) {
    ...
  }
  ...
}

public class ComparableTuple2<T0, T1> extends ComparableTuple {
  // notice no bounds on the generic type arguments
  ...
  // naïve storage (reduces GC pressure as the builder is not wasted)
  // would need analysis to determine if better to allow the builder
allocation
  // to be elided by JVM and store the values directly as f0 and f1
  private final Tuple2<T0, T1> tuple;
  private final Comparator<? super T0> c0;
  private final Comparator<? super T1> c1;

  ...
}

That way ComparableTuple2<T0, T1> can ignore the type bounds on its generic
arguments because the instantiation path guarantees a comparison can be
made.
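
To make that concrete, here is a self-contained sketch of the two-field case. It is
named ComparablePair rather than ComparableTuple2 to make clear it is illustrative and
not Flink API; the point is that the comparators captured at construction carry the
contract, so T0/T1 need no Comparable bound.

import java.util.Comparator;

public final class ComparablePair<T0, T1> implements Comparable<ComparablePair<T0, T1>> {
    private final T0 f0;
    private final T1 f1;
    private final Comparator<? super T0> c0;
    private final Comparator<? super T1> c1;

    public ComparablePair(T0 f0, T1 f1, Comparator<? super T0> c0, Comparator<? super T1> c1) {
        this.f0 = f0;
        this.f1 = f1;
        this.c0 = c0;
        this.c1 = c1;
    }

    @Override
    public int compareTo(ComparablePair<T0, T1> other) {
        int c = c0.compare(f0, other.f0);           // delegate to the captured comparators
        return c != 0 ? c : c1.compare(f1, other.f1);
    }
}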

ComparableTupl

Re: Why don't Tuple types implement Comparable?

2019-02-22 Thread Stephen Connolly
On Thu, 21 Feb 2019 at 18:29, Frank Grimes  wrote:

> Hi,
>
> I've recently started to evaluate Flink and have found it odd that its
> Tuple types, while Serializable, don't implement java.lang.Comparable.
> This means that I either need to provide a KeySelector for many
> operations or subtype the Tuple types and provide my own implementation of
> compareTo for each.
>
> Is there a technical reason why this was omitted?
>

A Tuple1<T0> would need to have an implementation of the comparable
contract in order to be comparable. The only way I can see of writing that
would be if we had:

Tuple1<T0 extends Comparable<T0>>

Similarly you'd have

Tuple25<T0 extends Comparable<T0>, T1 extends Comparable<T1>, T2 extends Comparable<T2>, ..., T24 extends Comparable<T24>>

Which is a lot less screen friendly than say

Tuple25<T0, T1, ..., T24>

So from a purely code readability PoV you would need to at least replace
the Comparable<T0> with Comparable<?> with the immediate reduction
of utility... or perhaps you would remove the generics entirely and go for
just Comparable and have the implementation just follow the guarantee...
except javac will error out for the erasure.

Now jOOQ just ignores the comparability contract on the generic types, e.g.
https://github.com/jOOQ/jOOL/blob/master/jOOL/src/main/java/org/jooq/lambda/tuple/Tuple1.java#L35

The result is that when asked to compare you get a cast that will work or
fail:
https://github.com/jOOQ/jOOL/blob/master/jOOL/src/main/java/org/jooq/lambda/tuple/Tuples.java#L31

So jOOQ is following the "fail at runtime" mode... you can make tuples of
non-comparable objects and only at runtime if you try to compare them will
it blow up.

Flink is following the "fail at compile time" mode... if you try to compare
a tuple, you need to provide a way to compare them.

Flink does a lot of work - from what I can see - to fail early and fast.
For example, if type inference fails on the dataflow plan, it will error
out fast. I would hate to have a Tuple3<A, B, Y> where A and B are both
Comparable and Y is not... my tests happened to have a small subset of A
and B values that never resulted in a comparison of Y values... so the
tests didn't fail... but now on the production environment... at runtime...
1 month later in the middle of my vacation I get a call because the
topology is stuck and failing on (a,b,y1).compareTo((a,b,y2))

So my analysis would say it is not a technical reason but rather a
principal reason of "fail fast"


>
> For example, the JOOQ/JOOL Tuple types all implement Comparable:
>
> https://github.com/jOOQ/jOOL/blob/master/jOOL-java-8/src/main/java/org/jooq/lambda/tuple/Tuple2.java#L39
>
> As an aside, I tried replacing usage of Flink's Tuple types with the JOOL
> ones but they caused a StackOverflowError similar to this one:
> https://issues.apache.org/jira/browse/FLINK-3922
>
> Thanks,
>
> Frank Grimes
>
>


Re: How to debug difference between Kinesis and Kafka

2019-02-21 Thread Stephen Connolly
On Thu, 21 Feb 2019 at 14:00, Dawid Wysakowicz 
wrote:

> If an event arrived at WindowOperator before the Watermark, then it will
> be accounted for window aggregation and put in state. Once that state gets
> checkpointed this same event won't be processed again. In other words if a
> checkpoint succeeds elements that produced corresponding state won't be
> processed again. You may want to read this docs for further
> understanding[1].
>
> What I meant by reprocessing is when you want to reprocess the same input
> records. E.g. you want to rerun your job once again on data from a past
> week. This computation might result in different results than the original
> ones cause Watermarks might get generated after different elements as they
> are bound by "ProcessingTime".
>
Ahh that clarifies. Nope we are processing the stream of events Taylor
Swift style... a.k.a. "We are never processing them again, like ever".

The stream of events is too much data to retain it all. Kinesis will just
keep 1 day's history for recovery.

I'd read [1] before, but when you mentioned "you might get different
results in case of reprocessing" I started to think that maybe the
Watermarks are the Barriers; after your clarification I'm back to
thinking they are separate but similar mechanisms operating in the stream.


> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/internals/stream_checkpointing.html#checkpointing
> On 21/02/2019 14:42, Stephen Connolly wrote:
>
>
>
> On Thu, 21 Feb 2019 at 13:36, Dawid Wysakowicz 
> wrote:
>
>> It is definitely a solution ;)
>>
>> You should be aware of the downsides though:
>>
>>- you might get different results in case of reprocessing
>>- you might drop some data as late, due to some delays in processing,
>>if the events arrive later then the "ProcessingTime" threshold
>>
>> So I have a separate stream processor from the "late" side of my window
> that works out what the update is.
>
> But I guess the question I have is around what happens with reprocessing.
>
> 1. Event 1 goes into the window aggregation because it is before the
> watermark
>
> 2. State gets checkpointed
>
> 3. Crash
>
> 4. Recover
>
> Will Event 1 now go to the late stream or will it be tagged as having been
> included into the state in the checkpoint.
>
> I don't mind if Event 1 gets included in the window's "create event count
> for timebox" output or the "update event count for timebox from late
> events" output as long as it is always one and only one of those paths.
>
>
>>
>>
>> Best,
>>
>> Dawid
>> On 21/02/2019 14:18, Stephen Connolly wrote:
>>
>> Yes, it was the "watermarks for event time when no events for that shard"
>> problem.
>>
>> I am now investigating whether we can use a blended watermark of
>> max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure
>> idle shards do not cause excessive data retention.
>>
>> Is that the best solution?
>>
>> On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Stephen,
>>>
>>> Watermark for a single operator is the minimum of Watermarks received
>>> from all inputs, therefore if one of your shards/operators does not have
>>> incoming data it will not produce Watermarks thus the Watermark of
>>> WindowOperator will not progress. So this is sort of an expected behavior.
>>>
>>> I recommend reading the docs linked by Congxian, especially this
>>> section[1].
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
>>> On 19/02/2019 14:31, Stephen Connolly wrote:
>>>
>>> Hmmm my suspicions are now quite high. I created a file source that just
>>> replays the events straight then I get more results
>>>
>>> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
>>> stephen.alan.conno...@gmail.com> wrote:
>>>
>>>> Hmmm after expanding the dataset such that there was additional data
>>>> that ended up on shard-0 (everything in my original dataset was
>>>> coincidentally landing on shard-1) I am now getting output... should I
>>>> expect this kind of behaviour if no data arrives at shard-0 ever?
>>>>
>>>> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
>>>> stephen.alan.conno...@gmail.com> wrote:
>>>>
&g

Re: Reduce one event under multiple keys

2019-02-21 Thread Stephen Connolly
Thanks!

On Mon, 18 Feb 2019 at 12:36, Fabian Hueske  wrote:

> Hi Stephen,
>
> Sorry for the late response.
> If you don't need to match open and close events, your approach of using a
> flatMap to fan-out for the hierarchical folder structure and a window
> operator (or two for open and close) for counting and aggregating should be
> a good design.
>
> Best, Fabian
>
> Am Mo., 11. Feb. 2019 um 11:29 Uhr schrieb Stephen Connolly <
> stephen.alan.conno...@gmail.com>:
>
>>
>>
>> On Mon, 11 Feb 2019 at 09:42, Fabian Hueske  wrote:
>>
>>> Hi Stephen,
>>>
>>> A window is created with the first record that is assigned to it.
>>> If the windows are based on time and a key, than no window will be
>>> created (and not space be occupied) if there is not a first record for a
>>> key and time interval.
>>>
>>> Anyway, if tracking the number of open files & average opening time is
>>> your use case, you might want to implement the logic with a ProcessFunction
>>> instead of a window.
>>> The reason is that time windows don't share state, i.e., the
>>> information about an opened but not yet closed file would not be "carried
>>> over" to the next window.
>>> However, if you use a ProcessFunction, you are responsible for cleaning
>>> up the state.
>>>
>>
>> Ahh but I am cheating by ensuring the events are rich enough that I do
>> not need to match them.
>>
>> I get the "open" (they are not really "open" events - I have mapped to an
>> analogy... it might be more like a build job start events... or not... I'm
>> not at liberty to say ;-) ) events because I need to count the number of
>> "open"s per time period.
>>
>> I get the "close" events and they include the duration plus other
>> information that can then be transformed into the required metrics... yes I
>> could derive the "open" from the "close" by subtracting the duration but:
>>
>> 1. they would cross window boundaries quite often, leading to repeated
>> fetch-update-write operations on the backing data store
>> 2. they wouldn't be as "live" and one of the things we need to know is
>> how many "open"s there are in the previous window... given some durations
>> can be many days, waiting for the "close" event to create the "open" metric
>> would not be a good plan.
>>
>> Basically, I am pushing some of the calculations to the edge where there
>> is state that makes those calculations cheap and then the rich events are
>> *hopefully* easy to aggregate with just simple aggregation functions that
>> only need to maintain the running total... at least that's what the PoC I
>> am experimenting with Flink should show
>>
>>
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <
>>> stephen.alan.conno...@gmail.com>:
>>>
>>>>
>>>>
>>>> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler 
>>>> wrote:
>>>>
>>>>> This sounds reasonable to me.
>>>>>
>>>>> I'm a bit confused by this question: "*Additionally, I am (naïvely)
>>>>> hoping that if a window has no events for a particular key, the
>>>>> memory/storage costs are zero for that key.*"
>>>>>
>>>>> Are you asking whether a key that was received in window X (as part of
>>>>> an event) is still present in window x+1? If so, then the answer is no; a
>>>>> key will only be present in a given window if an event was received that
>>>>> fits into that window.
>>>>>
>>>>
>>>> To confirm:
>>>>
>>>> So let's say I'l tracking the average time a file is opened in folders.
>>>>
>>>> In window N we get the events:
>>>>
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>>>
>>>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>>>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
>>>> guide.txt"}
>>>> {"source":"ca:fe:ba:be","action":"open&

Re: How to debug difference between Kinesis and Kafka

2019-02-21 Thread Stephen Connolly
On Thu, 21 Feb 2019 at 13:36, Dawid Wysakowicz 
wrote:

> It is definitely a solution ;)
>
> You should be aware of the downsides though:
>
>- you might get different results in case of reprocessing
>- you might drop some data as late, due to some delays in processing,
>if the events arrive later then the "ProcessingTime" threshold
>
> So I have a separate stream processor from the "late" side of my window
that works out what the update is.
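
For context, the wiring looks roughly like the sketch below. Event, Counts and the
aggregator are placeholders standing in for the types used elsewhere in this thread;
the only real API being relied on is the window side output for late data.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateSideOutputExample {

    // assumes an Event type with getKey(), a Counts result type, a CountsAggregator
    // implementing AggregateFunction, and that `events` already carries watermarks
    static DataStream<Event> wire(DataStream<Event> events) {
        final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

        SingleOutputStreamOperator<Counts> counts = events
                .keyBy(Event::getKey)
                .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                .sideOutputLateData(lateTag)        // events behind the watermark go here
                .aggregate(new CountsAggregator()); // the "create counts for timebox" path

        counts.print();                             // stand-in for the real sink

        return counts.getSideOutput(lateTag);       // feeds the "update counts from late events" path
    }
}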

But I guess the question I have is around what happens with reprocessing.

1. Event 1 goes into the window aggregation because it is before the
watermark

2. State gets checkpointed

3. Crash

4. Recover

Will Event 1 now go to the late stream or will it be tagged as having been
included into the state in the checkpoint.

I don't mind if Event 1 gets included in the window's "create event count
for timebox" output or the "update event count for timebox from late
events" output as long as it is always one and only one of those paths.


>
>
> Best,
>
> Dawid
> On 21/02/2019 14:18, Stephen Connolly wrote:
>
> Yes, it was the "watermarks for event time when no events for that shard"
> problem.
>
> I am now investigating whether we can use a blended watermark of
> max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure
> idle shards do not cause excessive data retention.
>
> Is that the best solution?
>
> On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz 
> wrote:
>
>> Hi Stephen,
>>
>> Watermark for a single operator is the minimum of Watermarks received
>> from all inputs, therefore if one of your shards/operators does not have
>> incoming data it will not produce Watermarks thus the Watermark of
>> WindowOperator will not progress. So this is sort of an expected behavior.
>>
>> I recommend reading the docs linked by Congxian, especially this
>> section[1].
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
>> On 19/02/2019 14:31, Stephen Connolly wrote:
>>
>> Hmmm my suspicions are now quite high. I created a file source that just
>> replays the events straight then I get more results
>>
>> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> Hmmm after expanding the dataset such that there was additional data
>>> that ended up on shard-0 (everything in my original dataset was
>>> coincidentally landing on shard-1) I am now getting output... should I
>>> expect this kind of behaviour if no data arrives at shard-0 ever?
>>>
>>> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
>>> stephen.alan.conno...@gmail.com> wrote:
>>>
>>>> Hi, I’m having a strange situation and I would like to know where I
>>>> should start trying to debug.
>>>>
>>>> I have set up a configurable swap in source, with three implementations:
>>>>
>>>> 1. A mock implementation
>>>> 2. A Kafka consumer implementation
>>>> 3. A Kinesis consumer implementation
>>>>
>>>> From injecting a log and no-op map function I can see that all three
>>>> sources pass through the events correctly.
>>>>
>>>> I then have a window based on event time stamps… and from inspecting
>>>> the aggregation function I can see that the data is getting aggregated…,
>>>> I’m using the `.aggregate(AggregateFunction.WindowFunction)` variant so
>>>> that I can retrieve the key
>>>>
>>>> Here’s the strange thing, I only change the source (and each source
>>>> uses the same deserialization function) but:
>>>>
>>>>
>>>>- When I use either Kafka or my Mock source, the WindowFunction
>>>>gets called as events pass the end of the window
>>>>- When I use the Kinesis source, however, the window function never
>>>>gets called. I have even tried injecting events into kinesis with really
>>>>high timestamps to flush the watermarks in my
>>>>BoundedOutOfOrdernessTimestampExtractor... but nothing
>>>>
>>>> I cannot see how this source switching could result in such a different
>>>> behaviour:
>>>>
>>>> Properties sourceProperties = new Properties();
>>>> ConsumerFactory sourceFactory;
>>>> String sourceName = configParams.getRequired("source");
>>>&g

Can I make an Elasticsearch Sink effectively exactly once?

2019-02-21 Thread Stephen Connolly
From how I understand it:

https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/elasticsearch.html#elasticsearch-sinks-and-fault-tolerance

the Flink Elasticsearch Sink guarantees at-least-once delivery of action
> requests to Elasticsearch clusters. It does so by waiting for all pending
> action requests in the BulkProcessor at the time of checkpoints. This
> effectively assures that all requests before the checkpoint was triggered
> have been successfully acknowledged by Elasticsearch, before proceeding to
> process more records sent to the sink.


So I am thinking:


   - If I put a .map(json -> json.set("_id", ElasticsearchId.generate())) in
   front of the Elasticsearch sink
   - If I have a ActionRequestFailureHandler that drops any ID conflicts on
   the floor

Would this give me exactly-once output to Elasticsearch, since the
checkpointed BulkProcessor requests would include the "_id" and thus, in the
event of a recovery, the duplicates would be detected?
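
For the failure handler half, I am thinking of something along these lines. This is a
sketch only: the interface is the one from the Flink Elasticsearch connector docs linked
above, but treating REST status 409 (conflict) as "duplicate _id, already written" is an
assumption, as is rethrowing everything else so the job falls back to the last checkpoint.

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;

public class DropConflictsFailureHandler implements ActionRequestFailureHandler {

    private static final int CONFLICT = 409; // a document with that "_id" already exists

    @Override
    public void onFailure(ActionRequest action, Throwable failure,
                          int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (restStatusCode == CONFLICT) {
            return; // duplicate from a replayed record: drop it on the floor
        }
        throw failure; // anything else fails the sink and the job restores from the checkpoint
    }
}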

Or is there some additional concern I need to be aware of?

Thanks

-stephenc


Re: How to debug difference between Kinesis and Kafka

2019-02-21 Thread Stephen Connolly
Yes, it was the "watermarks for event time when no events for that shard"
problem.

I am now investigating whether we can use a blended watermark of
max(lastEventTimestamp - 1min, System.currentTimeMillis() - 5min) to ensure
idle shards do not cause excessive data retention.

Is that the best solution?
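
Concretely, the blended assigner I have in mind is something like this sketch, using
the periodic-watermark API; the Event type and its getTimestamp() accessor are
placeholders.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class BlendedWatermarkAssigner implements AssignerWithPeriodicWatermarks<Event> {

    private static final long EVENT_LAG_MS = 60_000L;       // 1 min bounded out-of-orderness
    private static final long WALL_CLOCK_LAG_MS = 300_000L; // never lag wall clock by more than 5 min

    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(Event element, long previousElementTimestamp) {
        long ts = element.getTimestamp();                   // placeholder accessor
        maxTimestamp = Math.max(maxTimestamp, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        long fromEvents = maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE                            // no events seen yet on this subtask
                : maxTimestamp - EVENT_LAG_MS;
        long fromWallClock = System.currentTimeMillis() - WALL_CLOCK_LAG_MS;
        // an idle shard still advances via the wall-clock bound
        return new Watermark(Math.max(fromEvents, fromWallClock));
    }
}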

On Thu, 21 Feb 2019 at 08:30, Dawid Wysakowicz 
wrote:

> Hi Stephen,
>
> Watermark for a single operator is the minimum of Watermarks received from
> all inputs, therefore if one of your shards/operators does not have
> incoming data it will not produce Watermarks thus the Watermark of
> WindowOperator will not progress. So this is sort of an expected behavior.
>
> I recommend reading the docs linked by Congxian, especially this
> section[1].
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#watermarks-in-parallel-streams
> On 19/02/2019 14:31, Stephen Connolly wrote:
>
> Hmmm my suspicions are now quite high. I created a file source that just
> replays the events straight then I get more results
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> Hmmm after expanding the dataset such that there was additional data that
>> ended up on shard-0 (everything in my original dataset was coincidentally
>> landing on shard-1) I am now getting output... should I expect this kind of
>> behaviour if no data arrives at shard-0 ever?
>>
>> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> Hi, I’m having a strange situation and I would like to know where I
>>> should start trying to debug.
>>>
>>> I have set up a configurable swap in source, with three implementations:
>>>
>>> 1. A mock implementation
>>> 2. A Kafka consumer implementation
>>> 3. A Kinesis consumer implementation
>>>
>>> From injecting a log and no-op map function I can see that all three
>>> sources pass through the events correctly.
>>>
>>> I then have a window based on event time stamps… and from inspecting the
>>> aggregation function I can see that the data is getting aggregated…, I’m
>>> using the `.aggregate(AggregateFunction.WindowFunction)` variant so that I
>>> can retrieve the key
>>>
>>> Here’s the strange thing, I only change the source (and each source uses
>>> the same deserialization function) but:
>>>
>>>
>>>- When I use either Kafka or my Mock source, the WindowFunction gets
>>>called as events pass the end of the window
>>>- When I use the Kinesis source, however, the window function never
>>>gets called. I have even tried injecting events into kinesis with really
>>>high timestamps to flush the watermarks in my
>>>BoundedOutOfOrdernessTimestampExtractor... but nothing
>>>
>>> I cannot see how this source switching could result in such a different
>>> behaviour:
>>>
>>> Properties sourceProperties = new Properties();
>>> ConsumerFactory sourceFactory;
>>> String sourceName = configParams.getRequired("source");
>>> switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>>> case "kinesis":
>>> sourceFactory = FlinkKinesisConsumer::new;
>>> copyOptionalArg(configParams, "aws-region",
>>> sourceProperties, AWSConfigConstants.AWS_REGION);
>>> copyOptionalArg(configParams, "aws-endpoint",
>>> sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>>> copyOptionalArg(configParams, "aws-access-key",
>>> sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>>> copyOptionalArg(configParams, "aws-secret-key",
>>> sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>>> copyOptionalArg(configParams, "aws-profile",
>>> sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>>> break;
>>> case "kafka":
>>> sourceFactory = FlinkKafkaConsumer010::new;
>>> copyRequiredArg(configParams, "bootstrap-server",
>>> sourceProperties, "bootstrap.servers");
>>> copyOptionalArg(configParams, "group-id",
>>> sourceProperties, "group.id");
>>> break;
>>> case &q

Re: EXT :Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Stephen Connolly
Though I am explicitly assigning watermarks with
DataStream.assignTimestampsAndWatermarks and I see all the data flowing
through that... so shouldn't that override the watermarks from the original
source?

On Tue, 19 Feb 2019 at 15:59, Martin, Nick  wrote:

> Yeah, that’s expected/known. Watermarks for the empty partition don’t
> advance, so the window in your window function never closes.
>
>
>
> There’s a ticket open to fix it (
> https://issues.apache.org/jira/browse/FLINK-5479) for the kafka
> connector, but in general any time one parallel instance of a source
> function isn’t getting data you have to watch out for this.
>
>
>
> *From:* Stephen Connolly [mailto:stephen.alan.conno...@gmail.com]
> *Sent:* Tuesday, February 19, 2019 6:32 AM
> *To:* user 
> *Subject:* EXT :Re: How to debug difference between Kinesis and Kafka
>
>
>
> Hmmm my suspicions are now quite high. I created a file source that just
> replays the events straight then I get more results
>
>
>
> On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
> Hmmm after expanding the dataset such that there was additional data that
> ended up on shard-0 (everything in my original dataset was coincidentally
> landing on shard-1) I am now getting output... should I expect this kind of
> behaviour if no data arrives at shard-0 ever?
>
>
>
> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
> Hi, I’m having a strange situation and I would like to know where I should
> start trying to debug.
>
>
>
> I have set up a configurable swap in source, with three implementations:
>
>
>
> 1. A mock implementation
>
> 2. A Kafka consumer implementation
>
> 3. A Kinesis consumer implementation
>
>
>
> From injecting a log and no-op map function I can see that all three
> sources pass through the events correctly.
>
>
>
> I then have a window based on event time stamps… and from inspecting the
> aggregation function I can see that the data is getting aggregated…, I’m
> using the `.aggregate(AggregateFunction.WindowFunction)` variant so that I
> can retrieve the key
>
>
>
> Here’s the strange thing, I only change the source (and each source uses
> the same deserialization function) but:
>
>
>
>- When I use either Kafka or my Mock source, the WindowFunction gets
>called as events pass the end of the window
>- When I use the Kinesis source, however, the window function never
>gets called. I have even tried injecting events into kinesis with really
>high timestamps to flush the watermarks in my
>BoundedOutOfOrdernessTimestampExtractor... but nothing
>
> I cannot see how this source switching could result in such a different
> behaviour:
>
>
>
> Properties sourceProperties = new Properties();
>
> ConsumerFactory sourceFactory;
>
> String sourceName = configParams.getRequired("source");
>
> switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>
> case "kinesis":
>
> sourceFactory = FlinkKinesisConsumer::new;
>
> copyOptionalArg(configParams, "aws-region",
> sourceProperties, AWSConfigConstants.AWS_REGION);
>
> copyOptionalArg(configParams, "aws-endpoint",
> sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>
> copyOptionalArg(configParams, "aws-access-key",
> sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>
> copyOptionalArg(configParams, "aws-secret-key",
> sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>
> copyOptionalArg(configParams, "aws-profile",
> sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>
> break;
>
> case "kafka":
>
> sourceFactory = FlinkKafkaConsumer010::new;
>
> copyRequiredArg(configParams, "bootstrap-server",
> sourceProperties, "bootstrap.servers");
>
> copyOptionalArg(configParams, "group-id",
> sourceProperties, "group.id");
>
> break;
>
> case "mock":
>
> sourceFactory = MockSourceFunction::new;
>
> break;
>
> default:
>
> throw new RuntimeException("Unknown source '" + sourceName
> + '\'');
>
> }
>
>
>
> // set up the streaming execution environment
>
> final StreamExecutionEnvironment env =
> StreamExecut

Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Stephen Connolly
Hmmm my suspicions are now quite high. I created a file source that just
replays the events straight then I get more results

On Tue, 19 Feb 2019 at 11:50, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> Hmmm after expanding the dataset such that there was additional data that
> ended up on shard-0 (everything in my original dataset was coincidentally
> landing on shard-1) I am now getting output... should I expect this kind of
> behaviour if no data arrives at shard-0 ever?
>
> On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> Hi, I’m having a strange situation and I would like to know where I
>> should start trying to debug.
>>
>> I have set up a configurable swap in source, with three implementations:
>>
>> 1. A mock implementation
>> 2. A Kafka consumer implementation
>> 3. A Kinesis consumer implementation
>>
>> From injecting a log and no-op map function I can see that all three
>> sources pass through the events correctly.
>>
>> I then have a window based on event time stamps… and from inspecting the
>> aggregation function I can see that the data is getting aggregated…, I’m
>> using the `.aggregate(AggregateFunction.WindowFunction)` variant so that I
>> can retrieve the key
>>
>> Here’s the strange thing, I only change the source (and each source uses
>> the same deserialization function) but:
>>
>>
>>- When I use either Kafka or my Mock source, the WindowFunction gets
>>called as events pass the end of the window
>>- When I use the Kinesis source, however, the window function never
>>gets called. I have even tried injecting events into kinesis with really
>>high timestamps to flush the watermarks in my
>>BoundedOutOfOrdernessTimestampExtractor... but nothing
>>
>> I cannot see how this source switching could result in such a different
>> behaviour:
>>
>> Properties sourceProperties = new Properties();
>> ConsumerFactory sourceFactory;
>> String sourceName = configParams.getRequired("source");
>> switch (sourceName.toLowerCase(Locale.ENGLISH)) {
>> case "kinesis":
>> sourceFactory = FlinkKinesisConsumer::new;
>> copyOptionalArg(configParams, "aws-region",
>> sourceProperties, AWSConfigConstants.AWS_REGION);
>> copyOptionalArg(configParams, "aws-endpoint",
>> sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
>> copyOptionalArg(configParams, "aws-access-key",
>> sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
>> copyOptionalArg(configParams, "aws-secret-key",
>> sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
>> copyOptionalArg(configParams, "aws-profile",
>> sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
>> break;
>> case "kafka":
>> sourceFactory = FlinkKafkaConsumer010::new;
>> copyRequiredArg(configParams, "bootstrap-server",
>> sourceProperties, "bootstrap.servers");
>> copyOptionalArg(configParams, "group-id",
>> sourceProperties, "group.id");
>> break;
>> case "mock":
>> sourceFactory = MockSourceFunction::new;
>> break;
>> default:
>> throw new RuntimeException("Unknown source '" +
>> sourceName + '\'');
>> }
>>
>> // set up the streaming execution environment
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> // poll watermark every second because using
>> BoundedOutOfOrdernessTimestampExtractor
>> env.getConfig().setAutoWatermarkInterval(1000L);
>> env.enableCheckpointing(5000);
>>
>> SplitStream eventsByType =
>> env.addSource(sourceFactory.create(
>> configParams.getRequired("topic"),
>> new ObjectNodeDeserializationSchema(),
>> sourceProperties
>> ))
>> .returns(ObjectNode.class) // the use of ConsumerFactory
>> erases the type info so add it back
>> .name("raw-events")
>> .assignTimestampsAndWatermarks(
>> new
>> ObjectNodeBoundedOutOfOrdernessT

Re: How to debug difference between Kinesis and Kafka

2019-02-19 Thread Stephen Connolly
Hmmm after expanding the dataset such that there was additional data that
ended up on shard-0 (everything in my original dataset was coincidentally
landing on shard-1) I am now getting output... should I expect this kind of
behaviour if no data arrives at shard-0 ever?

On Tue, 19 Feb 2019 at 11:14, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> Hi, I’m having a strange situation and I would like to know where I should
> start trying to debug.
>
> I have set up a configurable swap in source, with three implementations:
>
> 1. A mock implementation
> 2. A Kafka consumer implementation
> 3. A Kinesis consumer implementation
>
> From injecting a log and no-op map function I can see that all three
> sources pass through the events correctly.
>
> I then have a window based on event time stamps… and from inspecting the
> aggregation function I can see that the data is getting aggregated…, I’m
> using the `.aggregate(AggregateFunction.WindowFunction)` variant so that I
> can retrieve the key
>
> Here’s the strange thing, I only change the source (and each source uses
> the same deserialization function) but:
>
>
>- When I use either Kafka or my Mock source, the WindowFunction gets
>called as events pass the end of the window
>- When I use the Kinesis source, however, the window function never
>gets called. I have even tried injecting events into kinesis with really
>high timestamps to flush the watermarks in my
>BoundedOutOfOrdernessTimestampExtractor... but nothing
>
> I cannot see how this source switching could result in such a different
> behaviour:
>
> Properties sourceProperties = new Properties();
> ConsumerFactory sourceFactory;
> String sourceName = configParams.getRequired("source");
> switch (sourceName.toLowerCase(Locale.ENGLISH)) {
> case "kinesis":
> sourceFactory = FlinkKinesisConsumer::new;
> copyOptionalArg(configParams, "aws-region",
> sourceProperties, AWSConfigConstants.AWS_REGION);
> copyOptionalArg(configParams, "aws-endpoint",
> sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
> copyOptionalArg(configParams, "aws-access-key",
> sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
> copyOptionalArg(configParams, "aws-secret-key",
> sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
> copyOptionalArg(configParams, "aws-profile",
> sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
> break;
> case "kafka":
> sourceFactory = FlinkKafkaConsumer010::new;
> copyRequiredArg(configParams, "bootstrap-server",
> sourceProperties, "bootstrap.servers");
> copyOptionalArg(configParams, "group-id",
> sourceProperties, "group.id");
> break;
> case "mock":
> sourceFactory = MockSourceFunction::new;
> break;
> default:
> throw new RuntimeException("Unknown source '" + sourceName
> + '\'');
> }
>
> // set up the streaming execution environment
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> // poll watermark every second because using
> BoundedOutOfOrdernessTimestampExtractor
> env.getConfig().setAutoWatermarkInterval(1000L);
> env.enableCheckpointing(5000);
>
> SplitStream eventsByType =
> env.addSource(sourceFactory.create(
> configParams.getRequired("topic"),
> new ObjectNodeDeserializationSchema(),
> sourceProperties
> ))
> .returns(ObjectNode.class) // the use of ConsumerFactory
> erases the type info so add it back
> .name("raw-events")
> .assignTimestampsAndWatermarks(
> new
> ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp",
> Time.seconds(5))
> )
> .split(new JsonNodeOutputSelector("eventType"));
> ...
> eventsByType.select(...)
> .keyBy(new JsonNodeStringKeySelector("_key"))
>
> .window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
> (KeySelector)
> TasksMain::offsetPerMaster))
> .trigger(EventTimeTrigger.create())
> .aggregate(new CountsAggregator<>(), new KeyTagger<>()) //
> < The CountsAggregator is seeing the data
> .print() // < HERE is where we get no output from
> Kinesis... but Kafka and my Mock are just fine!
>
>
>


How to debug difference between Kinesis and Kafka

2019-02-19 Thread Stephen Connolly
Hi, I’m having a strange situation and I would like to know where I should
start trying to debug.

I have set up a configurable swap in source, with three implementations:

1. A mock implementation
2. A Kafka consumer implementation
3. A Kinesis consumer implementation

From injecting a log and no-op map function I can see that all three
sources pass through the events correctly.

I then have a window based on event time stamps… and from inspecting the
aggregation function I can see that the data is getting aggregated…, I’m
using the `.aggregate(AggregateFunction.WindowFunction)` variant so that I
can retrieve the key

Here’s the strange thing, I only change the source (and each source uses
the same deserialization function) but:


   - When I use either Kafka or my Mock source, the WindowFunction gets
   called as events pass the end of the window
   - When I use the Kinesis source, however, the window function never gets
   called. I have even tried injecting events into kinesis with really high
   timestamps to flush the watermarks in my
   BoundedOutOfOrdernessTimestampExtractor... but nothing

I cannot see how this source switching could result in such a different
behaviour:

Properties sourceProperties = new Properties();
ConsumerFactory sourceFactory;
String sourceName = configParams.getRequired("source");
switch (sourceName.toLowerCase(Locale.ENGLISH)) {
case "kinesis":
sourceFactory = FlinkKinesisConsumer::new;
copyOptionalArg(configParams, "aws-region",
sourceProperties, AWSConfigConstants.AWS_REGION);
copyOptionalArg(configParams, "aws-endpoint",
sourceProperties, AWSConfigConstants.AWS_ENDPOINT);
copyOptionalArg(configParams, "aws-access-key",
sourceProperties, AWSConfigConstants.AWS_ACCESS_KEY_ID);
copyOptionalArg(configParams, "aws-secret-key",
sourceProperties, AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
copyOptionalArg(configParams, "aws-profile",
sourceProperties, AWSConfigConstants.AWS_PROFILE_NAME);
break;
case "kafka":
sourceFactory = FlinkKafkaConsumer010::new;
copyRequiredArg(configParams, "bootstrap-server",
sourceProperties, "bootstrap.servers");
copyOptionalArg(configParams, "group-id", sourceProperties,
"group.id");
break;
case "mock":
sourceFactory = MockSourceFunction::new;
break;
default:
throw new RuntimeException("Unknown source '" + sourceName
+ '\'');
}

// set up the streaming execution environment
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

// poll watermark every second because using
BoundedOutOfOrdernessTimestampExtractor
env.getConfig().setAutoWatermarkInterval(1000L);
env.enableCheckpointing(5000);

SplitStream eventsByType =
env.addSource(sourceFactory.create(
configParams.getRequired("topic"),
new ObjectNodeDeserializationSchema(),
sourceProperties
))
.returns(ObjectNode.class) // the use of ConsumerFactory
erases the type info so add it back
.name("raw-events")
.assignTimestampsAndWatermarks(
new
ObjectNodeBoundedOutOfOrdernessTimestampExtractor("timestamp",
Time.seconds(5))
)
.split(new JsonNodeOutputSelector("eventType"));
...
eventsByType.select(...)
.keyBy(new JsonNodeStringKeySelector("_key"))

.window(TumblingEventOffsetPerKeyEventTimeWindows.of(Time.seconds(windowDuration),
(KeySelector)
TasksMain::offsetPerMaster))
.trigger(EventTimeTrigger.create())
.aggregate(new CountsAggregator<>(), new KeyTagger<>()) //
< The CountsAggregator is seeing the data
.print() // < HERE is where we get no output from
Kinesis... but Kafka and my Mock are just fine!


Re: [ANNOUNCE] New Flink PMC member Thomas Weise

2019-02-12 Thread Stephen Connolly
Congratulations to Thomas. I see that this is not his first time in the PMC
rodeo... also somebody needs to update LDAP as he's not on
https://people.apache.org/phonebook.html?pmc=flink yet!

-stephenc

On Tue, 12 Feb 2019 at 09:59, Fabian Hueske  wrote:

> Hi everyone,
>
> On behalf of the Flink PMC I am happy to announce Thomas Weise as a new
> member of the Apache Flink PMC.
>
> Thomas is a long time contributor and member of our community.
> He is starting and participating in lots of discussions on our mailing
> lists, working on topics that are of joint interest of Flink and Beam, and
> giving talks on Flink at many events.
>
> Please join me in welcoming and congratulating Thomas!
>
> Best,
> Fabian
>


Re: Anyone tried to do blue-green topology deployments?

2019-02-11 Thread Stephen Connolly
On Mon, 11 Feb 2019 at 14:10, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> Another possibility would be injecting pseudo events into the source and
> having a stateful filter.
>
> The event would be something like “key X is now owned by green”.
>
> I can do that because getting a list of keys seen in the past X minutes is
> cheap (we have it already)
>
> But it’s unclear what impact adding such state to the filter would have.
>

Hmmm it might not need to be quite so stateful: if the filter were implemented
as a BroadcastProcessFunction or a KeyedBroadcastProcessFunction, I could map
the key to a threshold and compare it against the level from the broadcast
context... that way the broadcast events wouldn't need to be associated
with any specific key and could just be {"level":56}
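
Roughly what I have in mind is the sketch below. It is illustrative only: Event and its
getKey() accessor are placeholders, and Math.floorMod(hashCode(), 100) stands in for the
f(key) mapping into 0..99.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ColourLevelFilter extends BroadcastProcessFunction<Event, Integer, Event> {

    public static final MapStateDescriptor<String, Integer> LEVEL_DESCRIPTOR =
            new MapStateDescriptor<>("level", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

    private final boolean green; // true in the green topology, false in the blue one

    public ColourLevelFilter(boolean green) {
        this.green = green;
    }

    @Override
    public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out) throws Exception {
        Integer level = ctx.getBroadcastState(LEVEL_DESCRIPTOR).get("level");
        int l = level == null ? 0 : level;                          // no broadcast yet: blue owns everything
        int bucket = Math.floorMod(event.getKey().hashCode(), 100); // f(key) mapped into 0..99
        if (green ? bucket < l : bucket >= l) {
            out.collect(event);
        }
    }

    @Override
    public void processBroadcastElement(Integer level, Context ctx, Collector<Event> out) throws Exception {
        ctx.getBroadcastState(LEVEL_DESCRIPTOR).put("level", level);
    }
}

On the green topology it would be wired up with something like
events.connect(levels.broadcast(ColourLevelFilter.LEVEL_DESCRIPTOR)).process(new ColourLevelFilter(true)),
and with false on the blue one.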


>
> On Mon 11 Feb 2019 at 13:33, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>>
>>
>> On Mon, 11 Feb 2019 at 13:26, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> I have my main application updating with a blue-green deployment
>>> strategy whereby a new version (always called green) starts receiving an
>>> initial fraction of the web traffic and then - based on the error rates -
>>> we progress the % of traffic until 100% of traffic is being handled by the
>>> green version. At which point we decommission blue and green is the new
>>> blue when the next version comes along.
>>>
>>> Applied to Flink, my initial thought is that you would run the two
>>> topologies in parallel, but the first action of each topology would be a
>>> filter based on the key.
>>>
>>> You basically would use a consistent transformation of the key into a
>>> number between 0 and 100 and the filter would be:
>>>
>>> (key) -> color == green ? f(key) < level : f(key) >= level
>>>
>>> Then I can use a suitable metric to determine if the new topology is
>>> working and ramp up or down the level.
>>>
>>> One issue I foresee is what happens if the level changes mid-window, I
>>> will have output from both topologies when the window ends.
>>>
>>> In the case of my output, which is aggregatable, I will get the same
>>> results from two rows as from one row *provided* that the switch from blue
>>> to green is synchronized between the two topologies. That sounds like a
>>> hard problem though.
>>>
>>>
>>> Another thought I had was to let the web front-end decide based on the
>>> same key vs level approach. Rather than submit the raw event, I would add
>>> the target topology to the event and the filter just selects based on
>>> whether it is the target topology. This has the advantage that I know each
>>> event will only ever be processed by one of green or blue. Heck I could
>>> even use the main web application's blue-green deployment to drive the
>>> flink blue green deployment
>>>
>>
>> In other words, if a blue web node receives an event upload it adds
>> "blue", whereas if a green web node receives an event upload it adds
>> "green" (not quite those strings but rather the web deployment sequence
>> number). This has the advantage that the web nodes do not need to parse the
>> event payload. The % of web traffic will result in the matching % of events
>> being sent to blue and green. Also this means that all keys get processed
>> at the target % during the deployment, which can help flush out bugs.
>>
>> I can therefore stop the old topology at > 1 window after the green web
>> node started getting 100% of traffic in order to allow any existing windows
>> in flight to flush all the way to the datastore...
>>
>> Out of order events would be tagged as green once green is 100% of
>> traffic, and so can be processed correctly...
>>
>> And I can completely ignore topology migration serialization issues...
>>
>> Sounding very tempting... there must be something wrong...
>>
>> (or maybe my data storage plan just allows me to make this kind of
>> optimization!)
>>
>>
>>> as due to the way I structure my results I don't care if I get two rows
>>> of counts for a time window or one row of counts, because I'm adding up the
>>> total counts across multiple rows and sum is sum!
>>>
>>
>>>
>>> Anyone else had to try and deal with this type of thing?
>>>
>>> -stephenc
>>>
>>>
>>> --
> Sent from my phone
>


Re: Anyone tried to do blue-green topology deployments?

2019-02-11 Thread Stephen Connolly
Another possibility would be injecting pseudo events into the source and
having a stateful filter.

The event would be something like “key X is now owned by green”.

I can do that because getting a list of keys seen in the past X minutes is
cheap (we have it already)

But it’s unclear what impact adding such state to the filter would have.

On Mon 11 Feb 2019 at 13:33, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

>
>
> On Mon, 11 Feb 2019 at 13:26, Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> I have my main application updating with a blue-green deployment strategy
>> whereby a new version (always called green) starts receiving an initial
>> fraction of the web traffic and then - based on the error rates - we
>> progress the % of traffic until 100% of traffic is being handled by the
>> green version. At which point we decommission blue and green is the new
>> blue when the next version comes along.
>>
>> Applied to Flink, my initial thought is that you would run the two
>> topologies in parallel, but the first action of each topology would be a
>> filter based on the key.
>>
>> You basically would use a consistent transformation of the key into a
>> number between 0 and 100 and the filter would be:
>>
>> (key) -> color == green ? f(key) < level : f(key) >= level
>>
>> Then I can use a suitable metric to determine if the new topology is
>> working and ramp up or down the level.
>>
>> One issue I foresee is what happens if the level changes mid-window, I
>> will have output from both topologies when the window ends.
>>
>> In the case of my output, which is aggregatable, I will get the same
>> results from two rows as from one row *provided* that the switch from blue
>> to green is synchronized between the two topologies. That sounds like a
>> hard problem though.
>>
>>
>> Another thought I had was to let the web front-end decide based on the
>> same key vs level approach. Rather than submit the raw event, I would add
>> the target topology to the event and the filter just selects based on
>> whether it is the target topology. This has the advantage that I know each
>> event will only ever be processed by one of green or blue. Heck I could
>> even use the main web application's blue-green deployment to drive the
>> flink blue green deployment
>>
>
> In other words, if a blue web node receives an event upload it adds
> "blue", whereas if a green web node receives an event upload it adds
> "green" (not quite those strings but rather the web deployment sequence
> number). This has the advantage that the web nodes do not need to parse the
> event payload. The % of web traffic will result in the matching % of events
> being sent to blue and green. Also this means that all keys get processed
> at the target % during the deployment, which can help flush out bugs.
>
> I can therefore stop the old topology at > 1 window after the green web
> node started getting 100% of traffic in order to allow any existing windows
> in flight to flush all the way to the datastore...
>
> Out of order events would be tagged as green once green is 100% of
> traffic, and so can be processed correctly...
>
> And I can completely ignore topology migration serialization issues...
>
> Sounding very tempting... there must be something wrong...
>
> (or maybe my data storage plan just allows me to make this kind of
> optimization!)
>
>
>> as due to the way I structure my results I don't care if I get two rows
>> of counts for a time window or one row of counts, because I'm adding up the
>> total counts across multiple rows and sum is sum!
>>
>
>>
>> Anyone else had to try and deal with this type of thing?
>>
>> -stephenc
>>
>>
>> --
Sent from my phone


Re: Anyone tried to do blue-green topology deployments?

2019-02-11 Thread Stephen Connolly
On Mon, 11 Feb 2019 at 13:26, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> I have my main application updating with a blue-green deployment strategy
> whereby a new version (always called green) starts receiving an initial
> fraction of the web traffic and then - based on the error rates - we
> progress the % of traffic until 100% of traffic is being handled by the
> green version. At which point we decommission blue and green is the new
> blue when the next version comes along.
>
> Applied to Flink, my initial thought is that you would run the two
> topologies in parallel, but the first action of each topology would be a
> filter based on the key.
>
> You basically would use a consistent transformation of the key into a
> number between 0 and 100 and the filter would be:
>
> (key) -> color == green ? f(key) < level : f(key) >= level
>
> Then I can use a suitable metric to determine if the new topology is
> working and ramp up or down the level.
>
> One issue I foresee is what happens if the level changes mid-window, I
> will have output from both topologies when the window ends.
>
> In the case of my output, which is aggregatable, I will get the same
> results from two rows as from one row *provided* that the switch from blue
> to green is synchronized between the two topologies. That sounds like a
> hard problem though.
>
>
> Another thought I had was to let the web front-end decide based on the
> same key vs level approach. Rather than submit the raw event, I would add
> the target topology to the event and the filter just selects based on
> whether it is the target topology. This has the advantage that I know each
> event will only ever be processed by one of green or blue. Heck I could
> even use the main web application's blue-green deployment to drive the
> flink blue green deployment
>

In other words, if a blue web node receives an event upload it adds "blue",
whereas if a green web node receives an event upload it adds "green" (not
quite those strings but rather the web deployment sequence number). This
has the advantage that the web nodes do not need to parse the event
payload. The % of web traffic will result in the matching % of events being
sent to blue and green. Also this means that all keys get processed at the
target % during the deployment, which can help flush out bugs.

I can therefore stop the old topology at > 1 window after the green web
node started getting 100% of traffic in order to allow any existing windows
in flight to flush all the way to the datastore...

Out of order events would be tagged as green once green is 100% of traffic,
and so can be processed correctly...

And I can completely ignore topology migration serialization issues...

Sounding very tempting... there must be something wrong...

(or maybe my data storage plan just allows me to make this kind of
optimization!)


> as due to the way I structure my results I don't care if I get two rows of
> counts for a time window or one row of counts, because I'm adding up the
> total counts across multiple rows and sum is sum!
>

>
> Anyone else had to try and deal with this type of thing?
>
> -stephenc
>
>
>


Anyone tried to do blue-green topology deployments?

2019-02-11 Thread Stephen Connolly
I have my main application updating with a blue-green deployment strategy
whereby a new version (always called green) starts receiving an initial
fraction of the web traffic and then - based on the error rates - we
progress the % of traffic until 100% of traffic is being handled by the
green version. At which point we decommission blue and green is the new
blue when the next version comes along.

Applied to Flink, my initial thought is that you would run the two
topologies in parallel, but the first action of each topology would be a
filter based on the key.

You basically would use a consistent transformation of the key into a
number between 0 and 100 and the filter would be:

(key) -> color == green ? f(key) < level : f(key) >= level

Then I can use a suitable metric to determine if the new topology is
working and ramp up or down the level.
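To make the filter concrete, something like the following untested sketch is
what I have in mind (Event and getKey() are placeholders for my real event
type, and f(key) is just a stable hash of the key mapped into 0-99):

import org.apache.flink.api.common.functions.FilterFunction;

// Splits traffic between the blue and green topologies based on a stable
// hash of the key; "level" is the percentage of keys routed to green.
public class BlueGreenFilter implements FilterFunction<Event> {
    private final boolean green;
    private final int level; // 0..100

    public BlueGreenFilter(boolean green, int level) {
        this.green = green;
        this.level = level;
    }

    // Consistent transformation of the key into a number between 0 and 100.
    // String.hashCode() is deterministic across JVMs, so both topologies
    // agree on which bucket a given key falls into.
    private static int bucket(String key) {
        return Math.floorMod(key.hashCode(), 100);
    }

    @Override
    public boolean filter(Event event) {
        int b = bucket(event.getKey());
        return green ? b < level : b >= level;
    }
}

(Ramping "level" up or down would of course mean either re-submitting the
jobs with a new value or wiring it to some external configuration, which is
exactly where the mid-window issue below comes from.)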

One issue I foresee is what happens if the level changes mid-window, I will
have output from both topologies when the window ends.

In the case of my output, which is aggregatable, I will get the same
results from two rows as from one row *provided* that the switch from blue
to green is synchronized between the two topologies. That sounds like a
hard problem though.


Another thought I had was to let the web front-end decide based on the same
key vs level approach. Rather than submit the raw event, I would add the
target topology to the event and the filter just selects based on whether
it is the target topology. This has the advantage that I know each event
will only ever be processed by one of green or blue. Heck I could even use
the main web application's blue-green deployment to drive the flink blue
green deployment as due to the way I structure my results I don't care if I
get two rows of counts for a time window or one row of counts, because I'm
adding up the total counts across multiple rows and sum is sum!


Anyone else had to try and deal with this type of thing?

-stephenc


Re: Reduce one event under multiple keys

2019-02-11 Thread Stephen Connolly
On Mon, 11 Feb 2019 at 09:42, Fabian Hueske  wrote:

> Hi Stephen,
>
> A window is created with the first record that is assigned to it.
> If the windows are based on time and a key, than no window will be created
> (and not space be occupied) if there is not a first record for a key and
> time interval.
>
> Anyway, if tracking the number of open files & average opening time is
> your use case, you might want to implement the logic with a ProcessFunction
> instead of a window.
> The reason is that time windows don't share state, i.e., the
> information about an opened but not yet closed file would not be "carried
> over" to the next window.
> However, if you use a ProcessFunction, you are responsible for cleaning up
> the state.
>

Ahh but I am cheating by ensuring the events are rich enough that I do not
need to match them.

I get the "open" (they are not really "open" events - I have mapped to an
analogy... they might be more like build job start events... or not... I'm
not at liberty to say ;-) ) events because I need to count the number of
"open"s per time period.

I get the "close" events and they include the duration plus other
information that can then be transformed into the required metrics... yes I
could derive the "open" from the "close" by subtracting the duration but:

1. they would cross window boundaries quite often, leading to repeated
fetch-update-write operations on the backing data store
2. they wouldn't be as "live" and one of the things we need to know is how
many "open"s there are in the previous window... given some durations can
be many days, waiting for the "close" event to create the "open" metric
would not be a good plan.

Basically, I am pushing some of the calculations to the edge where there is
state that makes those calculations cheap and then the rich events are
*hopefully* easy to aggregate with just simple aggregation functions that
only need to maintain the running total... at least that's what the PoC I
am experimenting with in Flink should show


>
> Hope this helps,
> Fabian
>
> Am So., 10. Feb. 2019 um 20:36 Uhr schrieb Stephen Connolly <
> stephen.alan.conno...@gmail.com>:
>
>>
>>
>> On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler 
>> wrote:
>>
>>> This sounds reasonable to me.
>>>
>>> I'm a bit confused by this question: "*Additionally, I am (naïvely)
>>> hoping that if a window has no events for a particular key, the
>>> memory/storage costs are zero for that key.*"
>>>
>>> Are you asking whether a key that was received in window X (as part of
>>> an event) is still present in window x+1? If so, then the answer is no; a
>>> key will only be present in a given window if an event was received that
>>> fits into that window.
>>>
>>
>> To confirm:
>>
>> So let's say I'm tracking the average time a file is opened in folders.
>>
>> In window N we get the events:
>>
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
>>
>> {"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
>> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}
>>
>> So there will be aggregates stored for
>> ("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
>> ("ca:fe:ba:be","/foo/bar/README.txt"), etc
>>
>> In window N+1 we do not get any events at all.
>>
>> So the memory used by my aggregation functions from window N will be
>> freed and the storage will be effectively zero (modulo any follow on
>> processing that might be on a longer window)
>>
>> This seems to be what you are saying... in which case my naïve hope was
>> not so naïve! w00t!
>>
>>
>>>
>>> On 08.02.2019 13:21, Stephen Connolly wrote:
>>>
>>> Ok, I'll try and map my problem into something that should be familiar
>>> to most people.
>>>
>>> Consider a collection of PCs, each of which has a unique ID, e.g.
>>> ca:fe:ba:be, de:ad:be:ef, etc.
>>>
>>> Each PC has a tree of local files. Some of the file paths are
>>> coincidentally the same name

Re: Is there a windowing strategy that allows a different offset per key?

2019-02-11 Thread Stephen Connolly
On Mon, 11 Feb 2019 at 09:54, Fabian Hueske  wrote:

> Hi Stephen,
>
> First of all, yes, windows computing and emitting at the same time can
> cause pressure on the downstream system.
>
> There are a few ways how you can achieve this:
> * use a custom window assigner. A window assigner decides into which
> window a record is assigned. This is the approach you suggested.
>

Thanks for the link. Yes I think the custom window assigner is most
certainly the way to go for my use case. Even more specifically because the
offsets I want to use are going to be based on a subset of the assigned key
not the full assigned key (if you see my other mails this week, the key I
window is a composite key of (id,path) but I want to have all the offsets
for any specific id be the same, irrespective of the path, so the
theoretical need of access to the full key that was driving Rong's original
idea for an RFE to the WindowAssignerContext is not even necessary for my
case)


> * use a regular window and add an operator that buffers the window results
> and releases them with randomized delay.
> * use a ProcessFunction which allows you to control the timing of
> computations yourself.
>
> A few months ago, there was a similar discussion on the dev mailing list
> [1] (didn't read the thread) started by Rong (in CC).
> Maybe, he can share some ideas / experiences as well.
>

Would be awesome if Rong could share any learnings he has encountered since then


>
> Cheers,
> Fabian
>
> [1]
> https://lists.apache.org/thread.html/0d1e41302b89378f88693bf4fdb52c23d4b240160b5a10c163d9c46c@%3Cdev.flink.apache.org%3E
>
>
> Am So., 10. Feb. 2019 um 21:03 Uhr schrieb Stephen Connolly <
> stephen.alan.conno...@gmail.com>:
>
>> Looking into the code in TumblingEventTimeWindows:
>>
>> @Override
>> public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
>>     if (timestamp > Long.MIN_VALUE) {
>>         // Long.MIN_VALUE is currently assigned when no timestamp is present
>>         long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
>>         return Collections.singletonList(new TimeWindow(start, start + size));
>>     } else {
>>         throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
>>             "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
>>             "'DataStream.assignTimestampsAndWatermarks(...)'?");
>>     }
>> }
>>
>> So I think I can just write my own where the offset is derived from
>> hashing the element using my hash function.
>>
>> Good plan or bad plan?
>>
>>
>> On Sun, 10 Feb 2019 at 19:55, Stephen Connolly <
>> stephen.alan.conno...@gmail.com> wrote:
>>
>>> I would like to process a stream of data from different customers,
>>> producing output say once every 15 minutes. The results will then be loaded
>>> into another system for storage and querying.
>>>
>>> I have been using TumblingEventTimeWindows in my prototype, but I am
>>> concerned that all the windows will start and stop at the same time and
>>> cause batch load effects on the back-end data store.
>>>
>>> What I think I would like is that the windows could have a different
>>> start offset for each key, (using a hash function that I would supply)
>>>
>>> Thus deterministically, key "ca:fe:ba:be" would always start based on an
>>> initial offset of 00:07 UTC while say key "de:ad:be:ef" would always start
>>> based on an initial offset of say 00:02 UTC
>>>
>>> Is this possible? Or do I just have to find some way of queuing up my
>>> writes using back-pressure?
>>>
>>> Thanks in advance
>>>
>>> -stephenc
>>>
>>> P.S. I can trade assistance with Flink for assistance with Maven or
>>> Jenkins if my questions are too wearisome!
>>>
>>


Re: Is there a windowing strategy that allows a different offset per key?

2019-02-10 Thread Stephen Connolly
Looking into the code in TumblingEventTimeWindows:

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        // Long.MIN_VALUE is currently assigned when no timestamp is present
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
            "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
            "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}

So I think I can just write my own where the offset is derived from hashing
the element using my hash function.

Good plan or bad plan?
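For concreteness, the kind of thing I mean is the untested sketch below
(KeyedEvent and getId() are placeholders for my real event type, and I have
left out the Long.MIN_VALUE guard from the original for brevity):

import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Placeholder for my real event type.
interface KeyedEvent { String getId(); }

// Tumbling event-time windows whose start offset is derived from a stable
// hash of the element's id, so each id gets its own deterministic offset.
public class PerKeyOffsetTumblingWindows extends WindowAssigner<Object, TimeWindow> {

    private final long size;

    public PerKeyOffsetTumblingWindows(long sizeMillis) {
        this.size = sizeMillis;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // The "hash function that I would supply": map the id to an offset in [0, size).
        long offset = Math.floorMod((long) ((KeyedEvent) element).getId().hashCode(), size);
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}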


On Sun, 10 Feb 2019 at 19:55, Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> I would like to process a stream of data from different customers,
> producing output say once every 15 minutes. The results will then be loaded
> into another system for storage and querying.
>
> I have been using TumblingEventTimeWindows in my prototype, but I am
> concerned that all the windows will start and stop at the same time and
> cause batch load effects on the back-end data store.
>
> What I think I would like is that the windows could have a different start
> offset for each key, (using a hash function that I would supply)
>
> Thus deterministically, key "ca:fe:ba:be" would always start based on an
> initial offset of 00:07 UTC while say key "de:ad:be:ef" would always start
> based on an initial offset of say 00:02 UTC
>
> Is this possible? Or do I just have to find some way of queuing up my
> writes using back-pressure?
>
> Thanks in advance
>
> -stephenc
>
> P.S. I can trade assistance with Flink for assistance with Maven or
> Jenkins if my questions are too wearisome!
>


Is there a windowing strategy that allows a different offset per key?

2019-02-10 Thread Stephen Connolly
I would like to process a stream of data from different customers,
producing output say once every 15 minutes. The results will then be loaded
into another system for storage and querying.

I have been using TumblingEventTimeWindows in my prototype, but I am
concerned that all the windows will start and stop at the same time and
cause batch load effects on the back-end data store.

What I think I would like is that the windows could have a different start
offset for each key, (using a hash function that I would supply)

Thus deterministically, key "ca:fe:ba:be" would always start based on an
initial offset of 00:07 UTC while say key "de:ad:be:ef" would always start
based on an initial offset of say 00:02 UTC

Is this possible? Or do I just have to find some way of queuing up my
writes using back-pressure?

Thanks in advance

-stephenc

P.S. I can trade assistance with Flink for assistance with Maven or Jenkins
if my questions are too wearisome!


Re: Reduce one event under multiple keys

2019-02-10 Thread Stephen Connolly
On Sun, 10 Feb 2019 at 09:09, Chesnay Schepler  wrote:

> This sounds reasonable to me.
>
> I'm a bit confused by this question: "*Additionally, I am (naïvely)
> hoping that if a window has no events for a particular key, the
> memory/storage costs are zero for that key.*"
>
> Are you asking whether a key that was received in window X (as part of an
> event) is still present in window x+1? If so, then the answer is no; a key
> will only be present in a given window if an event was received that fits
> into that window.
>

To confirm:

So let's say I'm tracking the average time a file is opened in folders.

In window N we get the events:

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}
{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User guide.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin guide.txt"}

So there will be aggregates stored for
("ca:fe:ba:be","/"), ("ca:fe:ba:be","/foo"), ("ca:fe:ba:be","/foo/bar"),
("ca:fe:ba:be","/foo/bar/README.txt"), etc

In window N+1 we do not get any events at all.

So the memory used by my aggregation functions from window N will be freed
and the storage will be effectively zero (modulo any follow on processing
that might be on a longer window)

This seems to be what you are saying... in which case my naïve hope was
not so naïve! w00t!


>
> On 08.02.2019 13:21, Stephen Connolly wrote:
>
> Ok, I'll try and map my problem into something that should be familiar to
> most people.
>
> Consider a collection of PCs, each of which has a unique ID, e.g.
> ca:fe:ba:be, de:ad:be:ef, etc.
>
> Each PC has a tree of local files. Some of the file paths coincidentally
> have the same names, but there is no file sharing between PCs.
>
> I need to produce metrics about how often files are opened and how long
> they are open for.
>
> I need, for every X minute tumbling window, not just the cumulative averages
> for each PC, but the averages for each file as well as the cumulative
> averages for each folder and their sub-folders.
>
> I have a stream of events like
>
>
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
> guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
> guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User
> guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin
> guide.txt","duration":"196"}
> {"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
> {"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}
>
> So from that I would like to know stuff like:
>
> ca:fe:ba:be had 4/X opens per minute in the X minute window
> ca:fe:ba:be had 3/X closes per minute in the X minute window and the
> average time open was (67+97+196)/3=120... there is no guarantee that the
> closes will be matched with opens in the same window, which is why I'm only
> tracking them separately
> de:ad:be:ef had 2/X opens per minute in the X minute window
> ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
> ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the
> average time open was 120
> de:ad:be:ef /foo had 1/X opens per minute in the X minute window
> de:ad:be:ef /bar had 1/X opens per minute in the X minute window
> de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
> de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
> de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X
> minute window
> de:ad:be:ef /bar/foo/R

Re: Can an Aggregate the key from a WindowedStream.aggregate()

2019-02-10 Thread Stephen Connolly
On Sun, 10 Feb 2019 at 09:48, Chesnay Schepler  wrote:

> There are also versions of WindowedStream#aggregate that accept an
> additional WindowFunction/ProcessWindowFunction, which do have access to
> the key via apply()/process() respectively. These functions are called
> post aggregation.
>

Cool I'll chase those down
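For the archives, I think the shape of that is roughly the following
untested sketch (Event/getKey() are placeholders for my real event type; the
aggregate just counts events per key as an example):

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class KeyAwareCount {

    // Placeholder for the real event type.
    public interface Event { String getKey(); }

    // Incremental part: maintains only a running count per key+window.
    public static class CountAgg implements AggregateFunction<Event, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Event value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    // Post-aggregation part: invoked once per key+window with the single
    // pre-aggregated value, and the key available as the first argument.
    public static class EmitWithKey
            extends ProcessWindowFunction<Long, Tuple2<String, Long>, String, TimeWindow> {
        @Override
        public void process(String key, Context ctx, Iterable<Long> counts,
                            Collector<Tuple2<String, Long>> out) {
            out.collect(Tuple2.of(key, counts.iterator().next()));
        }
    }

    public static DataStream<Tuple2<String, Long>> build(DataStream<Event> events) {
        return events
            .keyBy(Event::getKey)
            .window(TumblingEventTimeWindows.of(Time.minutes(15)))
            .aggregate(new CountAgg(), new EmitWithKey());
    }
}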


>
> On 08.02.2019 18:24, stephen.alan.conno...@gmail.com wrote:
> > If I write my aggregation logic as a WindowFunction then I get access to
> the key as the first parameter in WindowFunction.apply(...) however the
> Javadocs for calling WindowedStream.apply(WindowFunction) state:
> >
> >> Note that this function requires that all data in the windows is
> buffered until the window
> >> is evaluated, as the function provides no means of incremental
> aggregation.
> > Which sounds bad.
> >
> > It seems the recommended alternative is to use one of the
> WindowedStream.aggregate(AggregateFunction) however I cannot see how to get
> access to the key...
> >
> > Is my only solution to transform my data into a Tuple if I need access
> to the key post aggregation?
> >
> > Thanks in advance
> >
> > -stephenc
> >
>
>


Reduce one event under multiple keys

2019-02-08 Thread Stephen Connolly
Ok, I'll try and map my problem into something that should be familiar to
most people.

Consider a collection of PCs, each of which has a unique ID, e.g.
ca:fe:ba:be, de:ad:be:ef, etc.

Each PC has a tree of local files. Some of the file paths coincidentally
have the same names, but there is no file sharing between PCs.

I need to produce metrics about how often files are opened and how long
they are open for.

I need, for every X minute tumbling window, not just the cumulative averages
for each PC, but the averages for each file as well as the cumulative
averages for each folder and their sub-folders.

I have a stream of events like

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/README.txt","duration":"67"}{"source":"de:ad:be:ef","action":"open","path":"/foo/manchu/README.txt"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/User
guide.txt"}{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/Admin
guide.txt"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/User
guide.txt","duration":"97"}{"source":"ca:fe:ba:be","action":"close","path":"/foo/bar/Admin
guide.txt","duration":"196"}
{"source":"ca:fe:ba:be","action":"open","path":"/foo/manchu/README.txt"}
{"source":"de:ad:be:ef","action":"open","path":"/bar/foo/README.txt"}

So from that I would like to know stuff like:

ca:fe:ba:be had 4/X opens per minute in the X minute window
ca:fe:ba:be had 3/X closes per minute in the X minute window and the
average time open was (67+97+196)/3=120... there is no guarantee that the
closes will be matched with opens in the same window, which is why I'm only
tracking them separately
de:ad:be:ef had 2/X opens per minute in the X minute window
ca:fe:ba:be /foo had 4/X opens per minute in the X minute window
ca:fe:ba:be /foo had 3/X closes per minute in the X minute window and the
average time open was 120
de:ad:be:ef /foo had 1/X opens per minute in the X minute window
de:ad:be:ef /bar had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu had 1/X opens per minute in the X minute window
de:ad:be:ef /bar/foo had 1/X opens per minute in the X minute window
de:ad:be:ef /foo/manchu/README.txt had 1/X opens per minute in the X minute
window
de:ad:be:ef /bar/foo/README.txt had 1/X opens per minute in the X minute
window
etc

What I think I want to do is turn each event into a series of events with
different keys, so that

{"source":"ca:fe:ba:be","action":"open","path":"/foo/bar/README.txt"}

gets sent under the keys:

("ca:fe:ba:be","/")
("ca:fe:ba:be","/foo")
("ca:fe:ba:be","/foo/bar")
("ca:fe:ba:be","/foo/bar/README.txt")

Then I could use a window aggregation function to just:

* count the "open" events
* count the "close" events and sum their duration

Additionally, I am (naïvely) hoping that if a window has no events for a
particular key, the memory/storage costs are zero for that key.

From what I can see, to achieve what I am trying to do, I could use a
flatMap followed by a keyBy

In other words I take the events and flat map them, based on the path split
on '/', returning a Tuple of the (to be) key and the event. Then I can use
keyBy to key on field 0 of the Tuple.
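Roughly the following untested sketch (OpenCloseEvent is again a placeholder
for the real event type; I have flattened the key into the first two Tuple
fields rather than nesting a Tuple inside a Tuple):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

// Fans one event out under (source, path-prefix) keys, e.g.
// "/foo/bar/README.txt" is emitted under "/", "/foo", "/foo/bar" and
// "/foo/bar/README.txt".
public class FanOutByPath
        implements FlatMapFunction<OpenCloseEvent, Tuple3<String, String, OpenCloseEvent>> {

    @Override
    public void flatMap(OpenCloseEvent event, Collector<Tuple3<String, String, OpenCloseEvent>> out) {
        String source = event.getSource();
        String path = event.getPath();
        out.collect(Tuple3.of(source, "/", event));
        int idx = path.indexOf('/', 1);
        while (idx > 0) {
            out.collect(Tuple3.of(source, path.substring(0, idx), event));
            idx = path.indexOf('/', idx + 1);
        }
        out.collect(Tuple3.of(source, path, event));
    }
}

so the pipeline would start out as
events.flatMap(new FanOutByPath()).keyBy(0, 1).window(...) and everything
after that only ever sees one (source, path) key at a time.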

My ask:

Is the above design a good design? How would you achieve the end game
better? Do I need to worry about many paths that are accessed rarely and
would have an accumulator function that stays at 0 unless there are events
in that window... or are the accumulators for each distinct key eagerly
purged after each fire trigger.

What gotchas do I need to look out for?

Thanks in advance and apologies for the length

-stephenc