Re: Throttling/effective back-pressure on a Kafka sink

2019-05-30 Thread Derek VerLee

  
  
Was any progress ever made on this?  We have seen the same issue
  in the past.  What I do remember is, whatever I set max.block.ms
  to, is when the job crashes.
  I am going to attempt to reproduce the issue again and will report
  back.



On 3/28/19 3:27 PM, Konstantin Knauf
  wrote:


  
  
Hi Marc, 



the Kafka Producer should be able to create backpressure.
  Could you try to increase max.block.ms to Long.MAX_VALUE?



The exceptions you shared for the failure case don't look
  like the root causes of the problem. Could you share the full
  stacktraces or even full logs for this time frame. Feel free
  to send these logs to me directly, if you don't want to share
  them on the list.


Best, 



Konstantin








  On Thu, Mar 28, 2019 at 2:04
PM Marc Rooding 
wrote:
  
  

  Hi


We’ve got a job producing to a Kafka sink. The
  Kafka topics have a retention of 2 weeks. When doing a
  complete replay, it seems like Flink isn’t able to
  back-pressure or throttle the amount of messages going
  to Kafka, causing the following error:


org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
  Failed to send data to Kafka: Expiring 8396 record(s)
  for topic-1:12 ms has passed since batch creation


We’re running on Flink 1.7.2 with
  flink-connector-kafka:1.7.2. Our Kafka cluster is
  running version 2.1.1. The Kafka producer uses all
  default settings except from:


compression.type = snappy
max.in.flight.requests.per.connection = 1
acks = all
client.dns.lookup = use_all_dns_ips
  
  
I tried playing around with the buffer and batch
settings, increasing timeouts, but none seem to be what
we need. Increasing the delivery.timeout.ms and request.timeout.ms solves
the initial error, but causes the Flink job to fail
entirely due to:


Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
  Could not forward element to next operator
Caused by: java.lang.RuntimeException: Buffer pool
  is destroyed.


My assumption is that the Kafka producer will start
  blocking since it notices that it can't handle the
  batches, and Flink eventually runs out of buffers for
  the operator.


What really baffles me is that the backpressure tab
  shows that everything is OK. The entire job pipeline
  (which reads from 4 different topics, unions them all
  and sinks towards 1 topic) pushes all the messages
  through to the sink stage, resulting in 18 million
  incoming stage messages, even though Kafka is in no
  way possible to keep up with this.


I searched for others facing the same issue but
  can't find anything similar. I'm hoping that someone
  here could guide me in the right direction.


Thanks in advance


  

  



-- 

  

  
  

Konstantin Knauf | Solutions Architect
+49 160 91394525
  
  
  
  Follow us @VervericaData
  --
  Join Flink Forward - The Apache Flink Conference
  Stream Processing | Event Driven | Real Time
  --
  Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
  --
  Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen    
  


  

  

  



local disk cleanup after crash

2019-03-07 Thread Derek VerLee

  
  
I think that effort is put in to have task managers clean up
  their folders, however I have noticed that in some cases local
  folders are not cleaned up and can build up, eventually causing
  problems due to a full disk.  As far as I know this only happens
  with crashes and other out-of-happy-path scenarios.

I am thinking of writing a script to clean up local folders that
  runs before task-manager starts between restarts in the case of a
  crash.
Assuming local recovery is not configured, what should I delete
  and what should I leave around?

What should I keep if local recovery is configured?


Under the "taskmanager.tmp.dirs" I see:
blobStore-*
  flink-dist-cache-*
  flink-io-*
  localState/*
  rocksdb-lib-*


Thanks

  



Re: Flink 1.7 jobmanager tries to lookup taskmanager by its hostname in k8s environment

2019-01-02 Thread Derek VerLee

  
  
I dealt with this issue by making the taskmanagers a statefulset.

By itself, this doesn't solve the issue, because the
  taskmanager's `hostname` will not be a resovable FQDN on its own,
  you need to append the rest of the FQDN for the statefulset's
  "serviceName" to make it resolvable.  I handle this by passing the
  fully qualified serviceName in as an environment variable  and
  using this to overwriting taskmanager.host in flink.conf in the
  containers entrypoint script.
It's a kludge, but it works. Using statefulsets brings along a
  lot of "baggage" that may be overkill for taskmanagers.  However
  it does have an unrelated benefit for jobs with large state, in
  that you can attach dedicated disks in the form of PVCs, rather
  than using up the host's root disk.



On 12/12/18 8:20 AM, Chesnay Schepler
  wrote:

This is
  a known issue, see
  https://issues.apache.org/jira/browse/FLINK-11127.
  
  
  I'm not aware of a workaround.
  
  
  On 12.12.2018 14:07, Sergei Poganshev wrote:
  
  When I to deploy Flink 1.7 job to
Kubernetes, the job itself runs, but upon visiting Flink UI I
can see no metrics and there are WARN messages in jobmanager's
log:


[flink-metrics-14] WARN akka.remote.ReliableDeliverySupervisor
flink-metrics-akka.remote.default-remote-dispatcher-3 -
Association with remote system
[akka.tcp://flink-metrics@adhoc-historical-taskmanager-d4b65dfd4-h5nrx:44491]
has failed, address is now gated for [50] ms. Reason:
[Association failed with
[akka.tcp://flink-metrics@adhoc-historical-taskmanager-d4b65dfd4-h5nrx:44491]]
Caused by: [adhoc-historical-taskmanager-d4b65dfd4-h5nrx: Name
or service not known]


Note: adhoc-historical-taskmanager-d4b65dfd4-h5nrx is a hostname
of a pod on which taskmanager is running.


So, jobmanager tries to resolve taskmanager's hostname (which
probably got to it from taskmanager itself) on a random port.
How can this be mitigated?



  
  

  



Re: Problem with metrics inside Kubernetes

2019-01-02 Thread Derek VerLee

  
  
See my reply I just posted to the thread "Flink 1.7 jobmanager
  tries to lookup taskmanager by its hostname in k8s environment".

On 1/2/19 11:19 AM, Steven Nelson
  wrote:


  
  

  I have been working with Flink under Kubernetes recently
and I have run into some problems with metrics. I think I
have it figured out though. It appears that it's trying to
use hostname resolution for the jobmanagers. This causes
this error:
  
  
  Association with remote system
[akka.tcp://flink@flink-taskmanager-7dffcf7975-vb2pc:42028]
has failed, address is now gated for [50] ms. Reason:
[Association failed with
[akka.tcp://flink@flink-taskmanager-7dffcf7975-vb2pc:42028]]
Caused by: [flink-taskmanager-7dffcf7975-vb2pc]
  
  
  
  I noticed that if I put hosts file entries on the
jobmanager for each of the task managers then everything
started working. Is there a way to specify the hostname of
taskmanager like you can with the jobmanager?
  
  
  -Steve

  

  



Re: long lived standalone job session cluster in kubernetes

2018-12-05 Thread Derek VerLee

  
  
Sounds good.
Is someone working on this automation today?
If not, although my time is tight, I may be able to work on a PR
  for getting us started down the path Kubernetes native cluster
  mode.



On 12/4/18 5:35 AM, Till Rohrmann
  wrote:


  
  
Hi Derek,
  
  
  what I would recommend to use is to trigger the cancel
with savepoint command [1]. This will create a savepoint and
terminate the job execution. Next you simply need to respawn
the job cluster which you provide with the savepoint to
resume from.
  
  
  [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
  
  
  Cheers,
  Till

  
  
  
On Tue, Dec 4, 2018 at 10:30 AM Andrey Zagrebin
  <and...@data-artisans.com>
  wrote:

Hi Derek,
  
  I think your automation steps look good. 
  Recreating deployments should not take long 
  and as you mention, this way you can avoid unpredictable
  old/new version collisions.
  
  Best,
  Andrey
  
  > On 4 Dec 2018, at 10:22, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
  > 
  > Hi Derek,
  > 
  > I am not an expert in kubernetes, so I will cc Till, who
  should be able
  > to help you more.
  > 
  > As for the automation for similar process I would
  recommend having a
  > look at dA platform[1] which is built on top of
  kubernetes.
  > 
  > Best,
  > 
  > Dawid
  > 
  > [1] https://data-artisans.com/platform-overview
  > 
  > On 30/11/2018 02:10, Derek VerLee wrote:
  >> 
  >> I'm looking at the job cluster mode, it looks great
  and I and
  >> considering migrating our jobs off our "legacy"
  session cluster and
  >> into Kubernetes.
  >> 
  >> I do need to ask some questions because I haven't
  found a lot of
  >> details in the documentation about how it works yet,
  and I gave up
  >> following the the DI around in the code after a
  while.
  >> 
  >> Let's say I have a deployment for the job "leader" in
  HA with ZK, and
  >> another deployment for the taskmanagers.
  >> 
  >> I want to upgrade the code or configuration and start
  from a
  >> savepoint, in an automated way.
  >> 
  >> Best I can figure, I can not just update the
  deployment resources in
  >> kubernetes and allow the containers to restart in an
  arbitrary order.
  >> 
  >> Instead, I expect sequencing is important, something
  along the lines
  >> of this:
  >> 
  >> 1. issue savepoint command on leader
  >> 2. wait for savepoint
  >> 3. destroy all leader and taskmanager containers
  >> 4. deploy new leader, with savepoint url
  >> 5. deploy new taskmanagers
  >> 
  >> 
  >> For example, I imagine old taskmanagers (with an old
  version of my
  >> job) attaching to the new leader and causing a
  problem.
  >> 
  >> Does that sound right, or am I overthinking it?
  >> 
  >> If not, has anyone tried implementing any automation
  for this yet?
  >> 
  > 
  

  

  



long lived standalone job session cluster in kubernetes

2018-11-29 Thread Derek VerLee

  
  
I'm looking at the job cluster mode, it looks great and I and
  considering migrating our jobs off our "legacy" session cluster
  and into Kubernetes.

I do need to ask some questions because I haven't found a lot of
  details in the documentation about how it works yet, and I gave up
  following the the DI around in the code after a while.

Let's say I have a deployment for the job "leader" in HA with ZK,
  and another deployment for the taskmanagers.
I want to upgrade the code or configuration and start from a
  savepoint, in an automated way.

Best I can figure, I can not just update the deployment resources
  in kubernetes and allow the containers to restart in an arbitrary
  order.
Instead, I expect sequencing is important, something along the
  lines of this:
1. issue savepoint command on leader
  2. wait for savepoint
  3. destroy all leader and taskmanager containers
  4. deploy new leader, with savepoint url
  5. deploy new taskmanagers



 For example, I imagine old taskmanagers (with an old version of
  my job) attaching to the new leader and causing a problem.
Does that sound right, or am I overthinking it? 

If not, has anyone tried implementing any automation for this
  yet?
  



strange behavior with jobmanager.rpc.address on standalone HA cluster

2018-05-05 Thread Derek VerLee

  
  
Two things:

1. It would be beneficial I think to drop a line somewhere in the
  docs (probably on the production ready checklist as well as the HA
  page) explaining that enabling zookeeper "highavailability" allows
  for your jobs to restart automatically after a jobmanager crash or
  restart.  We had spent some cycles trying to implement job
  restarting and watchdogs (poorly) when I discoverd this from a
  flink forward presentation on youtube.

2. I seem to have found some odd behavior with HA and then found
  something that works, but I can't explain why.  The clifnotes
  version is that I took an existing standalone cluster with a
  single JM and modified with high availability zookeeper mode.  The
  same flink-conf.yaml file is used on all nodes (including JM).
  This seemed to work fine, I restarted the JM (jm0) and the jobs
  relaunched when it came back.  Easy!  Then I deployed a second JM
  (jm1).  Once I modified `masters`, set the HA rpc port range and
  opened those ports on the firewall for both jobmanagers, but left
  `jobmanager.rpc.address` the original value, `jm0` on all nodes. 
  I then observed that jm0 worked fine, taskmanagers connected to it
  and jobs ran.  jm1 did not 301 me to jm0 however, it displayed a
  dashboard (no jobs, no tm).  When I stopped jm0, the jobs show up
  on jm1 as RESTARTING, but the taskmanagers never attach to jm1.  
  In the logs, all nodes, including jm1, had messages about trying
  to reach jm0.  From the documentation and various comments I've
  seen, `jobmanager.rpc.address` should be ignored.  However,
  commenting it out entirely lead to jobmanagers crashing at boot,
  setting to `localhost` caused all the taskmanagers to log messages
  about trying to connect to the jobmanager at localhost.  What
  finally worked was to set the value to the hostname where the
  flink-conf.yaml was individually, even on the taskmanagers.  

Does this seem like a bug?
Just a hunch, but is there something called an "akka leader" that
  is different from the jobmanager leader, and could it be somehow
  defaulting its value over to jobmanager.rpc.address?


  



Re: intentional back-pressure (or a poor man's side-input)

2018-05-03 Thread Derek VerLee

  
  
Thanks for the thoughts Piotr.
Seems I have a talent for asking (nearly) the same question as
  someone else at the same time, and the check-pointing was raised
  in that thread as well.
I guess one way to conceptualize it is that you have is a stream
  job that has "phases" and transitions between those phases.  Maybe
  there would be a new type of barrier to indicate a change between
  phases?  But now I'm way outside the bounds of hoping to have a
  "quick and dirty" version of a proper side input implementation.
I'm chewing on two new ideas now:  Using a "union" stream instead
  of two streams, and custom source backed by two different sources
  under the hood, so the "state machine" logic transitioning from
  initialization to normal operation all happen in the same
  operator.  Or, running a batch or "bounded stream" job first to
  generate a "cache state", and then launching the main streaming
  job, which loads this initial state load in open()... not sure how
  to work out the keying.
I'll post back if I get anywhere with these ideas.



On 5/3/18 10:49 AM, Piotr Nowojski
  wrote:


  Maybe it could work with Flink’s 1.5 credit base flow control. But you would need a way to express state “block one input side of the CoProcessFunction”, pass this information up to the input gate and handle it probably similar to how `org.apache.flink.streaming.runtime.io.CachedBufferBlocker` blocks inputs in case of checkpoint barrier. You can not just block inside `processElement1` method.

However I haven’t thought it through and maybe there could be some issues regarding checkpointing (what should happen to checkpoint barriers if you are blocking one side of the input? Should this block checkpoint barrier as well? Should you cancel checkpoint?).

Piotrek


  
On 2 May 2018, at 16:31, Derek VerLee <derekver...@gmail.com> wrote:


I was just thinking about about letting a coprocessfunction "block" or cause back pressure on one of it's streams?
Has this been discussed as an option?
Does anyone know a way to effectively accomplish this?

I think I could get a lot of mileage out of something like that without needing a full implementation of FLIP-17 (which I would eagerly await still). 

As mentioned on another thread, one could use a liststate to buffer the main input until the "side input" was sufficiently processed.  However the downside of this is that I have no way to control the size of those buffers, whereas with backpressure, the system will naturally take care of it.

  
  



  



Re: Migration to Flip6 Kubernetes

2018-05-02 Thread Derek VerLee

  
  
Is anyone actively working on direct Kubernetes support?
I'd be excited to see this get in sooner rather than later, I'd
  be happy to start a PR.


On 3/22/18 10:37 AM, Till Rohrmann
  wrote:


  Hi Edward and Eron,


you're right that there is currently no
  JobClusterEntrypoint implementation for Kubernetes. How this
  entrypoint looks like mostly depends on how the job is stored
  and retrieved. There are multiple ways conceivable:


- The entrypoint connects to an external system from which
  it fetches the JobGraph
- The entrypoint contains the serialized JobGraph similar
  to how the YarnJobClusterEntrypoint works, but this would mean
  that you have a separate image per job
- The entrypoint actually executes a user jar which
  generates the JobGraph similar to what happens on the client
  when you submit a job


I'm not a Kubernetes expert and therefore I don't know
  what's the most idiomatic approach to it. But once we have
  figured this out, it should not be too difficult to write the
  Kubernetes JobClusterEntrypoint.


If we say that Kubernetes is responsible for assigning new
  resources, then we need a special KubernetesResourceManager
  which automatically assigns all registered slots to the single
  JobMaster. This JobMaster would then accept all slots and
  scale the job to how many slots it got offered. That way we
  could easily let K8 control the resources.


If there is a way to communicate with K8 from within Flink,
  then we could also implement a mode which is similar to
  Flink's Yarn integration. The K8RM would then ask for new pods
  to be started if the JM needs more slots.


The per-job mode on K8 won't unfortunately make it into
  Flink 1.5. But I'm confident that the community will address
  this issue with Flink 1.6.


Cheers,
Till


  
  
On Wed, Mar 21, 2018 at 4:08 PM, Eron
  Wright 
  wrote:
  
It would be helpful to expand on how, in job
  mode, the job graph would be produced.  The phrase 'which
  contains the single job you want to execute' has a few
  meanings; I believe Till means a serialized job graph, not
  an executable JAR w/ main method.  Till is that correct?

  

  On Tue, Mar 20, 2018 at 2:16
AM, Till Rohrmann 
wrote:

  Hi Edward,


you're right that Flink's Kubernetes
  documentation has not been updated with
  respect to Flip-6. This will be one of the
  tasks during the Flink 1.5 release testing and
  is still pending.


A Flink cluster can be run in two modes:
  session mode vs per-job mode. The former
  starts a cluster to which you can submit
  multiple jobs. The cluster shares the same
  ResourceManager and a Dispatcher which is
  responsible for spawning JobMasters which
  execute a single job each. The latter starts a
  Flink cluster which is pre-initialized with a
  JobGraph and only runs this job. Here we also
  start a ResourceManager and a MiniDispatcher
  whose job it is to simply start a single
  JobMaster with the pre-initialized JobGraph.


StandaloneSessionClusterEntrypoint is
  the entrypoint for the session mode.


The JobClusterEntrypoint is the entrypoint
  for the per-job mode. Take a look at
  YarnJobClusterEntrypoint to see how the
  entrypoint retrieves the JobGraph from HDFS
  and then automatically starts executing it.
  There is no script which directly starts this
  entrypoint, but the YarnClusterDescriptor uses
  it when `deployJobCluster` is called.


   

intentional back-pressure (or a poor man's side-input)

2018-05-02 Thread Derek VerLee

  
  

I was just thinking about about letting a coprocessfunction "block"
or cause back pressure on one of it's streams?
Has this been discussed as an option?
Does anyone know a way to effectively accomplish this?

I think I could get a lot of mileage out of something like that
without needing a full implementation of FLIP-17 (which I would
eagerly await still). 

As mentioned on another thread, one could use a liststate to buffer
the main input until the "side input" was sufficiently processed. 
However the downside of this is that I have no way to control the
size of those buffers, whereas with backpressure, the system will
naturally take care of it.
  



Clarification on slots and efficiency

2018-04-11 Thread Derek VerLee

  
  


From the docs (
https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html
  )


  By adjusting the number of task slots, users can define how
subtasks are isolated from each other. Having one slot per
TaskManager means each task group runs in a separate JVM (which
can be started in a
separate container, for example). Having multiple slots
means more subtasks share the same JVM. Tasks in the same JVM
share TCP connections (via multiplexing) and
heartbeat messages. They may also share data sets and data
structures, thus reducing the per-task overhead.

Does this mean that if the same task and job is running in two
  slots on the same task-manager, that messages that happen to move
  between these slots will do so more efficiently, and avoid
  serialization overhead?



  



substantial realistic and idiomatic example applications

2017-12-12 Thread Derek VerLee

  
  


We are new to working with Flink and now that we have some basics
  down, we are looking for some codebases for Flink applications of
  real-world complexity and size, that could additionally be
  considered idiomatic and generally good code.

Can anyone recommend such a codebase?

Thanks,
_derek

  



Re: Streaming : a way to "key by partition id" without redispatching data

2017-11-10 Thread Derek VerLee

  
  
I was about to ask this question myself.  I find myself re-keying
  by the same keys repeatedly.  I think in principle you could
  always just roll more work into one window operation with a more
  complex series of maps/folds/windowfunctions or processfunction. 
  However this doesn't always feel the most clean or convenient, or
  composible.  It would be great if there was a way to just express
  that you want to keep the same partitions as the last window, or
  that the new key is 1-to-1 with the previous one.  Even more
  generally, if the new key is "based" off the old key in a way that
  is one to one or one to many, in either case it may not be
  necessary to send data over the wire, although in the later case,
  there is a risk of hot-spotting , I suppose.

On 11/10/17 12:01 PM, Gwenhael
  Pasquiers wrote:


  
  
  
  
I think I finally found
a way to “simulate” a Timer thanks to the the
processWatermark function of the AbstractStreamOperator.
 
Sorry for the monologue.
 

  
From: Gwenhael
Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]

Sent: vendredi 10 novembre 2017 16:02
To: 'user@flink.apache.org'

Subject: RE: Streaming : a way to "key by
partition id" without redispatching data
  

 
Hello,
 
Finally, even after
creating my operator, I still get the error : “Timers can
only be used on keyed operators”.
 
Isn’t there any way
around this ? A way to “key” my stream without shuffling the
data ?
 

  
From: Gwenhael
Pasquiers

Sent: vendredi 10 novembre 2017 11:42
To: Gwenhael Pasquiers ;
'user@flink.apache.org' 
Subject: RE: Streaming : a way to "key by
partition id" without redispatching data
  

 
Maybe you don’t need to
bother with that question.
 
I’m currently
discovering AbstractStreamOperator, OneInputStreamOperator
and Triggerable.
 
That should do it :-)
 

  
From: Gwenhael
Pasquiers [mailto:gwenhael.pasqui...@ericsson.com]

Sent: jeudi 9 novembre 2017 18:00
To: 'user@flink.apache.org' 
Subject: Streaming : a way to "key by partition
id" without redispatching data
  

 
Hello,
 
(Flink 1.2.1)
 
For performances reasons
I’m trying to reduce the volume of data of my stream as soon
as possible by windowing/folding it for 15 minutes before
continuing to the rest of the chain that contains keyBys and
windows that will transfer data everywhere.
 
Because of the huge
volume of data, I want to avoid “moving” the data between
partitions as much as possible (not like a naïve KeyBy
does). I wanted to create a custom ProcessFunction (using
timer and state to fold data for X minutes) in order to fold
my data over itself before keying the stream but even
ProcessFunction needs a keyed stream…
 
Is there a specific
“key” value that would ensure me that my data won’t be moved
to another taskmanager (that it’s hashcode will match the
partition it is already in) ? I thought about the subtask id
but I doubt I’d be that lucky :-) 
 
Suggestions

  ·
  Wouldn’t
it be useful to be able to do a “partitionnedKeyBy” that
would not move data between nodes, for windowing operations
that can be parallelized.

  o  
  Something
like kafka => partitionnedKeyBy(0) => first folding
=> keyBy(0) => second folding => ….

  ·
  Finally,
aren’t all streams keyed ? Even if they’re keyed by a
totally arbitrary partition id until the user chooses its
own key, shouldn’t we be able to do a window (not windowAll)
or process over any normal Stream’s partition ?
 
B.R.
 
Gwenhaël PASQUIERS
  


  



Re: Generate watermarks per key in a KeyedStream

2017-11-09 Thread Derek VerLee

  
  
We are contending with the same issue, as it happens.  We have
  dozens, and potentially down the line, may need to deal with
  thousands of different "time systems" as you put it, and may not
  be know at compile time or job start time.  In a practical sense,
  how could such a system be composed?  


On 11/9/17 5:52 AM, Shailesh Jain
  wrote:


  
Thanks for your reply, Xingcan.


  

  On Wed, Nov 8, 2017 at 10:42 PM,
Xingcan Cui 
wrote:

  
Hi
  Shailesh,


actually,
  the watermarks are generated per partition, but
  all of them will be forcibly aligned to the
  minimum one during processing. That is decided by
  the semantics of watermark and KeyedStream, i.e.,
  the watermarks belong to a whole stream and a
  stream is made up of different partitions (one per
  key).


If
  the physical devices work in different time
  systems due to delay, the event streams from them
  should be treated separately.


Hope
  that helps.


Best,
Xingcan
  
  

  
On Wed, Nov 8, 2017 at
  11:48 PM, Shailesh Jain 
  wrote:
  

  

  
Hi,
  

I'm working on implementing a use
case wherein different physical
devices are sending events, and due
to network/power issues, there can
be a delay in receiving events at
Flink source. One of the operators
within the flink job is the Pattern
operator, and there are certain
patterns which are time sensitive,
so I'm using Event time
characteristic. But the problem
comes when there are unpredictable
delays in events from a particular
device(s), which causes those events
to be dropped (as I cannot really
define a static bound to allow for
lateness).

  
  Since I'm using a KeyedStream, keyed
  on the source device ID, is there a
  way to allow each CEP operator
  instance (one per key) to progress its
  time based on the event time in the
  corresponding stream partition. Or in
  other words, is there a way to
  generate watermarks per partition in a
  KeyedStream?
  

Thanks,
  
  Shailesh

  


  

  

  
  

  

  


  



Re: Do timestamps and watermarks exist after window evaluation?

2017-11-09 Thread Derek VerLee

  
  
This new documentation seems to answer my question directly. 
  It's good to know my intuitions where not wildly off.  Also thank
  you for continuing to improve the already good documentation.
Funny enough, some of the other questions I have, where also
  asked by other users in the last couple days, so I'll just reply
  on those threads if necessary.
Thanks!


On 11/9/17 9:32 AM, Aljoscha Krettek
  wrote:


  
  Hi,
  
  
  This new section in the windowing documentation will
help answer your question: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#working-with-window-results
  
  
  Please let me know if you have any further
questions. :-)
  
  
  Best,
  Aljoscha

  
On 8. Nov 2017, at 18:54, Derek VerLee <derekver...@gmail.com>
  wrote:


  When composing ("chaining") multiple
windowing operations on the same
stream are watermarks transmitted down stream after
window evaluation,
and are the records emitted from WindowFunctions given
timestamps? Do I
need to or should I always assignTimestampsAndWatermarks
to the
outputsof window evaluations if I want to window again?
If automatically
assigned, how should I think about them in an event time
context? Would
the event time of a record resulting from a
WindowFunction be the
window's end time in the case of a TimeWindow?

  

  


  


  



Do timestamps and watermarks exist after window evaluation?

2017-11-08 Thread Derek VerLee
When composing ("chaining") multiple windowing operations on the same
stream are watermarks transmitted down stream after window evaluation,
and are the records emitted from WindowFunctions given timestamps? Do I
need to or should I always assignTimestampsAndWatermarks to the
outputsof window evaluations if I want to window again? If automatically
assigned, how should I think about them in an event time context? Would
the event time of a record resulting from a WindowFunction be the
window's end time in the case of a TimeWindow?



RocksDB usage for broad slow data

2017-10-14 Thread Derek VerLee

  
  


We have a data which is broad and slow; hundreds of thousands of
  keys, a small number will get an event every few seconds, some get
  an event every few days, and the vast majority will get an event
  in a few times an hour.  Let's say then that keeping this data in
  heap for the last couple days is not a challenge.  However, one
  additional challenge is that we can receive late events or
  corrective data, going back indefinitely, and while infrequent, we
  need to be able to handle this gracefully.  Lets say that the
  total data-set grows too large to keep in memory economically.
One approach of course is a "lambda" type, where sufficiently
  late events are noted to a side channel, perhaps triggering some
  batch job to be scheduled.

However I'm pondering a simpler solution,   I understand that
  with the RocksDB, the state size can exceed the heap.  Would it be
  a plausible approach in this situation to never purge windows,
  keeping computation state back to "the beginning", so that an
  arbitrarily old window (years, potentially), could re-emit a
  corrected value?
Thanks
_Derek

  



Re: Enriching data from external source with cache

2017-10-02 Thread Derek VerLee

  
  
Thanks Timo, watching the video now.
I did try out the method with iteration in a simple prototype and
  it works.  But you are right, combining it with the other
  requirements into a single process function has so far resulted in
  more complexity than I'd like, and it's always nice to leave
  something easily understood later.  

On the contribution, I was wondering there was some scary with async
and keyed state going on that prevented this from having happened
already.  I'll have a look and see if I can find where the current
non keyed implementation logic resides in the project.

Thanks

On 10/2/17 6:07 AM, Timo Walther wrote:

Hi
  Derek,
  
  
  maybe the following talk can inspire you, how to do this with
  joins and async IO: https://www.youtube.com/watch?v=Do7C4UJyWCM
  (around the 17th min). Basically, you split the stream and wait
  for an Async IO result in a downstream operator.
  
  
  But I think having a transient guava cache is not a bad idea,
  since it is only a cache it does not need to be checkpointed and
  can be recovered at any time.
  
  
  Implementing you own logic in a ProcessFunction is always a way,
  but might require more implementation effort.
  
  
  Btw. if you feel brave enough, you could also think of
  contributing a stateful async IO. It should not be too much effort
  to make this work.
  
  
  Regards,
  
  Timo
  
  
  
  
  Am 9/29/17 um 8:39 PM schrieb Derek VerLee:
  
  My basic problem will sound familiar I
think, I need to enrich incoming data using a REST call to an
external system for slowly evolving metadata. and some cache
based lag is acceptable, so to reduce load on the external
system and to process more efficiently, I would like to
implement a cache.  The cache would by key, and I am already
doing a keyBy for the same key in the job.


Please correct me if I'm wrong:

* Keyed State would be great to store my metadata "cache", Async
I/O is ideal for pulling from the external system,

but AsyncFunction can not access keyed state ( "Exception: State
is not supported in rich async functions.") and operators can
not share state between them.


This leaves me wondering, since side inputs are not here yet,
what the best (and perhaps most idiomatic) way to approach my
problem?


I'd rather keep changes to existing systems minimal for this
iteration and just minimize impact on them during peaks best I
can... systemic refactoring and re-architecture will be coming
soon (so I'm happy to hear thoughts on that as well).


Approaches considered:


1. AsyncFunction with a transient guava cache.  Not ideal ...
but maybe good enough to get by

2. Using compound message types (oh, if only java had real
algebraic data types...) and send cache miss messages from some
CacheEnrichmentMapper (keyed) to some AsyncCacheLoader (not
keyed) which then backfeeds cache updates to the former via
iteration ... i don't know why this couldn't work but it feels
like a hot mess unless there is some way I am not thinking of to
do it cleanly

3. One user mentioned on a similar thread loading the data in as
another DataStream and then using joins, but I'm confused about
how this would work, it seems to me that joins happen on
windows, windows pertain to (some notion of) time, what would be
my notion of time for the slow (maybe years old in some cases)
meta-data?

4. Forget about async I/O

5. implement my own "async i/o" in using a process function or
similar  .. is this a valid pattern

  
  
  


  



Enriching data from external source with cache

2017-09-29 Thread Derek VerLee

  
  
My basic problem will sound familiar I think, I need to enrich
incoming data using a REST call to an external system for slowly
evolving metadata. and some cache based lag is acceptable, so to
reduce load on the external system and to process more efficiently,
I would like to implement a cache.  The cache would by key, and I am
already doing a keyBy for the same key in the job.

Please correct me if I'm wrong:
* Keyed State would be great to store my metadata "cache", Async I/O
is ideal for pulling from the external system, 
but AsyncFunction can not access keyed state ( "Exception: State is
not supported in rich async functions.") and operators can not share
state between them.

This leaves me wondering, since side inputs are not here yet, what
the best (and perhaps most idiomatic) way to approach my problem?

I'd rather keep changes to existing systems minimal for this
iteration and just minimize impact on them during peaks best I
can... systemic refactoring and re-architecture will be coming soon
(so I'm happy to hear thoughts on that as well).

Approaches considered:

1. AsyncFunction with a transient guava cache.  Not ideal ... but
maybe good enough to get by
2. Using compound message types (oh, if only java had real algebraic
data types...) and send cache miss messages from some
CacheEnrichmentMapper (keyed) to some AsyncCacheLoader (not keyed)
which then backfeeds cache updates to the former via iteration ... i
don't know why this couldn't work but it feels like a hot mess
unless there is some way I am not thinking of to do it cleanly
3. One user mentioned on a similar thread loading the data in as
another DataStream and then using joins, but I'm confused about how
this would work, it seems to me that joins happen on windows,
windows pertain to (some notion of) time, what would be my notion of
time for the slow (maybe years old in some cases) meta-data?
4. Forget about async I/O
5. implement my own "async i/o" in using a process function or
similar  .. is this a valid pattern