Re: flink kubernetes flink autoscale behavior

2024-06-24 Thread Rion Williams
Hi Eric,I believe you might be referring to use of the adaptive scheduler which should support these “in-place” scaling operations via:jobmanager.scheduler: adaptiveYou can see the documentation for Elastic Scaling here for additional details and configuration.On Jun 24, 2024, at 11:56 PM, Enric Ott <243816...@qq.com> wrote:Hello,Community:  I’ve recently started using the Flink Kubernetes Operator,and I'd like to know if CPU and Job Parallelism autoscaling are supported without restarting the whole job,if it’s supported, please tell me how to configure and deploy it.    Thanks.

Re: Using Custom JSON Formatting with Flink Operator

2024-02-22 Thread Rion Williams
Hi again Dominik,

So I was able to verify that this particular layout was being copied into
the image within the Dockerfile (specifically into
/flink/lib/log4j-layout-template-json-2.17.1.jar). Typically we've copied
over the actual jar that was built in the image to the appropriate volume
for the FlinkDeployment so that I can point to a clean location as seen
below as an initContainer for the deployment:

podTemplate:
  apiVersion: v1
  kind: Pod
  metadata:
name: task-manager-pod-template
  spec:
initContainers:
  - name: copy-job-contents
image: {{ $.Values.flinkImage }}
volumeMounts:
  - mountPath: /opt/flink/jars
name: jars
command: ["/bin/sh", "-c"]
args:
- cp /opt/flink-vvp-image/my-jar.jar /opt/flink/jars/my-jar.jar

I've tried to adjust the initContainer to copy over the jar to the lib
directory in a similar manner to be available for the operator, but I'm
still running into the same error that was previously seen via something
like:

- cp $myImage/flink/lib/log4j-layout-template-json-2.17.1.jar
$flinkImage/flink/lib/log4j-layout-template.json-2.17.1.jar

However I feel like in this case I'd need to share the lib directory for
the image running the job to copy it over from the original image. Does
that make sense? Is there a better way to handle this in this scenario?

On Thu, Feb 22, 2024 at 6:31 AM Rion Williams  wrote:

> Correct! Building a custom image for the deployment and then copying over
> the jar to a specific directory for the FlinkDeployment to use (as the
> image contains the legacy Flink jobs/jars as well as those newer ones for
> the operator).
>
> On Feb 22, 2024, at 6:18 AM, dominik.buen...@swisscom.com wrote:
>
> 
>
> Hi Rion
>
>
>
> I guess you’re building your own docker image for the deployment right?
>
>
>
> For switching to Logback I’m doing the following command (sbt-docker) when
> building the image.
>
>
>
> val eclasspath = (*Compile */ *externalDependencyClasspath*).value
> val logbackClassicJar = eclasspath.files.find(file =>
> file.getName.contains("logback-classic"))
> logbackClassicJar.foreach(logback => add(logback, «opt/flink/lib»))
>
>
>
> Given the error message that you provided I think the dependency is
> missing in the lib folder (don’t confuse this with the usrlib folder).
>
>
>
> Kind Regards
>
> Dominik
>
> *From: *Rion Williams 
> *Date: *Thursday, 22 February 2024 at 13:09
> *To: *Bünzli Dominik, INI-DNA-INF 
> *Cc: *user@flink.apache.org 
> *Subject: *Re: Using Custom JSON Formatting with Flink Operator
>
> *Be aware:* This is an external email.
>
>
>
> Hi Dominick,
>
>
>
> In this case the jobs are running using application-mode. All of these
> were previously working as expected for the legacy jobs using the same
> configuration (however those were running via Ververica Platform and
> targeting Flink 1.15.2). I had somewhat expected similar behaviors but it
> seems there’s something that is missing.
>
>
>
> Thanks,
>
>
>
> Rion
>
>
>
> On Feb 22, 2024, at 1:15 AM, dominik.buen...@swisscom.com wrote:
>
> 
>
> Good morning Rion,
>
>
>
> Are you in session job mode or application mode? I’ve had some similar
> issues (logback) lately and it turned out that I also needed to add the
> additional dependencies (I guess JsonTemplateLayout is one of them) to
> the lib folder of the deployment.
>
>
>
> Kind regards
>
> Dominik
>
>
>
> *From: *Rion Williams 
> *Date: *Thursday, 22 February 2024 at 00:46
> *To: *Flink User List 
> *Subject: *Using Custom JSON Formatting with Flink Operator
>
>
> Be aware: This is an external email.
>
>
>
> Hey Flinkers,
>
> Recently I’ve been in the process of migrating a series of older Flink
> jobs to use the official operator and have run into a snag on the logging
> front.
>
> I’ve attempted to use the following configuration for the job:
>
> ```
> logConfiguration:
>   log4j-console.properties: |+
> rootLogger.level = INFO
> rootLogger.appenderRef.console.ref = ConsoleAppender
> rootLogger.appenderRef.rolling.ref = RollingFileAppender
> ...
> appender.console.name = ConsoleAppender
> appender.console.type = Console
> appender.console.layout.type = JsonTemplateLayout
> appender.console.layout.eventTemplateUri = classpath:GcpLayout.json
> ```
>
> However once the job begins running, I’m met with the following errors in
> the logs:
>
> ```
> ERROR Unable to locate plugin type for JsonTemplateLayout
> ERROR Unable to locate plugin for JsonTemplateLayout
> ERROR Could not create plugin of type class
> 

Re: Using Custom JSON Formatting with Flink Operator

2024-02-22 Thread Rion Williams
Correct! Building a custom image for the deployment and then copying over the 
jar to a specific directory for the FlinkDeployment to use (as the image 
contains the legacy Flink jobs/jars as well as those newer ones for the 
operator).

> On Feb 22, 2024, at 6:18 AM, dominik.buen...@swisscom.com wrote:
> 
> 
> Hi Rion
>  
> I guess you’re building your own docker image for the deployment right?
>  
> For switching to Logback I’m doing the following command (sbt-docker) when 
> building the image.
>  
> val eclasspath = (Compile / externalDependencyClasspath).value
> val logbackClassicJar = eclasspath.files.find(file => 
> file.getName.contains("logback-classic"))
> logbackClassicJar.foreach(logback => add(logback, «opt/flink/lib»))
>  
> Given the error message that you provided I think the dependency is missing 
> in the lib folder (don’t confuse this with the usrlib folder).
>  
> Kind Regards
> Dominik
> From: Rion Williams 
> Date: Thursday, 22 February 2024 at 13:09
> To: Bünzli Dominik, INI-DNA-INF 
> Cc: user@flink.apache.org 
> Subject: Re: Using Custom JSON Formatting with Flink Operator
> 
> Be aware: This is an external email.
>  
> Hi Dominick,
>  
> In this case the jobs are running using application-mode. All of these were 
> previously working as expected for the legacy jobs using the same 
> configuration (however those were running via Ververica Platform and 
> targeting Flink 1.15.2). I had somewhat expected similar behaviors but it 
> seems there’s something that is missing.
>  
> Thanks,
>  
> Rion
> 
> 
> On Feb 22, 2024, at 1:15 AM, dominik.buen...@swisscom.com wrote:
> 
> 
> Good morning Rion,
>  
> Are you in session job mode or application mode? I’ve had some similar issues 
> (logback) lately and it turned out that I also needed to add the additional 
> dependencies (I guess JsonTemplateLayout is one of them) to the lib folder of 
> the deployment.
>  
> Kind regards
> Dominik
>  
> From: Rion Williams 
> Date: Thursday, 22 February 2024 at 00:46
> To: Flink User List 
> Subject: Using Custom JSON Formatting with Flink Operator
> 
> 
> Be aware: This is an external email.
> 
> 
> 
> Hey Flinkers,
> 
> Recently I’ve been in the process of migrating a series of older Flink jobs 
> to use the official operator and have run into a snag on the logging front.
> 
> I’ve attempted to use the following configuration for the job:
> 
> ```
> logConfiguration:
>   log4j-console.properties: |+
> rootLogger.level = INFO
> rootLogger.appenderRef.console.ref = ConsoleAppender
> rootLogger.appenderRef.rolling.ref = RollingFileAppender
> ...
> appender.console.name = ConsoleAppender
> appender.console.type = Console
> appender.console.layout.type = JsonTemplateLayout
> appender.console.layout.eventTemplateUri = classpath:GcpLayout.json
> ```
> 
> However once the job begins running, I’m met with the following errors in the 
> logs:
> 
> ```
> ERROR Unable to locate plugin type for JsonTemplateLayout
> ERROR Unable to locate plugin for JsonTemplateLayout
> ERROR Could not create plugin of type class 
> org.apache.logging.log4j.core.appender.ConsoleAppender for element Console: 
> java.lang.NullPointerException java.lang.NullPointerException
> ```
> 
> I believe that all of the appropriate references are correct in the actual 
> shaded jar itself as I can see things like the JsonTemplateLayout inside of 
> it (under org.apache.logging.log4j.template.json.JsonTemplateLayout ) as well 
> as the GcpLayout that I’m targeting in the root of the shaded jar as well 
> (including trying several adjustments to shade exceptions, adding a log4j 
> specific shade transformer, etc.)
> 
> I’ve tried adjusting several different knobs/configurations but I’m still 
> continually getting this same error. I’d be happy to share any additional 
> configuration for the job any/or the FlinkDeployment where applicable.
> 
> Just a bit stumped here on something that feels like it should just work.


Re: Using Custom JSON Formatting with Flink Operator

2024-02-22 Thread Rion Williams
Hi Dominick,

In this case the jobs are running using application-mode. All of these were 
previously working as expected for the legacy jobs using the same configuration 
(however those were running via Ververica Platform and targeting Flink 1.15.2). 
I had somewhat expected similar behaviors but it seems there’s something that 
is missing.

Thanks,

Rion

> On Feb 22, 2024, at 1:15 AM, dominik.buen...@swisscom.com wrote:
> 
> 
> Good morning Rion,
>  
> Are you in session job mode or application mode? I’ve had some similar issues 
> (logback) lately and it turned out that I also needed to add the additional 
> dependencies (I guess JsonTemplateLayout is one of them) to the lib folder of 
> the deployment.
>  
> Kind regards
> Dominik
>  
> From: Rion Williams 
> Date: Thursday, 22 February 2024 at 00:46
> To: Flink User List 
> Subject: Using Custom JSON Formatting with Flink Operator
> 
> 
> Be aware: This is an external email.
> 
> 
> 
> Hey Flinkers,
> 
> Recently I’ve been in the process of migrating a series of older Flink jobs 
> to use the official operator and have run into a snag on the logging front.
> 
> I’ve attempted to use the following configuration for the job:
> 
> ```
> logConfiguration:
>   log4j-console.properties: |+
> rootLogger.level = INFO
> rootLogger.appenderRef.console.ref = ConsoleAppender
> rootLogger.appenderRef.rolling.ref = RollingFileAppender
> ...
> appender.console.name = ConsoleAppender
> appender.console.type = Console
> appender.console.layout.type = JsonTemplateLayout
> appender.console.layout.eventTemplateUri = classpath:GcpLayout.json
> ```
> 
> However once the job begins running, I’m met with the following errors in the 
> logs:
> 
> ```
> ERROR Unable to locate plugin type for JsonTemplateLayout
> ERROR Unable to locate plugin for JsonTemplateLayout
> ERROR Could not create plugin of type class 
> org.apache.logging.log4j.core.appender.ConsoleAppender for element Console: 
> java.lang.NullPointerException java.lang.NullPointerException
> ```
> 
> I believe that all of the appropriate references are correct in the actual 
> shaded jar itself as I can see things like the JsonTemplateLayout inside of 
> it (under org.apache.logging.log4j.template.json.JsonTemplateLayout ) as well 
> as the GcpLayout that I’m targeting in the root of the shaded jar as well 
> (including trying several adjustments to shade exceptions, adding a log4j 
> specific shade transformer, etc.)
> 
> I’ve tried adjusting several different knobs/configurations but I’m still 
> continually getting this same error. I’d be happy to share any additional 
> configuration for the job any/or the FlinkDeployment where applicable.
> 
> Just a bit stumped here on something that feels like it should just work.


Using Custom JSON Formatting with Flink Operator

2024-02-21 Thread Rion Williams
Hey Flinkers,

Recently I’ve been in the process of migrating a series of older Flink jobs to 
use the official operator and have run into a snag on the logging front.

I’ve attempted to use the following configuration for the job:

```
logConfiguration:
  log4j-console.properties: |+
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
...
appender.console.name = ConsoleAppender
appender.console.type = Console
appender.console.layout.type = JsonTemplateLayout
appender.console.layout.eventTemplateUri = classpath:GcpLayout.json
```

However once the job begins running, I’m met with the following errors in the 
logs:

```
ERROR Unable to locate plugin type for JsonTemplateLayout
ERROR Unable to locate plugin for JsonTemplateLayout
ERROR Could not create plugin of type class 
org.apache.logging.log4j.core.appender.ConsoleAppender for element Console: 
java.lang.NullPointerException java.lang.NullPointerException
```

I believe that all of the appropriate references are correct in the actual 
shaded jar itself as I can see things like the JsonTemplateLayout inside of it 
(under org.apache.logging.log4j.template.json.JsonTemplateLayout ) as well as 
the GcpLayout that I’m targeting in the root of the shaded jar as well 
(including trying several adjustments to shade exceptions, adding a log4j 
specific shade transformer, etc.)

I’ve tried adjusting several different knobs/configurations but I’m still 
continually getting this same error. I’d be happy to share any additional 
configuration for the job any/or the FlinkDeployment where applicable. 

Just a bit stumped here on something that feels like it should just work.

Handling Batched Failures in ElasticsearchSink

2023-03-23 Thread Rion Williams
Hi all,

I have a pipeline that is currently reading from Kafka and writing to 
Elasticsearch. I recently was doing some testing for how it handles failures 
and was wondering if there’s a best practice or recommendation for doing so. 
Specifically, if I have a batch of 100 records being sent via a BulkProcessor 
call (internally from the sink), and a single record in the batch is bad (for 
whatever reason), how I might handle this.

Ideally, I’d be able to only retry the message(s) in the batch that failed, but 
that may require access to the BulkProcessor instance directly (if possible at 
all). I don’t see a way to easily discern or govern how reindexing should be 
handled within the onFailure handler, or if I would need access to the 
afterBulk handler on the processor specifically.

Just trying to leverage the batching without making a potentially large 
additional bulk request to Elastic due to one bad record in a batch.

Any recommendations on how I might handle this? It doesn’t seem like disabling 
batching (I.e. send one record at at time) is anywhere near performance enough 
and fails under large volumes.

Rion

(dev+user for reach)

Re: Handling JSON Serialization without Kryo

2023-03-22 Thread Rion Williams
Hi Ken,I’m going to profile the job today to try and get a better handle on where the bottleneck is. The job currently just passes around JsonObjects between the operators, which are relying on Kryo. The job also writes to Postgres, Kafka, and Elasticsearch so it’s possible that one of those is causing the back-pressure.I’m a bit shocked at the stunningly low speeds as well. Initially, the job would perform fine but checkpointing sizes would gradually build up (as would durations for them) until performance degraded to the borderline unusable 1-2 records/second.On Mar 21, 2023, at 2:35 PM, Ken Krugler  wrote:Hi Rion,I’m using Gson to deserialize to a Map.1-2 records/second sounds way too slow, unless each record is enormous.— KenOn Mar 21, 2023, at 6:18 AM, Rion Williams <rionmons...@gmail.com> wrote:Hi Ken,Thanks for the response. I hadn't tried exploring the use of the Record class, which I'm assuming you're referring to a flink.types.Record, to read the JSON into. Did you handle this via using a mapper to read the properties in (e.g. Gson, Jackson) as fields or take a different approach? Additionally, how has your experience been with performance? Kryo with the existing job leveraging JsonObjects (via Gson) is horrific (~1-2 records/second) and can't keep up with the speed of the producers, which is the impetus behind reevaluating the serialization.I'll explore this a bit more.Thanks,RionOn Mon, Mar 20, 2023 at 10:28 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:Hi Rion,For my similar use case, I was able to make a simplifying assumption that my top-level JSON object was a record.I then registered a custom Kryo serde that knew how to handle the handful of JsonPrimitive types for the record entries.I recently looked at extending that to support arrays and nested records, but haven’t had to do that.— KenOn Mar 20, 2023, at 6:56 PM, Rion Williams <rionmons...@gmail.com> wrote:Hi Shammon,Unfortunately it’s a data stream job. I’ve been exploring a few options but haven’t found anything I’ve decided on yet. I’m currently looking at seeing if I can leverage some type of partial serialization to bind to the properties that I know the job will use and retain the rest as a JSON blob. I’ve also consider trying to store the fields as a large map of string-object pairs and translating thay into a string prior to writing to the sinks.Still accepting any/all ideas that I come across to see if I can handle this in an efficient, reasonable way.Thanks,RionOn Mar 20, 2023, at 8:40 PM, Shammon FY <zjur...@gmail.com> wrote:Hi RionIs your job datastream or table/sql? If it is a table/sql job, and you can define all the fields in json you need, then you can directly use json format [1] to parse the data. You can also customize udf functions to parse json data into struct data, such as map, row and other types supported by flink[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/Best,Shammon FYOn Sun, Mar 19, 2023 at 7:44 AM Rion Williams <rionmons...@gmail.com> wrote:Hi all,

I’m reaching out today for some suggestions (and hopefully a solution) for a Flink job that I’m working on. The job itself reads JSON strings from a Kafka topic and reads those into JSONObjects (currently via Gson), which are then operated against, before ultimately being written out to Kafka again.

The problem here is that the shape of the data can vary wildly and dynamically. Some records may have properties unique to only that record, which makes defining a POJO difficult. In addition to this, the JSONObjects fall by to Kryo serialization which is leading to atrocious throughput.

I basically need to read in JSON strings, enrich properties on these objects, and ultimately write them to various sinks.  Is there some type of JSON-based class or library or an approach I could use to accomplish this in an efficient manner? Or if possibly a way to partially write a POJO that would allow me to interact with sections/properties of the JSON while retaining other properties that might be dynamically present or unique to the message?

Any advice or suggestions would be welcome! I’ll also be happy to provide any additional context if it would help!

Thanks,

Rion

(cross-posted to users+dev for reach)

--Ken Kruglerhttp://www.scaleunlimited.comCustom big data solutionsFlink, Pinot, Solr, Elasticsearch




--Ken Kruglerhttp://www.scaleunlimited.comCustom big data solutionsFlink, Pinot, Solr, Elasticsearch




Re: Handling JSON Serialization without Kryo

2023-03-20 Thread Rion Williams
Hi Shammon,Unfortunately it’s a data stream job. I’ve been exploring a few options but haven’t found anything I’ve decided on yet. I’m currently looking at seeing if I can leverage some type of partial serialization to bind to the properties that I know the job will use and retain the rest as a JSON blob. I’ve also consider trying to store the fields as a large map of string-object pairs and translating thay into a string prior to writing to the sinks.Still accepting any/all ideas that I come across to see if I can handle this in an efficient, reasonable way.Thanks,RionOn Mar 20, 2023, at 8:40 PM, Shammon FY  wrote:Hi RionIs your job datastream or table/sql? If it is a table/sql job, and you can define all the fields in json you need, then you can directly use json format [1] to parse the data. You can also customize udf functions to parse json data into struct data, such as map, row and other types supported by flink[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/Best,Shammon FYOn Sun, Mar 19, 2023 at 7:44 AM Rion Williams <rionmons...@gmail.com> wrote:Hi all,

I’m reaching out today for some suggestions (and hopefully a solution) for a Flink job that I’m working on. The job itself reads JSON strings from a Kafka topic and reads those into JSONObjects (currently via Gson), which are then operated against, before ultimately being written out to Kafka again.

The problem here is that the shape of the data can vary wildly and dynamically. Some records may have properties unique to only that record, which makes defining a POJO difficult. In addition to this, the JSONObjects fall by to Kryo serialization which is leading to atrocious throughput.

I basically need to read in JSON strings, enrich properties on these objects, and ultimately write them to various sinks.  Is there some type of JSON-based class or library or an approach I could use to accomplish this in an efficient manner? Or if possibly a way to partially write a POJO that would allow me to interact with sections/properties of the JSON while retaining other properties that might be dynamically present or unique to the message?

Any advice or suggestions would be welcome! I’ll also be happy to provide any additional context if it would help!

Thanks,

Rion

(cross-posted to users+dev for reach)


Handling JSON Serialization without Kryo

2023-03-18 Thread Rion Williams
Hi all,

I’m reaching out today for some suggestions (and hopefully a solution) for a 
Flink job that I’m working on. The job itself reads JSON strings from a Kafka 
topic and reads those into JSONObjects (currently via Gson), which are then 
operated against, before ultimately being written out to Kafka again.

The problem here is that the shape of the data can vary wildly and dynamically. 
Some records may have properties unique to only that record, which makes 
defining a POJO difficult. In addition to this, the JSONObjects fall by to Kryo 
serialization which is leading to atrocious throughput.

I basically need to read in JSON strings, enrich properties on these objects, 
and ultimately write them to various sinks.  Is there some type of JSON-based 
class or library or an approach I could use to accomplish this in an efficient 
manner? Or if possibly a way to partially write a POJO that would allow me to 
interact with sections/properties of the JSON while retaining other properties 
that might be dynamically present or unique to the message?

Any advice or suggestions would be welcome! I’ll also be happy to provide any 
additional context if it would help!

Thanks,

Rion

(cross-posted to users+dev for reach)

Handling JSON Serialization without Kryo

2023-03-18 Thread Rion Williams
Hi all,

I’m reaching out today for some suggestions (and hopefully a solution) for a 
Flink job that I’m working on. The job itself reads JSON strings from a Kafka 
topic and reads those into JSONObjects (currently via Gson), which are then 
operated against, before ultimately being written out to Kafka again.

The problem here is that the shape of the data can vary wildly and dynamically. 
Some records may have properties unique to only that record, which makes 
defining a POJO difficult. In addition to this, the JSONObjects fall by to Kryo 
serialization which is leading to atrocious throughput.

I basically need to read in JSON strings, enrich properties on these objects, 
and ultimately write them to various sinks.  Is there some type of JSON-based 
class or library or an approach I could use to accomplish this in an efficient 
manner? Or if possibly a way to partially write a POJO that would allow me to 
interact with sections/properties of the JSON while retaining other properties 
that might be dynamically present or unique to the message?

Any advice or suggestions would be welcome! I’ll also be happy to provide any 
additional context if it would help!

Thanks,

Rion

(cross-posted to users+dev for reach)

Flink Forward Session Question

2022-12-31 Thread Rion Williams
Hey Flinkers,

Firstly, early Happy New Year’s to everyone in the community. I’ve been digging 
a bit into exactly-once processing with Flink and Pinot and I came across this 
session from Flink Foward last year:

- 
https://www.slideshare.net/FlinkForward/exactlyonce-financial-data-processing-at-scale-with-flink-and-pinot

I was curious if anyone knew if this session was recording as the deck itself 
seemed to have quite a bit of value. I figured the mailing list would be a 
reasonable place to ask.

Thanks in advance,

Rion

Re: Question Regarding State Migrations in Ververica Platform

2022-08-31 Thread Rion Williams
+dev

> On Aug 30, 2022, at 11:20 AM, Rion Williams  wrote:
> 
> 
> Hi all,
> 
> I wasn't sure if this would be the best audience, if not, please advise if 
> you know of a better place to ask it. I figured that at least some folks here 
> either work for Ververica or might have used their platform.
> 
> tl;dr; I'm trying to migrate an existing stateful Flink job to run in 
> Ververica Platform (Community) and I'm noticing that it doesn't seem that all 
> of the state is being properly handed off (only _metadata).
> 
> I'm currently in the process of migrating an existing Flink job that is 
> running in Kubernetes on its own to run within the Ververica platform. The 
> issue here is that the job itself is stateful, so I want to ensure I can 
> migrate over that state so when the new job kicks off, it's a fairly seamless 
> transition.
> 
> Basically, what I've done up to this point is create a script as part of the 
> Ververica platform deployment that will:
> Check for the existence of any of the known jobs that have been migrated.
> If one is found, it will stop the job, taking a full savepoint, and store the 
> savepoint path within a configmap for that job used solely for migration 
> purposes.
> If one is not found, it will assume the job has been migrated.
> Create a Deployment for each of the new jobs, pointing to the appropriate 
> configuration, jars, etc.
> Check for the presence of one of the previous migration configmaps and issue 
> a request to create a savepoint for that deployment.
> This involves using the Ververica REST API to grab the appropriate deployment 
> information and issuing a request to the Savepoints endpoint of the same REST 
> API to "add" the savepoint.
> I've confirmed the above "works" and indeed stops any legacy jobs, creates 
> the resources (i.e. configmaps) used for the migration, starts up the new job 
> within Ververica and I can see evidence within the UI that a savepoint was 
> "COPIED" for that deployment.
> 
> However, when comparing (in GCS) the previous savepoint for the old job and 
> the one now managed by Ververica for the job, I notice that the new one only 
> contains a single _metadata file:
> 
> 
> 
> Whereas the previous contained a metadata file and another related data file:
> 
> 
> This leads me to believe that the new job might not know about any items 
> previously stored in state, which could be problematic.
> 
> When reviewing over the documentation for "manually adding a savepoint" for 
> Ververica Platform 2.6, I noticed that the payload to the Savepoints endpoint 
> looked like the following, which was what I used:
> metadata:
>   deploymentId: ${deploymentId}
>   annotations:
> com.dataartisans.appmanager.controller.deployment.spec.version: 
> ${deploymentSpecVersion}
>   type: ${type} (used FULL in my case)
> spec:
>   savepointLocation:  ${savepointLocation}
>   flinkSavepointId: ----
> status:
>   state: COMPLETED
> 
> The empty UUID was a bit concerning and I was curious if that might be the 
> reason my additional data files didn't come across from the savepoint as well 
> (I noticed in 2.7 this is an optional argument in the payload). I don't see 
> much more for any additional configuration that would otherwise specify to 
> pull everything including _metadata.
> 
> Any ideas or guidance would be helpful. 
> 
> Rion
> 
> 
> 
> 


Question Regarding State Migrations in Ververica Platform

2022-08-30 Thread Rion Williams
Hi all,

I wasn't sure if this would be the best audience, if not, please advise if
you know of a better place to ask it. I figured that at least some folks
here either work for Ververica or might have used their platform.

*tl;dr; I'm trying to migrate an existing stateful Flink job to run in
Ververica Platform (Community) and I'm noticing that it doesn't seem that
all of the state is being properly handed off (only _metadata).*

I'm currently in the process of migrating an existing Flink job that is
running in Kubernetes on its own to run within the Ververica platform. The
issue here is that the job itself is stateful, so I want to ensure I can
migrate over that state so when the new job kicks off, it's a fairly
seamless transition.

Basically, what I've done up to this point is create a script as part of
the Ververica platform deployment that will:

   1. Check for the existence of any of the known jobs that have been
   migrated.
  - If one is found, it will stop the job, taking a full savepoint, and
  store the savepoint path within a configmap for that job used solely for
  migration purposes.
  - If one is not found, it will assume the job has been migrated.
   2. Create a Deployment for each of the new jobs, pointing to the
   appropriate configuration, jars, etc.
   3. Check for the presence of one of the previous migration configmaps
   and issue a request to create a savepoint for that deployment.
  1. This involves using the Ververica REST API to grab the appropriate
  deployment information and issuing a request to the Savepoints
endpoint of
  the same REST API to "add" the savepoint.

I've confirmed the above "works" and indeed stops any legacy jobs, creates
the resources (i.e. configmaps) used for the migration, starts up the new
job within Ververica and I can see evidence within the UI that a savepoint
was "COPIED" for that deployment.

However, when comparing (in GCS) the previous savepoint for the old job and
the one now managed by Ververica for the job, I notice that the new one
only contains a single _metadata file:

[image: image.png]

Whereas the previous contained a metadata file and another related data
file:

[image: image.png]
This leads me to believe that the new job might not know about any items
previously stored in state, which could be problematic.

When reviewing over the documentation for "manually adding a savepoint" for
Ververica Platform 2.6
,
I noticed that the payload to the Savepoints endpoint looked like the
following, which was what I used:

metadata:
  deploymentId: ${deploymentId}
  annotations:
com.dataartisans.appmanager.controller.deployment.spec.version:
${deploymentSpecVersion}
  type: ${type} (used FULL in my case)spec:
  savepointLocation:  ${savepointLocation}
  flinkSavepointId: ----status:
  state: COMPLETED


The empty UUID was a bit concerning and I was curious if that might be the
reason my additional data files didn't come across from the savepoint as
well (I noticed in 2.7 this is an optional argument in the payload). I
don't see much more for any additional configuration that would otherwise
specify to pull everything including _metadata.

Any ideas or guidance would be helpful.

Rion


Exception Handling in ElasticsearchSink

2022-04-21 Thread Rion Williams
Hi all,

I've recently been encountering some issues that I've noticed in the logs
of my Flink job that handles writing to an Elasticsearch index. I was
hoping to leverage some of the metrics that Flink exposes (or piggyback on
them) to update metric counters when I encounter specific kinds of errors.

val builder = ElasticsearchSink.Builder(...)

builder.setFailureHandler { actionRequest, throwable, _, _ ->
// Log error here (and update metrics via metricGroup.counter(...)
}

return builder.build()

Is there a way to handle this currently? My specific implementation has a
process function that manages multiple sinks (so I can create these
dynamically), but in the case of these errors, it doesn't look like I can
access the metric group within the setFailureHandler at present.

My initial thought was in my parent process function, I could pass in the
context to the child sinks so that I'd have context for the
exceptions/metrics:

class DynamicElasticsearchSink>(
/**
 * Defines a router that maps an element to its corresponding
ElasticsearchSink instance
 * @param sinkRouter A [ElasticSinkRouter] that takes an element
of type [ElementT], a string-based route
 * defined as [RouteT] which is used for caching sinks, and
finally the sink itself as [ElasticsearchSink]
 */
private val sinkRouter: ElasticsearchSinkRouter
) : RichSinkFunction(), CheckpointedFunction {

// Store a reference to all of the current routes
private val sinkRoutes: MutableMap = ConcurrentHashMap()
private lateinit var configuration: Configuration

override fun open(parameters: Configuration) {
configuration = parameters
}

override fun invoke(value: ElementT, context: SinkFunction.Context) {
val route = sinkRouter.getRoute(value)
var sink = sinkRoutes[route]
if (sink == null) {
// Here's where the sink is constructed when an exception occurs
sink = sinkRouter.createSink(route, value)
sink.runtimeContext = runtimeContext
sink.open(configuration)
sinkRoutes[route] = sink
}

sink.invoke(value, context)
}
}

I'd imagine within the open call for this function, I could store the
metrics group and pass it into my createSink() call so the child sinks
would have a reference to it. Does that seem feasible or is there another
way to handle this?

Thanks all,

Rion


Re: [ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Rion Williams
Great news all! Looking forward to it!

> On Sep 29, 2021, at 10:43 AM, Theo Diefenthal 
>  wrote:
> 
> 
> Awesome, thanks for the release.
> 
> - Ursprüngliche Mail -
> Von: "Dawid Wysakowicz" 
> An: "dev" , "user" , 
> annou...@apache.org
> Gesendet: Mittwoch, 29. September 2021 15:59:47
> Betreff: [ANNOUNCE] Apache Flink 1.14.0 released
> 
> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.14.0.
>  
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming applications.
>  
> The release is available for download at:
> https://flink.apache.org/downloads.html
>  
> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> https://flink.apache.org/news/2021/09/29/release-1.14.0.html
>  
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349614
>  
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>  
> Regards,
> Xintong, Joe, Dawid


Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-09-04 Thread Rion Williams
Hi again David et al,

I managed to push an initial pull request for the implementations for the
DynamicElasticsearchSink and related ElasticsearchSinkRouter last week
<https://github.com/apache/flink/pull/17061> and made some minor updates
today with regards to the Javadocs (included code examples, etc.) along
with a few tests that came to mind. I was hoping to get a few more eyes on
it and figure out what else might be worth adding/changing/documenting in
hopes of wrapping this feature up.

Thanks again to everyone in this incredible community for their assistance
with this, a local implementation of it for a project of mine is working
like a charm, so I'm hoping it's something that others will be able to
leverage for their own needs.

Rion

On Thu, Aug 26, 2021 at 11:45 AM David Morávek  wrote:

> Hi Rion,
>
> personally I'd start with unit test in the base module using a test sink
> implementation. There is already *DummyElasticsearchSink* that you may be
> able to reuse (just note that we're trying to get rid of Mockito based
> tests such as this one).
>
> I'm bit unsure that integration test would actually test anything extra
> that the unit test doesn't in this case, so I'd recommend it as the next
> step (I'm also bit concerned that this test would take a long time to
> execute / be resource intensive as it would need to spawn more elastic
> clusters?).
>
> Best,
> D.
>
> On Thu, Aug 26, 2021 at 5:47 PM Rion Williams 
> wrote:
>
>> Just chiming in on this again.
>>
>> I think I have the pieces in place regarding the implementation (both a
>> DynamicElasticsearchSink class and ElasticsearchSinkRouter interface) added
>> to the elasticsearch-base module. I noticed that HttpHost wasn't available
>> within that module/in the tests, so I'd suspect that I'd need to add a
>> dependency similar to those found within the specific ES implementations
>> (5/6/7). I'd also assume that it may be best to just provide a dummy sink
>> similar to the other patterning to handle writing the unit tests or would
>> you recommend separate Elasticsearch integration tests using a
>> TestContainer of each supported version (5/6/7) similar to those within the
>> ElasticsearchSinkITCase files under each module?
>>
>> Any advice / recommendations on this front would be helpful. I want to
>> write some tests surrounding this that demonstrate the most common
>> use-cases, but also don't want to go overkill.
>>
>> Thanks again for all of your help,
>>
>> Rion
>>
>> On Wed, Aug 25, 2021 at 2:10 PM Rion Williams 
>> wrote:
>>
>>> Thanks again David,
>>>
>>> I've spun up a JIRA issue for the ticket
>>> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
>>> getting things into the proper state. If someone with the
>>> appropriate privileges could assign it to me, I'd be appreciative. I'll
>>> likely need some assistance at a few points to ensure things look as
>>> expected, but I'm happy to help with this contribution.
>>>
>>> Rion
>>>
>>> On Wed, Aug 25, 2021 at 11:37 AM David Morávek  wrote:
>>>
>>>> AFAIK there are currently no other sources in Flink that can treat
>>>> "other sources" / "destination" as data. Most complete generic work on this
>>>> topic that I'm aware of are Splittable DoFn based IOs in Apache Beam.
>>>>
>>>> I think the best module for the contribution would be
>>>> "elasticsearch-base", because this could be easily reused for all ES
>>>> versions that we currently support.
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams 
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> That was perfect and it looks like this is working as I'd expected. I
>>>>> put together some larger integration tests for my specific use-case
>>>>> (multiple Elasticsearch clusters running in TestContainers) and verified
>>>>> that messages were being routed dynamically to the appropriate sinks. I
>>>>> forked the Flink repo last night and was trying to figure out the best
>>>>> place to start adding these classes in (I noticed that there were three
>>>>> separate ES packages targeting 5/6/7 respectively). I was going to try to
>>>>> start fleshing the initial implementation for this, but wanted to make 
>>>>> sure
>>>>> that I was starting in the right place.
>>>>>
>>>>>

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-26 Thread Rion Williams
Just chiming in on this again.

I think I have the pieces in place regarding the implementation (both a
DynamicElasticsearchSink class and ElasticsearchSinkRouter interface) added
to the elasticsearch-base module. I noticed that HttpHost wasn't available
within that module/in the tests, so I'd suspect that I'd need to add a
dependency similar to those found within the specific ES implementations
(5/6/7). I'd also assume that it may be best to just provide a dummy sink
similar to the other patterning to handle writing the unit tests or would
you recommend separate Elasticsearch integration tests using a
TestContainer of each supported version (5/6/7) similar to those within the
ElasticsearchSinkITCase files under each module?

Any advice / recommendations on this front would be helpful. I want to
write some tests surrounding this that demonstrate the most common
use-cases, but also don't want to go overkill.

Thanks again for all of your help,

Rion

On Wed, Aug 25, 2021 at 2:10 PM Rion Williams  wrote:

> Thanks again David,
>
> I've spun up a JIRA issue for the ticket
> <https://issues.apache.org/jira/browse/FLINK-23977> while I work on
> getting things into the proper state. If someone with the
> appropriate privileges could assign it to me, I'd be appreciative. I'll
> likely need some assistance at a few points to ensure things look as
> expected, but I'm happy to help with this contribution.
>
> Rion
>
> On Wed, Aug 25, 2021 at 11:37 AM David Morávek  wrote:
>
>> AFAIK there are currently no other sources in Flink that can treat "other
>> sources" / "destination" as data. Most complete generic work on this topic
>> that I'm aware of are Splittable DoFn based IOs in Apache Beam.
>>
>> I think the best module for the contribution would be
>> "elasticsearch-base", because this could be easily reused for all ES
>> versions that we currently support.
>>
>> Best,
>> D.
>>
>> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams 
>> wrote:
>>
>>> Hi David,
>>>
>>> That was perfect and it looks like this is working as I'd expected. I
>>> put together some larger integration tests for my specific use-case
>>> (multiple Elasticsearch clusters running in TestContainers) and verified
>>> that messages were being routed dynamically to the appropriate sinks. I
>>> forked the Flink repo last night and was trying to figure out the best
>>> place to start adding these classes in (I noticed that there were three
>>> separate ES packages targeting 5/6/7 respectively). I was going to try to
>>> start fleshing the initial implementation for this, but wanted to make sure
>>> that I was starting in the right place.
>>>
>>> Additionally, do you know of anything that might be similar to this even
>>> within other sinks? Just trying to think of something to model this after.
>>> Once I get things started, I'll spin up a JIRA issue for it and go from
>>> there.
>>>
>>> Thanks so much for your help!
>>>
>>> Rion
>>>
>>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek  wrote:
>>>
>>>> Hi Rion,
>>>>
>>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>>> (that EleasticsearchSink extends) for more details.
>>>>
>>>> One more note, instead of starting with integration test, I'd recommend
>>>> writing a unit test using *operator test harness* [2] first. This
>>>> should help you to discover / debug many issues upfront. You can use
>>>> *ElasticsearchSinkBaseTest* [3] as an example.
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>>> [3]
>>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams 
>>>> wrote:
>>>>
>>>>> Hi David,
>>>>>
>>>>> Thanks again for the response, I believe that I'm getting pretty close
>>>>> for at least a POC-level

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-25 Thread Rion Williams
Thanks again David,

I've spun up a JIRA issue for the ticket
<https://issues.apache.org/jira/browse/FLINK-23977> while I work on getting
things into the proper state. If someone with the
appropriate privileges could assign it to me, I'd be appreciative. I'll
likely need some assistance at a few points to ensure things look as
expected, but I'm happy to help with this contribution.

Rion

On Wed, Aug 25, 2021 at 11:37 AM David Morávek  wrote:

> AFAIK there are currently no other sources in Flink that can treat "other
> sources" / "destination" as data. Most complete generic work on this topic
> that I'm aware of are Splittable DoFn based IOs in Apache Beam.
>
> I think the best module for the contribution would be
> "elasticsearch-base", because this could be easily reused for all ES
> versions that we currently support.
>
> Best,
> D.
>
> On Wed, Aug 25, 2021 at 4:58 PM Rion Williams 
> wrote:
>
>> Hi David,
>>
>> That was perfect and it looks like this is working as I'd expected. I put
>> together some larger integration tests for my specific use-case (multiple
>> Elasticsearch clusters running in TestContainers) and verified that
>> messages were being routed dynamically to the appropriate sinks. I forked
>> the Flink repo last night and was trying to figure out the best place to
>> start adding these classes in (I noticed that there were three separate ES
>> packages targeting 5/6/7 respectively). I was going to try to start
>> fleshing the initial implementation for this, but wanted to make sure that
>> I was starting in the right place.
>>
>> Additionally, do you know of anything that might be similar to this even
>> within other sinks? Just trying to think of something to model this after.
>> Once I get things started, I'll spin up a JIRA issue for it and go from
>> there.
>>
>> Thanks so much for your help!
>>
>> Rion
>>
>> On Tue, Aug 24, 2021 at 1:45 AM David Morávek  wrote:
>>
>>> Hi Rion,
>>>
>>> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
>>> before opening the child sink. Please see *AbstractRichFunction* [1]
>>> (that EleasticsearchSink extends) for more details.
>>>
>>> One more note, instead of starting with integration test, I'd recommend
>>> writing a unit test using *operator test harness* [2] first. This
>>> should help you to discover / debug many issues upfront. You can use
>>> *ElasticsearchSinkBaseTest* [3] as an example.
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
>>> [3]
>>> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>>>
>>> Best,
>>> D.
>>>
>>> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams 
>>> wrote:
>>>
>>>> Hi David,
>>>>
>>>> Thanks again for the response, I believe that I'm getting pretty close
>>>> for at least a POC-level implementation of this. Currently, I'm working
>>>> with JsonObject instances throughout the pipeline, so I wanted to try this
>>>> out and simply stored the routing information within the element itself for
>>>> simplicity's sake right now, so it has a shape that looks something like
>>>> this:
>>>>
>>>> {
>>>> "route": {
>>>> "hosts": "...",
>>>> "index": "...",
>>>> ...
>>>> },
>>>> "all-other-fields-here"
>>>> }
>>>>
>>>> And I've stripped back several of the layers of the routers (since I
>>>> already have all of the information in the element at that point). I tried
>>>> using something like this:
>>>>
>>>> class DynamicElasticsearchSink: RichSinkFunction(), 
>>>> CheckpointedFunction {
>>>> private val sinkRoutes: MutableMap>>> ElasticsearchSink> = ConcurrentHashMap()
>>>> private lateinit var configuration: Configuration
>>>>
>>>> override fun open(parameters: Configuration) {
>>>> configurati

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-25 Thread Rion Williams
Hi David,

That was perfect and it looks like this is working as I'd expected. I put
together some larger integration tests for my specific use-case (multiple
Elasticsearch clusters running in TestContainers) and verified that
messages were being routed dynamically to the appropriate sinks. I forked
the Flink repo last night and was trying to figure out the best place to
start adding these classes in (I noticed that there were three separate ES
packages targeting 5/6/7 respectively). I was going to try to start
fleshing the initial implementation for this, but wanted to make sure that
I was starting in the right place.

Additionally, do you know of anything that might be similar to this even
within other sinks? Just trying to think of something to model this after.
Once I get things started, I'll spin up a JIRA issue for it and go from
there.

Thanks so much for your help!

Rion

On Tue, Aug 24, 2021 at 1:45 AM David Morávek  wrote:

> Hi Rion,
>
> you just need to call *sink.setRuntimeContext(getRuntimeContext())*
> before opening the child sink. Please see *AbstractRichFunction* [1]
> (that EleasticsearchSink extends) for more details.
>
> One more note, instead of starting with integration test, I'd recommend
> writing a unit test using *operator test harness* [2] first. This should
> help you to discover / debug many issues upfront. You can use
> *ElasticsearchSinkBaseTest* [3] as an example.
>
> [1]
> https://github.com/apache/flink/blob/release-1.13.2/flink-core/src/main/java/org/apache/flink/api/common/functions/AbstractRichFunction.java#L52
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators
> [3]
> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
>
> Best,
> D.
>
> On Tue, Aug 24, 2021 at 12:03 AM Rion Williams 
> wrote:
>
>> Hi David,
>>
>> Thanks again for the response, I believe that I'm getting pretty close
>> for at least a POC-level implementation of this. Currently, I'm working
>> with JsonObject instances throughout the pipeline, so I wanted to try this
>> out and simply stored the routing information within the element itself for
>> simplicity's sake right now, so it has a shape that looks something like
>> this:
>>
>> {
>> "route": {
>> "hosts": "...",
>> "index": "...",
>> ...
>> },
>> "all-other-fields-here"
>> }
>>
>> And I've stripped back several of the layers of the routers (since I
>> already have all of the information in the element at that point). I tried
>> using something like this:
>>
>> class DynamicElasticsearchSink: RichSinkFunction(), 
>> CheckpointedFunction {
>> private val sinkRoutes: MutableMap> ElasticsearchSink> = ConcurrentHashMap()
>> private lateinit var configuration: Configuration
>>
>> override fun open(parameters: Configuration) {
>> configuration = parameters
>> }
>>
>> override fun invoke(element: JsonObject, context: SinkFunction.Context) {
>> val route = getHost(element)
>> // Check if we already have a router for this cluster
>> var sink = sinkRoutes[route]
>> if (sink == null) {
>> // If not, create one
>> sink = buildSinkFromRoute(element)
>> sink.open(configuration)
>> sinkRoutes[route] = sink
>> }
>>
>> sink.invoke(element, context)
>> }
>>
>> override fun initializeState(context: FunctionInitializationContext) {
>> // No-op.
>> }
>>
>> override fun snapshotState(context: FunctionSnapshotContext) {
>> // This is used only to flush pending writes.
>> for (sink in sinkRoutes.values) {
>> sink.snapshotState(context)
>> }
>> }
>>
>> override fun close() {
>> for (sink in sinkRoutes.values) {
>> sink.close()
>> }
>> }
>>
>> private fun buildSinkFromRoute(element: JsonObject, ho): 
>> ElasticsearchSink {
>> val builder = ElasticsearchSink.Builder(
>> buildHostsFromElement(element),
>> ElasticsearchRoutingFunction()
>> )
>>
>> builder.setBulkFlushMaxActions(1)
>>
>> // TODO: Configure authorization if available
>> //

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-23 Thread Rion Williams
quot;The runtime context has not
been initialized". *I had assumed this would be alright since the context
for the "wrapper" should have already been initialized, but maybe there's
something that I'm missing.

Also, please forgive any hastily written or nasty code as this is purely a
POC to see if I could get this to work using a single object. I have the
hopes of cleaning it up and genericizing it after I am confident that it
actually works.

Thanks so much again,

Rion

On Mon, Aug 23, 2021 at 11:12 AM David Morávek  wrote:

> Hi Rion,
>
> Sorry for late reply, I've missed your previous message. Thanks Arvid for
> the reminder <3.
>
> something like a MessageWrapper and pass those
>> elements to the sink, which would create the tenant-specific Elastic
>> connection from the ConfigurationT element and handle caching it and
>> then just grab the element and send it on it's way?
>
>
> Yes, this is exactly what I had in mind. There should be almost no
> overhead as sink can be easily chained with your join
> (KeyedCoProcessFunction) function.
>
>-
>-
>>
>>The shape of the elements being evicted from the process function (Is
>>a simple wrapper with the configuration for the sink enough here? Do I 
>> need
>>to explicitly initialize the sink within this function? Etc.)
>
>-
>- To write an element you need a configuration for the destination and
>the element itself, so a tuple of *(ElasticConfiguration, Element)*
>should be enough (that's basically your MessageWrapperConfigurationT> class).
>
>
>-
>-
>>
>>The actual use of the *DynamicElasticsearchSink* class (Would it just
>>be something like an *.addSink(**DynamicElasticSearchSink<**String,
>>Configuration>())* or perhaps something else entirely?)
>
>-
>
> I guess it could look something like the snippet below. It would be
> definitely good to play around with the *DynamicElasticSearchSink* API
> and make it more meaningful / user friendly (the gist I've shared was just
> a very rough prototype to showcase the idea).
>
>
>- static class Destination {
>
>private final List httpHosts;
>
>Destination(List httpHosts) {
>this.httpHosts = httpHosts;
>}
>}
>-
>- final DataStream> toWrite = ...;
>toWrite.addSink(
>new DynamicElasticsearchSink<>(
>new SinkRouter<
>Tuple2,
>String,
>ElasticsearchSinkString>>>() {
>
>@Override
>public String getRoute(Tuple2
>element) {
>- // Construct a deterministic unique caching
>key for the destination... (this could be cheaper if you know the data)
>return element.f0.httpHosts.stream()
>.map(HttpHost::toHostString)
>.collect(Collectors.joining(","));
>}
>
>@Override
>public ElasticsearchSinkString>> createSink(
>String cacheKey, Tuple2String> element) {
>return new ElasticsearchSink.Builder<>(
>element.f0.httpHosts,
>(ElasticsearchSinkFunction<
>
>Tuple2>)
>(el, ctx, indexer) -> {
>// Construct index
>request.
>final IndexRequest
>request = ...;
>
>indexer.add(request);
>})
>.build();
>}
>}));
>
>
> I hope this helps ;)
>
> Best,
> D.
>
>
> On Mon, Aug 16, 2021 at 5:18 PM Rion Williams 
> wrote:
>
>> Thanks for this suggestion David, it's extremely helpful.
>>
>> Since this will vary depending on the elements retrieved from a separate
>> stream, I'm guessing something like the following would be roughly the
>> avenue to continue down:
>>
>> fun main(args: Array) {
>> val parameters = mergeParametersFromProperties(args)
>> val stream = StreamExecutionEnvironment.getExecutionEnvironment()
>>
>> // Get the stream for tenant-specific Elastic configurations
>> val connectionStream = stream
>

Re: Handling HTTP Requests for Keyed Streams

2021-08-17 Thread Rion Williams
Hi Caizhi,

I don’t mind the request being synchronous (or not using the Async I/O 
connectors). Assuming I go down that route would this be the appropriate way to 
handle this? Specifically creating an HttpClient and storing the result in 
state and on a keyed stream if the state was empty?

It makes sense to me, just wondering if there are any gotchas or 
recommendations in terms of a client that might support things like retries and 
if this a good pattern to accomplish this.

Thanks,

Rion

> On Aug 16, 2021, at 11:57 PM, Caizhi Weng  wrote:
> 
> 
> Hi!
> 
> As you mentioned that the configuration fetching is very infrequent, why 
> don't you use a blocking approach to send HTTP requests and receive 
> responses? This seems like a more reasonable solution to me.
> 
> Rion Williams  于2021年8月17日周二 上午4:00写道:
>> Hi all,
>> 
>> I've been exploring a few different options for storing tenant-specific 
>> configurations within Flink state based on the messages I have flowing 
>> through my job. Initially I had considered creating a source that would 
>> periodically poll an HTTP API and connect that stream to my original event 
>> stream.
>> 
>> However, I realized that this configuration information would basically 
>> never change and thus it doesn't quite make sense to poll so frequently. My 
>> next approach would be to have a function that would be keyed (by tenant) 
>> and storing the configuration for that tenant in state (and issue an HTTP 
>> call when I did not have it). Something like this:
>> 
>> class ConfigurationLookupFunction: KeyedProcessFunction> JsonObject>() {
>> // Tenant specific configuration
>> private lateinit var httpClient: HttpClient
>> private lateinit var configuration: ValueState
>> 
>> override fun open(parameters: Configuration) {
>> super.open(parameters)
>> httpClient = HttpClient.newHttpClient()
>> }
>> 
>> override fun processElement(message: JsonObject, context: Context, out: 
>> Collector) {
>> if (configuration.value() == null){
>> // Issue a request to the appropriate API to load the 
>> configuration
>> val url = 
>> HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
>> httpClient.send(..., {
>> // Store the configuration info within state here
>> configuration.update(...)
>> })
>> 
>> out.collect(message)
>> }
>> else {
>> // Get the configuration information and pass it downstream to 
>> be used by the sink
>> out.collect(message)
>> }
>> }
>> }
>> I didn't see any support for using the Async I/O functions from a keyed 
>> context, otherwise I'd imagine that would be ideal. The requests themselves 
>> should be very infrequent (initial call per tenant) and I'd imagine after 
>> that the necessary configuration could be pulled/stored within the state for 
>> that key.
>> 
>> Is there a good way of handling this that I might be overlooking with an 
>> existing Flink construct or function? I'd love to be able to leverage the 
>> Async I/O connectors as they seem pretty well thought out.
>> 
>> Thanks in advance!
>> 
>> Rion
>> 
>> 


Handling HTTP Requests for Keyed Streams

2021-08-16 Thread Rion Williams
Hi all,

I've been exploring a few different options for storing tenant-specific
configurations within Flink state based on the messages I have flowing
through my job. Initially I had considered creating a source that would
periodically poll an HTTP API and connect that stream to my original event
stream.

However, I realized that this configuration information would basically
never change and thus it doesn't quite make sense to poll so frequently. My
next approach would be to have a function that would be keyed (by tenant)
and storing the configuration for that tenant in state (and issue an HTTP
call when I did not have it). Something like this:

class ConfigurationLookupFunction: KeyedProcessFunction() {
// Tenant specific configuration
private lateinit var httpClient: HttpClient
private lateinit var configuration: ValueState

override fun open(parameters: Configuration) {
super.open(parameters)
httpClient = HttpClient.newHttpClient()
}

override fun processElement(message: JsonObject, context: Context,
out: Collector) {
if (configuration.value() == null){
// Issue a request to the appropriate API to load the configuration
val url =
HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
httpClient.send(..., {
// Store the configuration info within state here
configuration.update(...)
})

out.collect(message)
}
else {
// Get the configuration information and pass it
downstream to be used by the sink
out.collect(message)
}
}
}

I didn't see any support for using the Async I/O functions from a keyed
context, otherwise I'd imagine that would be ideal. The requests themselves
should be very infrequent (initial call per tenant) and I'd imagine after
that the necessary configuration could be pulled/stored within the state
for that key.

Is there a good way of handling this that I might be overlooking with an
existing Flink construct or function? I'd love to be able to leverage the
Async I/O connectors as they seem pretty well thought out.

Thanks in advance!

Rion


Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-16 Thread Rion Williams
Thanks for this suggestion David, it's extremely helpful.

Since this will vary depending on the elements retrieved from a separate
stream, I'm guessing something like the following would be roughly the
avenue to continue down:

fun main(args: Array) {
val parameters = mergeParametersFromProperties(args)
val stream = StreamExecutionEnvironment.getExecutionEnvironment()

// Get the stream for tenant-specific Elastic configurations
val connectionStream = stream
.fromSource(
KafkaSource.of(parameters, listOf("elastic-configs")),
WatermarkStrategy.noWatermarks(),
"elastic-configs"
)

// Get the stream of incoming messages to be routed to Elastic
stream
.fromSource(
KafkaSource.of(parameters, listOf("messages")),
WatermarkStrategy.noWatermarks(),
"messages"
)
.keyBy { message ->
// Key by the tenant in the message
message.getTenant()
}
.connect(
// Connect the messages stream with the configurations
connectionStream
)
.process(object : KeyedCoProcessFunction() {
// For this key, we need to store all of the previous
messages in state
// in the case where we don't have a given mapping for
this tenant yet
lateinit var messagesAwaitingConfigState: ListState
lateinit var configState: ValueState

override fun open(parameters: Configuration) {
super.open(parameters)
// Initialize the states
messagesAwaitingConfigState =
runtimeContext.getListState(awaitingStateDesc)
configState = runtimeContext.getState(configStateDesc)
}

// When an element is received
override fun processElement1(message: String, context:
Context, out: Collector) {
// Check if we have a mapping
if (configState.value() == null){
// We don't have a mapping for this tenant, store
messages until we get it
messagesAwaitingConfigState.add(message)
}
else {
// Output the record with some indicator of the route?
out.collect(message)
}
}

override fun processElement2(config: String, context:
Context, out: Collector) {
// If this mapping is for this specific tenant, store
it and flush the pending
// records in state
if (config.getTenant() == context.currentKey){
configState.update(config)
val messagesToFlush = messagesAwaitingConfigState.get()
messagesToFlush.forEach { message ->
out.collect(message)
}
}
}

// State descriptors
val awaitingStateDesc = ListStateDescriptor(
"messages-awaiting-config",
TypeInformation.of(String::class.java)
)

val configStateDesc = ValueStateDescriptor(
"elastic-config",
TypeInformation.of(String::class.java)
)
})

stream.executeAsync("$APPLICATION_NAME-job")
}

Basically, connect my tenant-specific configuration stream with my incoming
messages (keyed by tenant) and buffer them until I have a corresponding
configuration (to avoid race-conditions). However, I'm guessing what will
happen here is rather than directly outputting the messages from this
process function, I'd construct some type of wrapper here with the
necessary routing/configuration for the message (obtained via the
configuration stream) along with the element, which might be something like
a MessageWrapper and pass those elements to the
sink, which would create the tenant-specific Elastic connection from the
ConfigurationT element and handle caching it and then just grab the element
and send it on it's way?

Those are really the only bits I'm stuck on at the moment:

   1. The shape of the elements being evicted from the process function (Is
   a simple wrapper with the configuration for the sink enough here? Do I need
   to explicitly initialize the sink within this function? Etc.)
   2. The actual use of the DynamicElasticsearchSink class (Would it just
   be something like an .addSink(DynamicElasticSearchSink()) or perhaps something else entirely?)

Thanks again so much for the advice thus far David, it's greatly
appreciated.

Rion

On Fri, Aug 13, 2021 at 9:04 AM David Morávek  wrote:

> To give you a better idea, in high-level I think could look something like
> this <https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8> [1].
>
> [1] https://gist.github.com/dmvk/3f8124d585cd33e52cacd4a38b80f8c8
&g

Re: Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-13 Thread Rion Williams
Hi David,

Thanks for your response! I think there are currently quite a few unknowns in 
my end in terms of what a production loads look like but I think the number of 
clusters shouldn’t be too large (and will either rarely change or have new 
entries come in at runtime, but it needs to support that).

I think the dynamic approach might be a good route to explore with actual 
changes to the Elasticsearch sink as a longer term option. I’m not sure what 
the dynamic one would look like at the moment though, perhaps that’s something 
you’d be able to advise on?

Given that all the records are keyed for a given tenant and I would have the 
mappings stored in state, is it possible that within the open() function for 
this dynamic route to access the state and initialize the client there? Or 
maybe there’s some other approach (such as grouping by clusters and dynamically 
handling indices)?

I’d be happy to give a shot at making the appropriate changes to the sink as 
well, although I’m far from an Elastic expert. If you point me in the right 
direction, I may be able to help out.

Thanks much!

Rion

> On Aug 13, 2021, at 6:52 AM, David Morávek  wrote:
> 
> 
> Hi Rion,
> 
> As you probably already know, for dynamic indices, you can simply implement 
> your own ElasticsearchSinkFunction [1], where you can create any request that 
> elastic client supports.
> 
> The tricky part is how to implement dynamic routing into multiple clusters. 
> - If the elastic clusters are known upfront (before submitting job), you can 
> easily create multiple elastic sinks and prepend them with a simple filter 
> (this is basically what split operator does).
> - If you discover elastics clusters at runtime, this would require some 
> changes of the current ElasticsearchSink implementation. I think this may be 
> actually as simple as introducing something like DynamicElasticsearchSink, 
> that could dynamically create and managed "child" sinks. This would probably 
> require some thoughts about how to manage consumed resources (memory), 
> because number of child sink could be potentially unbounded. This could be of 
> course simplified if underlying elastic client already supports that, which 
> I'm not aware of. If you'd like to take this path, it would definitely be a 
> great contribution to Flink (I'm able to provide some guidance). 
> 
> [1] 
> https://github.com/apache/flink/blob/release-1.13.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java
> 
> Best,
> D.
> 
>> On Sun, Aug 8, 2021 at 4:24 PM Rion Williams  wrote:
>> Hi folks,
>> 
>> I have a use-case that I wanted to initially pose to the mailing list as I’m 
>> not terribly familiar with the Elasticsearch connector to ensure I’m not 
>> going down the wrong path trying to accomplish this in Flink (or if 
>> something downstream might be a better option).
>> 
>> Basically, I have the following pieces to the puzzle:
>> A stream of tenant-specific events
>> An HTTP endpoint containing mappings for tenant-specific Elastic cluster 
>> information (as each tenant has its own specific Elastic cluster/index)
>> What I’m hoping to accomplish is the following:
>> One stream will periodically poll the HTTP endpoint and store these cluster 
>> mappings in state (keyed by tenant with cluster info as the value)
>> The event stream will be keyed by tenant and connected to the cluster 
>> mappings stream.
>> I’ll need to an Elasticsearch sink that can route the tenant-specific event 
>> data to its corresponding cluster/index from the mapping source.
>> I know that the existing Elasticsearch sink supports dynamic indices, 
>> however I didn’t know if it’s possible to adjust the cluster like I would 
>> need on a per-tenant basis or if there’s a better approach here? 
>> 
>> Any advice would be appreciated.
>> 
>> Thanks,
>> 
>> Rion


Dynamic Cluster/Index Routing for Elasticsearch Sink

2021-08-08 Thread Rion Williams
Hi folks,

I have a use-case that I wanted to initially pose to the mailing list as I’m 
not terribly familiar with the Elasticsearch connector to ensure I’m not going 
down the wrong path trying to accomplish this in Flink (or if something 
downstream might be a better option).

Basically, I have the following pieces to the puzzle:
A stream of tenant-specific events
An HTTP endpoint containing mappings for tenant-specific Elastic cluster 
information (as each tenant has its own specific Elastic cluster/index)
What I’m hoping to accomplish is the following:
One stream will periodically poll the HTTP endpoint and store these cluster 
mappings in state (keyed by tenant with cluster info as the value)
The event stream will be keyed by tenant and connected to the cluster mappings 
stream.
I’ll need to an Elasticsearch sink that can route the tenant-specific event 
data to its corresponding cluster/index from the mapping source.
I know that the existing Elasticsearch sink supports dynamic indices, however I 
didn’t know if it’s possible to adjust the cluster like I would need on a 
per-tenant basis or if there’s a better approach here? 

Any advice would be appreciated.

Thanks,

Rion

Re: Dead Letter Queue for JdbcSink

2021-08-03 Thread Rion Williams
Thanks, I figured that was the preferred approach. I’ve noticed that at times 
it doesn’t seem like it’s catching the exceptions (just have the wrapped sink 
wrapped with a try-catch block at the moment).

Did you have to do anything special on that front? I’d assumed that catching 
the IOException after retries had been exhausted was probably the best bet, 
just wasn’t sure if anything might be missing.

Thanks,

Rion

> On Aug 3, 2021, at 11:16 AM, Maciej Bryński  wrote:
> 
> Hi Rion,
> We're using plain Kafka producer to send records to DLQ.
> 
> Regards,
> Maciek
> 
> wt., 3 sie 2021 o 18:07 Rion Williams  napisał(a):
>> 
>> Thanks Maciek,
>> 
>> It looks like my initial issue had another problem with a bad interface that 
>> was being used (or an improper one), but after changing that and ensuring 
>> all of the fields were implemented it worked as expected.
>> 
>> I know in my particular case, I'm planning on writing to Kafka, however my 
>> wrapped function isn't a process function and thus it isn't as simple as 
>> supplying a side-output and sending those to Kafka. I'm guessing in this 
>> scenario, it'd be sufficient to have a plain Kafka producer (created within 
>> the open() function) and just use that as opposed to constructing a sink and 
>> explicitly invoking it.
>> 
>> ```
>> catch (exception: Exception) {
>> // I'd imagine that the context here would require a second level of 
>> mapping to ensure that
>> // we have the proper context for the sink itself
>> dlqSink.invoke(value, context)
>> 
>> // Or this would be the alternative
>> dlqProducer.send(..., value)
>> }
>> ```
>> 
>> I don't know if you have the same scenario (or are leveraging Kafka), but if 
>> so, I'd be curious of the approach that you took.
>> 
>> Thanks much,
>> 
>> Rion
>> 
>>> On 2021/08/03 08:45:07, Maciej Obuchowski  
>>> wrote:
>>> Hey.
>>> 
>>> As far as I see, you're not overriding functions like open,
>>> setRuntimeContext, snapshotState, initializeState - the calls needs to
>>> be passed to the inner sink function.
>>> 
>>> pon., 2 sie 2021 o 19:31 Rion Williams  napisał(a):
>>>> 
>>>> Hi again Maciek (and all),
>>>> 
>>>> I just recently returned to start investigating this approach, however I 
>>>> can't seem to get the underlying invocation to work as I would normally 
>>>> expect. I'll try to share a bit more as what I currently have and perhaps 
>>>> I'm just missing something minor that someone may be able to spot.
>>>> 
>>>> To reiterate - what I'm attempting to do is take a stream of events 
>>>> flowing through, specific types of entities are extracted from these 
>>>> events into multiple side-outputs, and these side-outputs are passed to a 
>>>> sync that will write them via JDBC using logic specific to that entity. 
>>>> What I am aiming to achieve is being able to capture a single record that 
>>>> may be problematic and avoid a poison pill to throw onto a dead-letter 
>>>> queue (Kafka). I understand this would mean limiting batching sizes to a 
>>>> single record, however I'm assuming that the connections themselves could 
>>>> be pooled possibly to avoid opening up a new connection per call. If this 
>>>> isn't the case, is there a way to handle that (or would I need to 
>>>> implement my own sync).
>>>> 
>>>> ```
>>>> val users = Tags.users
>>>>parsedChangelogs
>>>>.getSideOutput(users)
>>>>.addSink(PostgresSink.fromEntityType(users.typeInfo, 
>>>> parameters))
>>>>.uid("sync-${users.id}-to-postgres")
>>>>.name("sync-${users.id}-to-postgres")
>>>> 
>>>> val addresses = Tags.addresses
>>>>parsedChangelogs
>>>>.getSideOutput(addresses)
>>>>.addSink(PostgresSink.fromEntityType(addresses.typeInfo, 
>>>> parameters))
>>>>.uid("sync-${addresses.id}-to-postgres")
>>>>.name("sync-${addresses.id}-to-postgres")
>>>> ```
>>>> 
>>>> And the dynamic sink (that would associate a given entity to the necessary 
>>>> calls made to the database) looks a bit like this:
>>>> 
>>>> ```
>>>> fun  fromEntityType(typeInfo: TypeInformatio

Re: Dead Letter Queue for JdbcSink

2021-08-03 Thread Rion Williams
Thanks Maciek, 

It looks like my initial issue had another problem with a bad interface that 
was being used (or an improper one), but after changing that and ensuring all 
of the fields were implemented it worked as expected.

I know in my particular case, I'm planning on writing to Kafka, however my 
wrapped function isn't a process function and thus it isn't as simple as 
supplying a side-output and sending those to Kafka. I'm guessing in this 
scenario, it'd be sufficient to have a plain Kafka producer (created within the 
open() function) and just use that as opposed to constructing a sink and 
explicitly invoking it.

```
catch (exception: Exception) {
 // I'd imagine that the context here would require a second level of 
mapping to ensure that
 // we have the proper context for the sink itself
 dlqSink.invoke(value, context)

 // Or this would be the alternative
 dlqProducer.send(..., value)
}
```

I don't know if you have the same scenario (or are leveraging Kafka), but if 
so, I'd be curious of the approach that you took.

Thanks much,

Rion

On 2021/08/03 08:45:07, Maciej Obuchowski  wrote: 
> Hey.
> 
> As far as I see, you're not overriding functions like open,
> setRuntimeContext, snapshotState, initializeState - the calls needs to
> be passed to the inner sink function.
> 
> pon., 2 sie 2021 o 19:31 Rion Williams  napisał(a):
> >
> > Hi again Maciek (and all),
> >
> > I just recently returned to start investigating this approach, however I 
> > can't seem to get the underlying invocation to work as I would normally 
> > expect. I'll try to share a bit more as what I currently have and perhaps 
> > I'm just missing something minor that someone may be able to spot.
> >
> > To reiterate - what I'm attempting to do is take a stream of events flowing 
> > through, specific types of entities are extracted from these events into 
> > multiple side-outputs, and these side-outputs are passed to a sync that 
> > will write them via JDBC using logic specific to that entity. What I am 
> > aiming to achieve is being able to capture a single record that may be 
> > problematic and avoid a poison pill to throw onto a dead-letter queue 
> > (Kafka). I understand this would mean limiting batching sizes to a single 
> > record, however I'm assuming that the connections themselves could be 
> > pooled possibly to avoid opening up a new connection per call. If this 
> > isn't the case, is there a way to handle that (or would I need to implement 
> > my own sync).
> >
> > ```
> > val users = Tags.users
> > parsedChangelogs
> > .getSideOutput(users)
> > .addSink(PostgresSink.fromEntityType(users.typeInfo, 
> > parameters))
> > .uid("sync-${users.id}-to-postgres")
> > .name("sync-${users.id}-to-postgres")
> >
> > val addresses = Tags.addresses
> > parsedChangelogs
> > .getSideOutput(addresses)
> > .addSink(PostgresSink.fromEntityType(addresses.typeInfo, 
> > parameters))
> > .uid("sync-${addresses.id}-to-postgres")
> > .name("sync-${addresses.id}-to-postgres")
> > ```
> >
> > And the dynamic sink (that would associate a given entity to the necessary 
> > calls made to the database) looks a bit like this:
> >
> > ```
> > fun  fromEntityType(typeInfo: TypeInformation, parameters: 
> > ParameterTool): SinkFunction {
> > val metadata = getQueryMetadataFromType(typeInfo)
> >
> > return JdbcSink
> > .sink(
> > metadata.query,
> > metadata.statement,
> > getJdbcExecutionOptions(parameters),
> > JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
> > .withDriverName("org.postgresql.Driver")
> > .withUrl(buildConnectionString(parameters))
> > .build(),
> > )
> > }
> > ```
> >
> > I've tried several, a naive wrapper approach that I attempted looked 
> > something like this:
> >
> > ```
> > class DlqWrapper(private val sink: SinkFunction, val parameters: 
> > ParameterTool): SinkFunction {
> > private val logger = LoggerFactory.getLogger(DlqSink::class.java)
> > private val dlqSink: SinkFunction = ...
> >
> > override fun invoke(value: T, context: SinkFunction.Context) {
> > try {
> > sink.invoke(value, context)
> > }
> > catch (ex: Exception) {
> > logger.erro

Re: Dead Letter Queue for JdbcSink

2021-08-02 Thread Rion Williams
Hi again Maciek (and all),

I just recently returned to start investigating this approach, however I can't 
seem to get the underlying invocation to work as I would normally expect. I'll 
try to share a bit more as what I currently have and perhaps I'm just missing 
something minor that someone may be able to spot.

To reiterate - what I'm attempting to do is take a stream of events flowing 
through, specific types of entities are extracted from these events into 
multiple side-outputs, and these side-outputs are passed to a sync that will 
write them via JDBC using logic specific to that entity. What I am aiming to 
achieve is being able to capture a single record that may be problematic and 
avoid a poison pill to throw onto a dead-letter queue (Kafka). I understand 
this would mean limiting batching sizes to a single record, however I'm 
assuming that the connections themselves could be pooled possibly to avoid 
opening up a new connection per call. If this isn't the case, is there a way to 
handle that (or would I need to implement my own sync).

```
val users = Tags.users
parsedChangelogs
.getSideOutput(users)
.addSink(PostgresSink.fromEntityType(users.typeInfo, parameters))
.uid("sync-${users.id}-to-postgres")
.name("sync-${users.id}-to-postgres")

val addresses = Tags.addresses
parsedChangelogs
.getSideOutput(addresses)
.addSink(PostgresSink.fromEntityType(addresses.typeInfo, 
parameters))
.uid("sync-${addresses.id}-to-postgres")
.name("sync-${addresses.id}-to-postgres")
```

And the dynamic sink (that would associate a given entity to the necessary 
calls made to the database) looks a bit like this:

```
fun  fromEntityType(typeInfo: TypeInformation, parameters: 
ParameterTool): SinkFunction {
val metadata = getQueryMetadataFromType(typeInfo)

return JdbcSink
.sink(
metadata.query,
metadata.statement,
getJdbcExecutionOptions(parameters),
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("org.postgresql.Driver")
.withUrl(buildConnectionString(parameters))
.build(),
)
}
```

I've tried several, a naive wrapper approach that I attempted looked something 
like this:

```
class DlqWrapper(private val sink: SinkFunction, val parameters: 
ParameterTool): SinkFunction {
private val logger = LoggerFactory.getLogger(DlqSink::class.java)
private val dlqSink: SinkFunction = ...

override fun invoke(value: T, context: SinkFunction.Context) {
try {
sink.invoke(value, context)
}
catch (ex: Exception) {
logger.error("Encountered sink exception. Sending message to dead 
letter queue. Value: $value. Exception: ${ex.message}")
val payload = Gson().toJsonTree(value).asJsonObject
payload.addProperty("exception", ex.message)

dlqSink.invoke("$payload", context)
}
}
}
```

After doing this, it doesn't look like when the invoke calls are made that it's 
actually attempting to perform the JDBC calls to insert the records into those 
sources. I'm not entirely sure if this is related specifically for how the 
JdbcSink is wrapped (via the GenericJdbcSink, etc.).

I had seen several posts around involving the use of an 
InvocationHandler/Proxy, etc. but I'm not sure if that should be necessary for 
handling this type of functionality. Any ideas/thoughts/examples would be 
greatly appreciated.

Thanks,

Rion

On 2021/07/14 15:47:18, Maciej Bryński  wrote: 
> This is the idea.
> Of course you need to wrap more functions like: open, close,
> notifyCheckpointComplete, snapshotState, initializeState and
> setRuntimeContext.
> 
> The problem is that if you want to catch problematic record you need
> to set batch size to 1, which gives very bad performance.
> 
> Regards,
> Maciek
> 
> śr., 14 lip 2021 o 17:31 Rion Williams  napisał(a):
> >
> > Hi Maciej,
> >
> > Thanks for the quick response. I wasn't aware of the idea of using a 
> > SinkWrapper, but I'm not quite certain that it would suit this specific use 
> > case (as a SinkFunction / RichSinkFunction doesn't appear to support 
> > side-outputs). Essentially, what I'd hope to accomplish would be to pick up 
> > when a bad record could not be written to the sink and then offload that 
> > via a side-output somewhere else.
> >
> > Something like this, which is a very, very naive idea:
> >
> > class PostgresSinkWrapper(private val sink: SinkFunction): 
> > RichSinkFunction() {
> > private val logger = 
> > LoggerFactory.getLogger(PostgresSin

Re: Dead Letter Queue for JdbcSink

2021-07-14 Thread Rion Williams
Hi Maciej,

Thanks for the quick response. I wasn't aware of the idea of using a
SinkWrapper, but I'm not quite certain that it would suit this specific use
case (as a SinkFunction / RichSinkFunction doesn't appear to support
side-outputs). Essentially, what I'd hope to accomplish would be to pick up
when a bad record could not be written to the sink and then offload that
via a side-output somewhere else.

Something like this, which is a very, very naive idea:

class PostgresSinkWrapper(private val sink: SinkFunction):
RichSinkFunction() {
private val logger =
LoggerFactory.getLogger(PostgresSinkWrapper::class.java)

override fun invoke(value: T, context: SinkFunction.Context) {
try {
sink.invoke(value, context)
}
catch (exception: Exception){
logger.error("Encountered a bad record, offloading to
dead-letter-queue")
// Offload bad record to DLQ
}
}
}

But I think that's basically the gist of it. I'm just not sure how I could
go about doing this aside from perhaps writing a custom process function
that wraps another sink function (or just completely rewriting my own
JdbcSink?)

Thanks,

Rion





On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński  wrote:

> Hi Rion,
> We have implemented such a solution with Sink Wrapper.
>
>
> Regards,
> Maciek
>
> śr., 14 lip 2021 o 16:21 Rion Williams  napisał(a):
> >
> > Hi all,
> >
> > Recently I've been encountering an issue where some external
> dependencies or process causes writes within my JDBCSink to fail (e.g.
> something is being inserted with an explicit constraint that never made
> it's way there). I'm trying to see if there's a pattern or recommendation
> for handling this similar to a dead-letter queue.
> >
> > Basically - if I experience a given number of failures (> max retry
> attempts) when writing to my JDBC destination, I'd like to take the record
> that was attempted and throw it into a Kafka topic or some other
> destination so that it can be evaluated at a later time.
> >
> > Are there any well defined patterns or recommended approaches around
> this?
> >
> > Thanks,
> >
> > Rion
>
>
>
> --
> Maciek Bryński
>


Dead Letter Queue for JdbcSink

2021-07-14 Thread Rion Williams
Hi all,

Recently I've been encountering an issue where some external dependencies
or process causes writes within my JDBCSink to fail (e.g. something is
being inserted with an explicit constraint that never made it's way there).
I'm trying to see if there's a pattern or recommendation for handling this
similar to a dead-letter queue.

Basically - if I experience a given number of failures (> max retry
attempts) when writing to my JDBC destination, I'd like to take the record
that was attempted and throw it into a Kafka topic or some other
destination so that it can be evaluated at a later time.

Are there any well defined patterns or recommended approaches around this?

Thanks,

Rion


Handling Large Broadcast States

2021-06-16 Thread Rion Williams
Hey Flink folks,

I was discussing the use of the Broadcast Pattern with some colleagues today 
for a potential enrichment use-case and noticed that it wasn’t currently backed 
by RocksDB. This seems to indicate that it would be solely limited to the 
memory allocated, which might not support a large enrichment data set that our 
use case might run into (thousands of tenants with users and various other 
entities to enrich by).

Are there any plans to eventually add support for BroadcastState to be backed 
by a non-memory source? Or perhaps some technical limitations that might not 
make that possible? If the latter is true, is there a preferred pattern for 
handling enrichment/lookups for a very large set of data that may not be 
memory-bound?

Any advice or thoughts would be welcome!

Rion

Guidance for Integration Tests with External Technologies

2021-05-18 Thread Rion Williams
Hey all,

I’ve been taking a very TDD-oriented approach to developing many of the Flink 
apps I’ve worked on, but recently I’ve encountered a problem that has me 
scratching my head.

A majority of my integration tests leverage a few external technologies such as 
Kafka and typically a relational database like Postgres. I’ve found 
in-memory/embedded versions of these that have worked well in the past to allow 
me to:

- send messages into a kafka topic
- run my exact Flink job asynchronously 
- verify my results / assertions in Postgres via awaitility

Recently, I had a use case for Broadcast state for a job and found that my 
tests would run successfully when executed directly but multiple tests run in 
sequence (in the same file), it seems that Flink would fail to consume from the 
topics and eventually fail the assertion. 

I’ve tried several approaches including:
- ensuring that each Flink job is passed a unique consumer.id / group.id / 
application.id
- ensuring each test has brand new Kafka topics specific for it
- spinning up a new Flink cluster / Kafka cluster / Postgres instance per test

I’m not entirely sure what could be causing the problem but it only occurs for 
Flink jobs that read from two topics and leverage broadcast state. All other 
integration tests that use Kafka/Flink/Postgres still pass and can be run in 
sequence.

Any advice / examples / recommendations would be helpful. l’d be happy to 
elaborate and provide code whenever possible as well.

Thanks,

Rion



Re: Handling "Global" Updating State

2021-05-17 Thread Rion Williams
Hi Yun,

That’s very helpful and good to know that the problem/use-case has been thought 
about. Since my need is probably shorter-term than later, I’ll likely need to 
explore a workaround.

Do you know of an approach that might not require the use of check pointing and 
restarting? I was looking into exploring initializeState within my 
broadcast-side stream to get it current and then simply listening to the Kafka 
topic as records come in. I’d imagine this would work, but that may be a bit of 
a naive approach.

Thanks!

Rion 

> On May 17, 2021, at 1:36 AM, Yun Gao  wrote:
> 
> 
> Hi Rion, 
> 
> I think FLIP-150[1] should be able to solve this scenario.
> 
> Since FLIP-150 is still under discussion, for now a temporary method come 
> to me might be
> 1. Write a first job to read the kafka and update the broadcast state of some 
> operator. The job
> would keep the source alive after all the data are emit (like sleep forever), 
> and when all the data 
> are processed, then stop the job with savepoint. 
> 2. Use the savepoint to start the original job. For the operator required the 
> broadcast state, it could
> set the same uid and same state name with the corresponding operator in the 
> first job, so it could
> acqure the state content on startup.
> 
> Yun,
> Best
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
> 
> --Original Mail --
> Sender:Rion Williams 
> Send Date:Mon May 17 07:00:03 2021
> Recipients:user 
> Subject:Re: Handling "Global" Updating State
>> Hey folks,
>> 
>> After digging into this a bit it does seem like Broadcast State would fit 
>> the bill for this scenario and keeping the downstream operators up-to-date 
>> as messages arrived in my Kafka topic.
>> 
>> My question is - is there a pattern for pre-populating the state initially? 
>> In my case, I need to have loaded all of my “lookup” topic into state before 
>> processing any records in the other stream.
>> 
>> My thought initially is to do something like this, if it’s possible:
>> 
>> - Create a KafkaConsumer on startup to read the lookup topic in its entirety 
>> into some collection like a hashmap (prior to executing the Flink pipeline 
>> to ensure synchronicity)
>> - Use this to initialize the state of my broadcast stream (if possible)
>> - At this point that stream would be broadcasting any new records coming in, 
>> so I “should” stay up to date at that point.
>> 
>> Is this an oversimplification or is there an obviously better / well known 
>> approach to handling this?
>> 
>> Thanks,
>> 
>> Rion
>> 
>> On May 14, 2021, at 9:51 AM, Rion Williams  wrote:
>> 
>> 
>> Hi all,
>> 
>> I've encountered a challenge within a Flink job that I'm currently working 
>> on. The gist of it is that I have a job that listens to a series of events 
>> from a Kafka topic and eventually sinks those down into Postgres via the 
>> JDBCSink.
>> 
>> A requirement recently came up for the need to filter these events based on 
>> some configurations that are currently being stored within another Kafka 
>> topic. I'm wondering what the best approach might be to handle this type of 
>> problem.
>> 
>> My initial naive approach was:
>> When Flink starts up, use a regular Kafka Consumer and read all of the 
>> configuration data from that topic in its entirety.
>> Store the messages from that topic in some type of thread-safe collection 
>> statically accessible by the operators downstream.
>> Expose the thread-safe collection within the operators to actually perform 
>> the filtering.
>> This doesn't seem right though. I was reading about BroadcastState which 
>> seems like it might fit the bill (e.g. keep those mappings in Broadcast 
>> state so that all of the downstream operations would have access to them, 
>> which I'd imagine would handle keeping things up to date). 
>> 
>> Does Flink have a good pattern / construct to handle this? Basically, I have 
>> a series of mappings that I want to keep relatively up to date in a Kafka 
>> topic, and I'm consuming from another Kafka topic that will need those 
>> mappings to filter against.
>> 
>> I'd be happy to share some of the approaches I currently have or elaborate a 
>> bit more if that isn't super clear.
>> 
>> Thanks much,
>> 
>> Rion
>> 


Re: Handling "Global" Updating State

2021-05-16 Thread Rion Williams
Hey folks,

After digging into this a bit it does seem like Broadcast State would fit the 
bill for this scenario and keeping the downstream operators up-to-date as 
messages arrived in my Kafka topic.

My question is - is there a pattern for pre-populating the state initially? In 
my case, I need to have loaded all of my “lookup” topic into state before 
processing any records in the other stream.

My thought initially is to do something like this, if it’s possible:

- Create a KafkaConsumer on startup to read the lookup topic in its entirety 
into some collection like a hashmap (prior to executing the Flink pipeline to 
ensure synchronicity)
- Use this to initialize the state of my broadcast stream (if possible)
- At this point that stream would be broadcasting any new records coming in, so 
I “should” stay up to date at that point.

Is this an oversimplification or is there an obviously better / well known 
approach to handling this?

Thanks,

Rion

> On May 14, 2021, at 9:51 AM, Rion Williams  wrote:
> 
> 
> Hi all,
> 
> I've encountered a challenge within a Flink job that I'm currently working 
> on. The gist of it is that I have a job that listens to a series of events 
> from a Kafka topic and eventually sinks those down into Postgres via the 
> JDBCSink.
> 
> A requirement recently came up for the need to filter these events based on 
> some configurations that are currently being stored within another Kafka 
> topic. I'm wondering what the best approach might be to handle this type of 
> problem.
> 
> My initial naive approach was:
> When Flink starts up, use a regular Kafka Consumer and read all of the 
> configuration data from that topic in its entirety.
> Store the messages from that topic in some type of thread-safe collection 
> statically accessible by the operators downstream.
> Expose the thread-safe collection within the operators to actually perform 
> the filtering.
> This doesn't seem right though. I was reading about BroadcastState which 
> seems like it might fit the bill (e.g. keep those mappings in Broadcast state 
> so that all of the downstream operations would have access to them, which I'd 
> imagine would handle keeping things up to date). 
> 
> Does Flink have a good pattern / construct to handle this? Basically, I have 
> a series of mappings that I want to keep relatively up to date in a Kafka 
> topic, and I'm consuming from another Kafka topic that will need those 
> mappings to filter against.
> 
> I'd be happy to share some of the approaches I currently have or elaborate a 
> bit more if that isn't super clear.
> 
> Thanks much,
> 
> Rion
> 


Handling "Global" Updating State

2021-05-14 Thread Rion Williams
Hi all,

I've encountered a challenge within a Flink job that I'm currently working
on. The gist of it is that I have a job that listens to a series of events
from a Kafka topic and eventually sinks those down into Postgres via the
JDBCSink.

A requirement recently came up for the need to filter these events based on
some configurations that are currently being stored within another Kafka
topic. I'm wondering what the best approach might be to handle this type of
problem.

My initial naive approach was:

   - When Flink starts up, use a regular Kafka Consumer and read all of the
   configuration data from that topic in its entirety.
   - Store the messages from that topic in some type of thread-safe
   collection statically accessible by the operators downstream.
   - Expose the thread-safe collection within the operators to actually
   perform the filtering.

This doesn't seem right though. I was reading about BroadcastState which
seems like it might fit the bill (e.g. keep those mappings in Broadcast
state so that all of the downstream operations would have access to them,
which I'd imagine would handle keeping things up to date).

Does Flink have a good pattern / construct to handle this? *Basically, I
have a series of mappings that I want to keep relatively up to date in a
Kafka topic, and I'm consuming from another Kafka topic that will need
those mappings to filter against.*

I'd be happy to share some of the approaches I currently have or elaborate
a bit more if that isn't super clear.

Thanks much,

Rion


Capturing Statement Execution / Results within JdbcSink

2021-03-19 Thread Rion Williams
Hey all,

I've been working with JdbcSink and it's really made my life much easier,
but I had two questions about it that folks might be able to answer or
provide some clarity around.

*Accessing Statement Execution / Results*

Is there any mechanism in place (or out of the box) to support reading the
results of statements executed by the JdbcSink or would I need to implement
my own to support it?

The problem that I'm trying to solve relates to observability (i.e.
metrics) and incrementing specific counters based on the response from a
given statement executing. One example might be if I need to upsert 40
widgets that are coming in, although some may be the same widget, I only
want to increment my metric if the widget didn't already exist, which I
could get via the response from the underlying queries.

*Batching Mechanisms (withBatchIntervalMs & withBatchSize)*

This was another great feature that I was happy to see since I didn't want
to handle writing my own windowing logic for something as trivial as this.
I noticed some odd behaviors when I attempted to implement this being
driven by configuration:

private fun getJdbcExecutionOptions(parameters: ParameterTool):
JdbcExecutionOptions {
var executionOptions = JdbcExecutionOptions.builder()
if (parameters.getBoolean("database.batching.enabled", false)){
if (parameters.has("database.batching.ms")){
val batchingIntervalMs = parameters.getLong("database.batching.ms")
executionOptions = executionOptions
.withBatchIntervalMs(batchingIntervalMs)
}

if (parameters.has("database.batching.records")){
val batchingRecords = parameters.getInt("database.batching.records")
executionOptions = executionOptions
.withBatchSize(batchingRecords)
}
}

return executionOptions.build()
}

With the settings of 6 (batchIntervalMs) and 100 (batchSize), it was
around 7-8 minutes prior to a write to the destination taking place,
however when previously just using the batchIntervalMs configuration, I'd
see it consistently write out one a minute.

I was looking through the source and it seems the time-based emissions are
scheduled asynchronously. I may have missed something, but I didn't
explicitly see something where a records-based emission would affect the
scheduled emission.

I'm just trying to get confirmation if these work together as an OR
operation (i.e. flush the pending records once a given number of records
have been seen or once a time interval has elasped).

Thanks so much, you folks have been an incredible community in my short
time here and I've enjoyed working with Flink, contributing, and I hope to
continue to do much more!

Rion


Re: Unit Testing for Custom Metrics in Flink

2021-03-16 Thread Rion Williams
I've made a handful of tweaks to it to try and get them to pick up as
expected (i.e. adding logging to every available overload for the
interceptors, etc) using something similar to the following:

fun create(): InterceptingTaskMetricGroup {
val operatorGroup = object: InterceptingOperatorMetricGroup() {
override fun addGroup(name: Int): MetricGroup {
// Include logging here...
}

override fun addGroup(name: String?): MetricGroup {
// Include logging here...
}

// Repeat ad nauseum
}

return object: InterceptingTaskMetricGroup() {
override fun getOrAddOperator(id: OperatorID, name: String):
OperatorMetricGroup {
return operatorGroup
}
}
}

It still looks like it's only ever registering the built-in metrics and not
hitting any of those for the TestHarness execution. I've even included a
simple test metric for the function during the open() call to ensure that
it wasn't some other unrelated issue for something happening in the
processFunction() calls / dynamic metrics.

Said differently - I can see the logs being hit in the
InterceptingOperatorMetricGroup.addGroup()
calls, but only for the internal metrics from the Task/JobManagers
respectively, nothing custom.

Rion


On Tue, Mar 16, 2021 at 11:00 AM Chesnay Schepler 
wrote:

> Actually you'd have to further subclass the operatorMetricGroup such that
> addGroup works as expected.
> This is admittedly a bit of a drag :/
>
> On 3/16/2021 4:35 PM, Chesnay Schepler wrote:
>
> The test harness is fully independent of the MiniClusterResource; it isn't
> actually running a job. That's why your metrics never arrive at the
> reporter.
>
> You can either:
> a) use the test harness with a custom MetricGroup implementation that
> intercepts registered metrics, set in the MockEnvironment
> b) use the function as part of a job with the custom reporter approach.
> (essentially, fromElements -> function -> discarding sink)
>
> The following would work for a), but it must be noted that this relies on
> quite a few things that are internal to Flink:
>
> ...
>
> InterceptingOperatorMetricGroup operatorMetricGroup =
> new InterceptingOperatorMetricGroup();InterceptingTaskMetricGroup 
> taskMetricGroup =
> new InterceptingTaskMetricGroup() {
> @Overridepublic OperatorMetricGroup 
> getOrAddOperator(OperatorID id, String name) {
> return operatorMetricGroup;}
> };new MockEnvironmentBuilder()
> .setMetricGroup(taskMetricGroup)
>
> ...
>
>
> On 3/16/2021 3:42 PM, Rion Williams wrote:
>
> In this case, I was using a harness to test the function. Although, I
> could honestly care less about the unit-test surrounding metrics, I'm much
> more concerned with having something that will actually run and work as
> intended within a job. The only real concern I have or problem that I want
> to solve is building metrics that may vary based on the data coming in from
> a "label" perspective (e.g. keeping track of the events I've seen for a
> given tenant, or some other properties).
>
> Something like:
>
> _events_seen { tenant = "tenant-1" } 1.0
> _events_seen { tenant = "tenant-2" } 200.0
>
> If that makes sense. I've used the Prometheus client previously to
> accomplish these types of metrics, but since I'm fairly new to the Flink
> world, I was trying to use the built-in constructs available (thus the
> dynamic groups / metrics being added).
>
> On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler 
> wrote:
>
>> Are you actually running a job, or are you using a harness for testing
>> your function?
>>
>> On 3/16/2021 3:24 PM, Rion Williams wrote:
>>
>> Hi Chesnay,
>>
>> Thanks for the prompt response and feedback, it's very much appreciated.
>> Please see the inline responses below to your questions:
>>
>> *Was there anything in the logs (ideally on debug)?*
>>
>>
>> I didn't see anything within the logs that seemed to indicate anything
>> out of the ordinary. I'm currently using a MiniClusterResources for this
>> and attempted to set the logging levels to pick up everything (i.e. ALL),
>> but if there's a way to expose more, I'm not aware of it.
>>
>> *Have you debugged the execution and followed the counter() calls all the
>>> way to the reporter?*
>>
>>
>> With the debugger, I traced one of the counter initializations and it
>> seems that no reporters were being found within the register call in the
>> MetricsRegistryImpl (i.e. this.reporters has no registered reporters):
>>
>> if (this.reporters != null) {
&g

Re: Unit Testing for Custom Metrics in Flink

2021-03-16 Thread Rion Williams
In this case, I was using a harness to test the function. Although, I could
honestly care less about the unit-test surrounding metrics, I'm much more
concerned with having something that will actually run and work as intended
within a job. The only real concern I have or problem that I want to solve
is building metrics that may vary based on the data coming in from a
"label" perspective (e.g. keeping track of the events I've seen for a given
tenant, or some other properties).

Something like:

_events_seen { tenant = "tenant-1" } 1.0
_events_seen { tenant = "tenant-2" } 200.0

If that makes sense. I've used the Prometheus client previously to
accomplish these types of metrics, but since I'm fairly new to the Flink
world, I was trying to use the built-in constructs available (thus the
dynamic groups / metrics being added).

On Tue, Mar 16, 2021 at 9:36 AM Chesnay Schepler  wrote:

> Are you actually running a job, or are you using a harness for testing
> your function?
>
> On 3/16/2021 3:24 PM, Rion Williams wrote:
>
> Hi Chesnay,
>
> Thanks for the prompt response and feedback, it's very much appreciated.
> Please see the inline responses below to your questions:
>
> *Was there anything in the logs (ideally on debug)?*
>
>
> I didn't see anything within the logs that seemed to indicate anything out
> of the ordinary. I'm currently using a MiniClusterResources for this and
> attempted to set the logging levels to pick up everything (i.e. ALL), but
> if there's a way to expose more, I'm not aware of it.
>
> *Have you debugged the execution and followed the counter() calls all the
>> way to the reporter?*
>
>
> With the debugger, I traced one of the counter initializations and it
> seems that no reporters were being found within the register call in the
> MetricsRegistryImpl (i.e. this.reporters has no registered reporters):
>
> if (this.reporters != null) {
> for(int i = 0; i < this.reporters.size(); ++i) {
> MetricRegistryImpl.ReporterAndSettings reporterAndSettings = 
> (MetricRegistryImpl.ReporterAndSettings)this.reporters.get(i);
>
> try {
> if (reporterAndSettings != null) {
> FrontMetricGroup front = new 
> FrontMetricGroup(reporterAndSettings.getSettings(), group);
> reporterAndSettings.getReporter().notifyOfAddedMetric(metric, 
> metricName, front);
> }
> } catch (Exception var11) {
> LOG.warn("Error while registering metric: {}.", metricName, 
> var11);
> }
> }
> }
>
>  Perhaps this is an error on my part as I had assumed the following would
> be sufficient to register my reporter (within a local / minicluster
> environment):
>
> private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
> ConfigConstants.METRICS_REPORTER_PREFIX +
> "MockCustomMetricsReporter." +
> ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to 
> MockCustomMetricsReporter::class.java.name))
> @ClassRule@JvmFieldval flink = MiniClusterResource(
> MiniClusterResourceConfiguration.Builder()
> .setConfiguration(metricsConfiguration)
> .setNumberTaskManagers(1)
> .setNumberSlotsPerTaskManager(1)
> .build()
> )
>
> However, it's clearly being recognized for the built-in metrics, just not
> these custom ones that are being registered as they are triggering the
> notifyOfAddedMetric() function within the reporter itself.
>
> *Do you only see JobManager metrics, or is there somewhere also something
>> about the TaskManager?*
>
>
> It looks like there are metrics coming from both the JobManager and
> TaskManagers from the following examples that were coming out:
>
> localhost.jobmanager.numRegisteredTaskManagers
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
> .taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
> localhost.jobmanager.Status.JVM.Memory.Direct.Count
>
> I do agree that a factory implementation with a static reporter would
> likely be a better approach, so I may explore that a bit more. As well as
> adding some changes to the existing, albeit ghetto, implementation for
> handling the dynamic metrics. I did see several references to a
> MetricRegistry class, however I wasn't sure if that was the most
> appropriate place to add this type of functionality or if it was needed at
> all.
>
> Thanks much,
>
> Rion
>
>
>
> On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler 
> wrote:
>
>> Was there anything in the logs (ideally on debug)?
>> Have you debugged the execution and followed the counter() calls all the
>> way to th

Re: Unit Testing for Custom Metrics in Flink

2021-03-16 Thread Rion Williams
Hi Chesnay,

Thanks for the prompt response and feedback, it's very much appreciated.
Please see the inline responses below to your questions:

*Was there anything in the logs (ideally on debug)?*


I didn't see anything within the logs that seemed to indicate anything out
of the ordinary. I'm currently using a MiniClusterResources for this and
attempted to set the logging levels to pick up everything (i.e. ALL), but
if there's a way to expose more, I'm not aware of it.

*Have you debugged the execution and followed the counter() calls all the
> way to the reporter?*


With the debugger, I traced one of the counter initializations and it seems
that no reporters were being found within the register call in the
MetricsRegistryImpl (i.e. this.reporters has no registered reporters):

if (this.reporters != null) {
for(int i = 0; i < this.reporters.size(); ++i) {
MetricRegistryImpl.ReporterAndSettings reporterAndSettings =
(MetricRegistryImpl.ReporterAndSettings)this.reporters.get(i);

try {
if (reporterAndSettings != null) {
FrontMetricGroup front = new
FrontMetricGroup(reporterAndSettings.getSettings(), group);

reporterAndSettings.getReporter().notifyOfAddedMetric(metric,
metricName, front);
}
} catch (Exception var11) {
LOG.warn("Error while registering metric: {}.", metricName, var11);
}
}
}

 Perhaps this is an error on my part as I had assumed the following would
be sufficient to register my reporter (within a local / minicluster
environment):

private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
ConfigConstants.METRICS_REPORTER_PREFIX +
"MockCustomMetricsReporter." +
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
MockCustomMetricsReporter::class.java.name
))

@ClassRule
@JvmField
val flink = MiniClusterResource(
MiniClusterResourceConfiguration.Builder()
.setConfiguration(metricsConfiguration)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
.build()
)

However, it's clearly being recognized for the built-in metrics, just not
these custom ones that are being registered as they are triggering the
notifyOfAddedMetric() function within the reporter itself.

*Do you only see JobManager metrics, or is there somewhere also something
> about the TaskManager?*


It looks like there are metrics coming from both the JobManager and
TaskManagers from the following examples that were coming out:

localhost.jobmanager.numRegisteredTaskManagers
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.Shuffle.Netty.UsedMemorySegments
.taskmanager.ad5aaade-cd54-44f4-a099-a765b53f79b4.Status.JVM.Memory.Metaspace.Committed
localhost.jobmanager.Status.JVM.Memory.Direct.Count

I do agree that a factory implementation with a static reporter would
likely be a better approach, so I may explore that a bit more. As well as
adding some changes to the existing, albeit ghetto, implementation for
handling the dynamic metrics. I did see several references to a
MetricRegistry class, however I wasn't sure if that was the most
appropriate place to add this type of functionality or if it was needed at
all.

Thanks much,

Rion



On Tue, Mar 16, 2021 at 4:45 AM Chesnay Schepler  wrote:

> Was there anything in the logs (ideally on debug)?
> Have you debugged the execution and followed the counter() calls all the
> way to the reporter?
> Do you only see JobManager metrics, or is there somewhere also something
> about the TaskManager?
>
> I can see several issues with your code, but none that would fully explain
> the issue:
>
> a) your reporter is not thread-safe
> b) you only differentiate metrics by name, which will lead to quite a few
> collisions.
>
> Be also aware that there will be 2 reporter instances; one for the JM and
> one for the TM.
> To remedy this, I would recommend creating a factory that returns a static
> reporter instance instead; overall this tends to be cleaner.
>
> Alternatively, when using the testing harnesses IIRC you can also set set
> a custom MetricGroup implementation.
>
> On 3/16/2021 4:13 AM, Rion Williams wrote:
>
> Hi all,
>
> Recently, I was working on adding some custom metrics to a Flink job that
> required the use of dynamic labels (i.e. capturing various counters that
> were "slicable" by things like tenant / source, etc.).
>
> I ended up handling it in a very naive fashion that would just keep a
> dictionary of metrics that had already been registered and update them
> accordingly which looked something like this:
>
> class MyCustomProcessFunction: ProcessFunction() {
> private lateinit var metrics: CustomMetricsRegistryoverride fun 
> open(parameters: Configuration) {
> metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
> }
>
> override f

Unit Testing for Custom Metrics in Flink

2021-03-15 Thread Rion Williams
Hi all,

Recently, I was working on adding some custom metrics to a Flink job that
required the use of dynamic labels (i.e. capturing various counters that
were "slicable" by things like tenant / source, etc.).

I ended up handling it in a very naive fashion that would just keep a
dictionary of metrics that had already been registered and update them
accordingly which looked something like this:

class MyCustomProcessFunction: ProcessFunction() {
private lateinit var metrics: CustomMetricsRegistry

override fun open(parameters: Configuration) {
metrics = CustomMetricsRegistry(runtimeContext.metricGroup)
}

override fun processElement(event: Event, context: Context,
collector: Collector) {
// Insert calls like metrics.inc("tenant-name", 4) here
}
}

class CustomMetricsRegistry(private val metricGroup: MetricGroup):
Serializable {
// Increments a given metric by key
fun inc(metric: String, tenant: String, amount: Long = 1) {
// Store a key for the metric
val key = "$metric-$tenant"
// Store/register the metric
if (!registeredMetrics.containsKey(key)){
registeredMetrics[key] = metricGroup
.addGroup("tenant", tenant)
.counter(metric)
}

// Update the metric by a given amount
registeredMetrics[key]!!.inc(amount)
}

companion object {
private var registeredMetrics: HashMap = hashMapOf()
}
}

Basically registering and updating new metrics for tenants as they are
encountered, which I've seen being emitted as expected via hitting the
appropriately configured metrics endpoint (using a PrometheusReporter).

However, while I was trying to write a few unit tests for this, I seemed to
encounter an issue. I was following a Stack Overflow post that was answered
by @Chesnay Schepler  [0] that described the use of an
in-memory/embedded Flink cluster and a custom reporter that would
statically expose the underlying metrics.

So I took a shot at implementing something similar as follows:

*Flink Cluster Definition*

private val metricsConfiguration = Configuration.fromMap(mutableMapOf(
ConfigConstants.METRICS_REPORTER_PREFIX +
"MockCustomMetricsReporter." +
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX to
MockCustomMetricsReporter::class.java.name
))

@ClassRule
@JvmField
val flinkCluster = MiniClusterResource(
MiniClusterResourceConfiguration.Builder()
.setConfiguration(metricsConfiguration)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
.build()
)

*Custom Reporter*

class MockCustomMetricsReporter : MetricReporter {

override fun open(metricConfig: MetricConfig) {}
override fun close() {}
override fun notifyOfAddedMetric(metric: Metric, name: String,
metricGroup: MetricGroup) {
// Store the metrics that are being registered as we see them
if (!registeredCustomMetrics.containsKey(name)){
registeredCustomMetrics[name] = metric
}
}

override fun notifyOfRemovedMetric(metric: Metric, name: String,
metricGroup: MetricGroup) {
// Do nothing here
}

companion object {
// Static reference to metrics as they are registered
var registeredCustomMetrics = HashMap()
}
}

*Example Test*

@Test
fun `Example Metrics Use Case`(){
// Arrange
val stream = StreamExecutionEnvironment.getExecutionEnvironment()
val events = listOf(
eventWithUsers("tenant1", "us...@testing.com"),
eventWithUsers("tenant2", "us...@testing.com"),
)

// Act
stream
.fromCollection(events)
.process(MyCustomProcessFunction())

// Assert
stream.execute()
assertTrue(MockCustomMetricsReporter.registeredCustomMetrics.size > 0)
}

While this test will pass, *the problem is that the custom metrics defined
dynamically (via the CustomMetricsRegistry implementation) do not appear
within the registeredCustomMetrics collection*. In fact, there are 21
metrics that get registered but all of them appear to be classic
out-of-the-box metrics such as CPU usage, number of task managers, load,
various other Netty and JVM stats, but no custom metrics are included.

I've tried multiple different configurations, implementations via a custom
TestHarness, etc. but for some reason the custom metrics being defined are
never triggering the notifyOfAddedMetric function which would be
responsible for adding them to the static collection to be asserted
against.

Any ideas / guidance would be more than welcome. Perhaps a different
approach? Based off examples I've encountered, the code seems like it
should "just work".

Thanks much,

Rion

[0] :
https://stackoverflow.com/questions/51675597/how-to-unitest-gauge-metrics-in-flink


Re: Handling Bounded Sources with KafkaSource

2021-03-13 Thread Rion Williams
Following up on this issue, I realized my initial problem was that my test case 
only contained a single message to send through the pipeline. This resulted in 
the earliest offset also being the latest and things didn’t exactly work as 
expected. Once I added several other messages and sent them through, the 
pipeline appeared to run as expected.

However, the use of “bounded” seems to be fickle in terms of test cases. Since 
an experience is thrown once the bound is reached, I can typically just wrap my 
test execution within a try/catch and simply apply my assertion afterwards. 

This occasionally results in passing tests, but in others, it seems that the 
bound is reached prior to processing the messages it had seen thus far, and as 
a result yields a failing test. I don’t know if this is a bug, or intentional, 
but I’m not aware of a workaround that could “force” the pipeline to finish 
processing all of the messages from the topic once the bound is reached. I’ve 
tried sending through “flush records” to the topic, however since there are 
multiple partitions, it’s not guaranteed that the pipeline will read those 
last. 

This is purely a testing problem, as a production job would be streaming and 
unbounded, however I’d love to have a reliable integration test or a pattern 
that I could use to guarantee the processing of a finite set of data via a 
KafkaSource (I.e. send finite records to Kafka, read from topic, process all 
records, apply assertion after processing).

Any ideas/recommendations/workarounds would be greatly welcome and I’d be happy 
to share my specific code / use-cases if needed.

Thanks much,

Rion 

> On Mar 12, 2021, at 10:19 AM, Rion Williams  wrote:
> 
> 
> Hi all,
> 
> I've been using the KafkaSource API as opposed to the classic consumer and 
> things have been going well. I configured my source such that it could be 
> used in either a streaming or bounded mode, with the bounded approach 
> specifically aimed at improving testing (unit/integration).
> 
> I've noticed that when I attempt to run through a test - it seems that the 
> pipeline never acknowledges the "end" of the stream in a bounded context and 
> just runs forever and never makes it to my assert.
> 
> Does anything look glaringly wrong with how the source is being defined?
> object KafkaEventSource {
> 
> fun withParameters(parameters: ParameterTool): KafkaSource {
> val schemaRegistryUrl = parameters.getRequired("schema.registry.url")
> 
> val builder = KafkaSource.builder()
> .setBootstrapServers(parameters.getRequired("bootstrap.servers"))
> .setGroupId(parameters.getRequired("group.id"))
> .setStartingOffsets(OffsetsInitializer.earliest())
> .setProperty("schema.registry.url", schemaRegistryUrl)
> .setTopics(parameters.getRequired("topic"))
> .setDeserializer(EventDeserializer(schemaRegistryUrl))
> 
> if (parameters.getBoolean("bounded", false)) {
> builder.setBounded(OffsetsInitializer.latest())
> }
> 
> return builder.build()
> }
> }
> I can verify that the generated source has it's boundedness set properly and 
> all of the configuration options are correct.
> 
> My test itself is fairly simple and can be broken down as follows:
> Inject records into a Kafka Topic
> Initialize my Flink job using all of my testing parameters
> Apply my assertion (in this case verifying that a JdbcSink wrote to a 
> specific database)
> @Test
> fun `Example `(){
> // Arrange
> val events = getTestEvents()
> sendToKafka(events, parameters)
> 
> // Act
> EntityIdentificationJob.run(parameters)
> 
> // Assert
> val users = queryCount("SELECT * FROM users", connection)
> assertEquals(1, users)
> }
> Where my job itself is broken down further and reads from the source, 
> performs a process function into multiple side outputs and writes each of 
> them to a distinct JdbcSink based on the type:
> 
> @JvmStatic
> fun main(args: Array) {
> val parameters = loadParams(args)
> val stream = StreamExecutionEnvironment.getExecutionEnvironment()
> 
> // Read from Kafka
> val entities = stream
>.fromSource(KafkaEventSource.withParameters(parameters), 
> WatermarkStrategy.noWatermarks(), "kafka")
>.process(IdentifyEntitiesFunction())
> 
> // Write out each tag to its respective sink
> for (entityType in EntityTypes.all) {
> entities
> .getSideOutput(entityType)
> .addSink(PostgresEntitySink.withEntity(entityType.typeInfo, 
> parameters))
> }
> 
> stream.execute(parameters.ge

Handling Bounded Sources with KafkaSource

2021-03-12 Thread Rion Williams
Hi all,

I've been using the KafkaSource API as opposed to the classic consumer and
things have been going well. I configured my source such that it could be
used in either a streaming or bounded mode, with the bounded approach
specifically aimed at improving testing (unit/integration).

I've noticed that when I attempt to run through a test - it seems that the
pipeline never acknowledges the "end" of the stream in a bounded context
and just runs forever and never makes it to my assert.

Does anything look glaringly wrong with how the source is being defined?

object KafkaEventSource {

fun withParameters(parameters: ParameterTool): KafkaSource {
val schemaRegistryUrl = parameters.getRequired("schema.registry.url")

val builder = KafkaSource.builder()
.setBootstrapServers(parameters.getRequired("bootstrap.servers"))
.setGroupId(parameters.getRequired("group.id"))
.setStartingOffsets(OffsetsInitializer.earliest())
.setProperty("schema.registry.url", schemaRegistryUrl)
.setTopics(parameters.getRequired("topic"))
.setDeserializer(EventDeserializer(schemaRegistryUrl))

if (parameters.getBoolean("bounded", false)) {
builder.setBounded(OffsetsInitializer.latest())
}

return builder.build()
}
}

I can verify that the generated source has it's boundedness set properly
and all of the configuration options are correct.

My test itself is fairly simple and can be broken down as follows:

   1. Inject records into a Kafka Topic
   2. Initialize my Flink job using all of my testing parameters
   3. Apply my assertion (in this case verifying that a JdbcSink wrote to a
   specific database)

@Test
fun `Example `(){
// Arrange
val events = getTestEvents()
sendToKafka(events, parameters)

// Act
EntityIdentificationJob.run(parameters)

// Assert
val users = queryCount("SELECT * FROM users", connection)
assertEquals(1, users)
}

Where my job itself is broken down further and reads from the source,
performs a process function into multiple side outputs and writes each of
them to a distinct JdbcSink based on the type:

@JvmStatic
fun main(args: Array) {
val parameters = loadParams(args)
val stream = StreamExecutionEnvironment.getExecutionEnvironment()

// Read from Kafka
val entities = stream
   .fromSource(KafkaEventSource.withParameters(parameters),
WatermarkStrategy.noWatermarks(), "kafka")
   .process(IdentifyEntitiesFunction())

// Write out each tag to its respective sink
for (entityType in EntityTypes.all) {
entities
.getSideOutput(entityType)
.addSink(PostgresEntitySink.withEntity(entityType.typeInfo,
parameters))
}

stream.execute(parameters.getRequired("application"))
}

I can verify in the logs that my sink is being executed and writing to the
appropriate database, however the job itself never finishes. I've tried it
using a single Kafka partition as well as multiple partitions and even
commented out the logic related to writing to the database. It still just
seems to run ... forever.

Any recommendations? Perhaps there's a bad configuration or setting that
isn't being used as intended?

Thanks,

Rion


Request for Flink JIRA Access

2021-03-07 Thread Rion Williams
Hey folks,

The community here has been awesome with my recent questions about Flink, so 
I’d like to give back. I’m already a member of the ASF JIRA but I was wondering 
if I could get access to the Flink Project. 

I’ve contributed a good bit to Apache Beam in the past, but I figured that I’ll 
be using Flink for the foreseeable future that I’d like to give back where I 
can.

Additionally, if there are any low hanging fruit / tags for new contributors 
just to familiarize myself with the process, that’d be great / appreciated!

My username for JIRA is rionmonster and if there’s anything else I could 
provide, please let me know!

Thanks,

Rion

Dynamic JDBC Sink Support

2021-03-05 Thread Rion Williams
Hi all,

I’ve been playing around with a proof-of-concept application with Flink to
assist a colleague of mine. The application is fairly simple (take in a
single input and identify various attributes about it) with the goal of
outputting those to separate tables in Postgres:

object AttributeIdentificationJob {
@JvmStatic
fun main(args: Array) {
val stream = StreamExecutionEnvironment.getExecutionEnvironment()

stream
.addSource(ReadFromKafka())
.process(IdentifyAttributesFunction())
.addSink(DynamicJdbcHere())

// execute program
stream.execute("Attribute Identification")
}
}

Considering my attributes may be of varying types (all implementing an
Attribute interface), I don't know if the existing JdbcSink functionality
or some variant of it (i.e. one of the dynamic ones that I see listed)
could handle this functionality. Essentially for a given "bundle" of
records, I'd need to ensure that each respective type of attribute was
upserted into its corresponding table within a Postgres database.

Is that something that the connector can handle on it's own? Or would I
need to implement my own RichSinkFunction> that could
handle opening a connection to Postgres and dynamically generating the
appropriate UPSERT statements to handle sending the records? As a follow up
to that, if I did need to write my own RichSinkFunction, would I need to
implement my own checkmarking for resilience purposes or does that come
along for the ride for RichSinkFunctions?

Any insight or approaches would be welcome!

Thanks,

Rion


Re: Defining GlobalJobParameters in Flink Unit Testing Harnesses

2021-03-04 Thread Rion Williams
Thanks Chesnay!

I tried giving that a shot but I still wasn't able to access the
globalJobParameters from within the open function in my KeyedProcessFunction.
You can see the implementation below which I believe should be correct:

object CustomProcessFunctionTestHarness {
fun  forKeyedProcessFunction(
function: KeyedProcessFunction,
keySelector: KeySelector,
keyType: TypeInformation,
parameters: ParameterTool
): KeyedOneInputStreamOperatorTestHarness {
val testHarness: KeyedOneInputStreamOperatorTestHarness
=
KeyedOneInputStreamOperatorTestHarness(
KeyedProcessOperator(Preconditions.checkNotNull(function)),
keySelector,
keyType,
1,
1,
0
)

// Adjust execution configuration via parameters
testHarness.executionConfig.globalJobParameters = parameters
testHarness.open()

return testHarness
}
}

Usage-wise, the declaration looks as you might expect:

bufferedMagicWindowHarness =
CustomProcessFunctionTestHarness.forKeyedProcessFunction(
MagicWindowFunction(),
{ log -> log.getKey() },
TypeInformation.of(String::class.java),
ParameterTool.fromArgs(arrayOf("--processingTimeBuffer",
"6"))
)

And then I think as I described earlier, these are attempted to be read via
the following in the open() function:

val parameters = runtimeContext.executionConfig.globalJobParameters as?
ParameterTool
if (parameters != null) {
 processingTimeBuffer = parameters.getLong("processingTimeBuffer")
}

Does anything look out of place here? I haven't gone spelunking into the
source code for this yet, but I'm assuming that I'm setting the correct
values on the execution configuration.

Thanks again,

Rion


On Thu, Mar 4, 2021 at 7:57 AM Chesnay Schepler  wrote:

> The reason why your attempts have failed is that
> ProcessFunctionTestHarnesses.forKeyedProcessFunction automatically calls
> open(), thus any mutations on the harness happen too late.
>
> I'd suggest to take a look at the implementation of that method and
> essentially copy the code.
> You can then call the harness constructor manually and mutate the
> execution config before calling open().
>
> On 3/4/2021 2:49 PM, Rion Williams wrote:
>
> Absolutely,
>
> I think it's gone through quite a few iterations, but this is the current
> state of it (defined in a @Before function as part of scaffolding out the
> tests):
>
> private lateinit var magicWindowHarness:
> KeyedOneInputStreamOperatorTestHarness
>
> @Before
> fun init() {
> magicWindowHarness =
> ProcessFunctionTestHarnesses.forKeyedProcessFunction(
> MagicWindowFunction(),
> { log -> log.getKey() },
> TypeInformation.of(String::class.java)
> )
> }
>
> I've also tried a few variants of that with a separate declaration for the
> function itself, etc.
>
>
>
> On Thu, Mar 4, 2021 at 6:47 AM Chesnay Schepler 
> wrote:
>
>> Could you show us how you create test harness?
>>
>> On 3/4/2021 5:13 AM, Rion Williams wrote:
>>
>> Hi all,
>>
>> Early today I had asked a few questions regarding the use of the many
>> testing constructs available within Flink and believe that I have things in
>> a good direction at present. I did run into a specific case that either may
>> not be supported, or just isn't documented well enough for me to determine
>> what is going wrong.
>>
>> Basically, I have a KeyedProcessFunction that reads some global-level
>> configuration via GlobalJobParameters within its open function:
>>
>> override fun open(configuration: Configuration) {
>> // Omitted for brevity
>>
>> val parameters = runtimeContext.executionConfig.globalJobParameters
>> as? ParameterTool
>> if (parameters != null) {
>> processingTimeBuffer = parameters.getLong("processingTimeBuffer",
>> 0L)
>> }
>> }
>>
>> This works just as expected within the actual pipeline itself when set
>> similarly:
>>
>> streamEnvironment.config.globalJobParameters = parameters
>>
>> However, I don't see an effective way to set this against a TestHarness
>> as I've made several attempts but I never can seem to populate the
>> globalJobParameters property within the KeyedProcessFunction itself using a
>> test harness despite multiple attempts
>>
>> // Attempt 1
>> magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters
>> = ParameterTool.fromMap(...)
>>
>> // Attempt 2
>> magicWindowHarness.executionConfig.global

Re: Defining GlobalJobParameters in Flink Unit Testing Harnesses

2021-03-04 Thread Rion Williams
Absolutely,

I think it's gone through quite a few iterations, but this is the current
state of it (defined in a @Before function as part of scaffolding out the
tests):

private lateinit var magicWindowHarness:
KeyedOneInputStreamOperatorTestHarness

@Before
fun init() {
magicWindowHarness =
ProcessFunctionTestHarnesses.forKeyedProcessFunction(
MagicWindowFunction(),
{ log -> log.getKey() },
TypeInformation.of(String::class.java)
)
}

I've also tried a few variants of that with a separate declaration for the
function itself, etc.



On Thu, Mar 4, 2021 at 6:47 AM Chesnay Schepler  wrote:

> Could you show us how you create test harness?
>
> On 3/4/2021 5:13 AM, Rion Williams wrote:
>
> Hi all,
>
> Early today I had asked a few questions regarding the use of the many
> testing constructs available within Flink and believe that I have things in
> a good direction at present. I did run into a specific case that either may
> not be supported, or just isn't documented well enough for me to determine
> what is going wrong.
>
> Basically, I have a KeyedProcessFunction that reads some global-level
> configuration via GlobalJobParameters within its open function:
>
> override fun open(configuration: Configuration) {
> // Omitted for brevity
>
> val parameters = runtimeContext.executionConfig.globalJobParameters
> as? ParameterTool
> if (parameters != null) {
> processingTimeBuffer = parameters.getLong("processingTimeBuffer",
> 0L)
> }
> }
>
> This works just as expected within the actual pipeline itself when set
> similarly:
>
> streamEnvironment.config.globalJobParameters = parameters
>
> However, I don't see an effective way to set this against a TestHarness as
> I've made several attempts but I never can seem to populate the
> globalJobParameters property within the KeyedProcessFunction itself using a
> test harness despite multiple attempts
>
> // Attempt 1
> magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters
> = ParameterTool.fromMap(...)
>
> // Attempt 2
> magicWindowHarness.executionConfig.globalJobParameters =
> ParameterTool.fromMap(...)
>
> // Attempt 3
> magicWindowHarness.environment.executionConfig.globalJobParameters =
> ParameterTool.fromMap(...)
>
> // Attempt 4
> val env = StreamExecutionEnvironment.
> env.config.globalJobParameters = ParameterTool.fromMap(...)
>
> Is this supported or am I simply going about it the wrong way? Or even
> just perhaps missing a piece of the puzzle?
>
> Thanks much,
>
> Rion
>
>
>


Defining GlobalJobParameters in Flink Unit Testing Harnesses

2021-03-03 Thread Rion Williams
Hi all,

Early today I had asked a few questions regarding the use of the many
testing constructs available within Flink and believe that I have things in
a good direction at present. I did run into a specific case that either may
not be supported, or just isn't documented well enough for me to determine
what is going wrong.

Basically, I have a KeyedProcessFunction that reads some global-level
configuration via GlobalJobParameters within its open function:

override fun open(configuration: Configuration) {
// Omitted for brevity

val parameters = runtimeContext.executionConfig.globalJobParameters as?
ParameterTool
if (parameters != null) {
processingTimeBuffer = parameters.getLong("processingTimeBuffer",
0L)
}
}

This works just as expected within the actual pipeline itself when set
similarly:

streamEnvironment.config.globalJobParameters = parameters

However, I don't see an effective way to set this against a TestHarness as
I've made several attempts but I never can seem to populate the
globalJobParameters property within the KeyedProcessFunction itself using a
test harness despite multiple attempts

// Attempt 1
magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters
= ParameterTool.fromMap(...)

// Attempt 2
magicWindowHarness.executionConfig.globalJobParameters =
ParameterTool.fromMap(...)

// Attempt 3
magicWindowHarness.environment.executionConfig.globalJobParameters =
ParameterTool.fromMap(...)

// Attempt 4
val env = StreamExecutionEnvironment.
env.config.globalJobParameters = ParameterTool.fromMap(...)

Is this supported or am I simply going about it the wrong way? Or even just
perhaps missing a piece of the puzzle?

Thanks much,

Rion


Re: Unit Testing State Stores in KeyedProcessFunctions

2021-03-03 Thread Rion Williams
Thanks Chesnay,

I agree that output testing is more practical and far less brittle, I was just 
curious if support was there for it. I have a specific use case where I’m 
managing my own windows and may schedule something to be emitted but after some 
processing time delay so it could potentially be valuable to see this 
scheduling in state since it may not directly coincide with output.

Not a huge deal, I already have tests in place that function as black boxes 
with output verification, it was more of a question if it was supported.

Thanks much,

Rion

> On Mar 3, 2021, at 2:44 PM, Chesnay Schepler  wrote:
> 
> 
> I do not believe this to be possible.
> 
> Given that the state will likely in some form affect the behavior of the 
> function (usually in regards to what it outputs), it may be a better idea to 
> test for that. (I suppose you'd want tests like that anyway)
> 
> On 3/3/2021 8:10 PM, Rion Williams wrote:
>> Hi all!
>> 
>> Is it possible to apply assertions against the underlying state stores 
>> within a KeyedProcessFunction using the existing 
>> KeyedOneInputStreamOperatorTestHarness class within unit tests? Basically I 
>> wanted to ensure that if I passed in two elements each with unique keys that 
>> I would be able to query the underlying state stores to ensure they were 
>> working as expected. I don’t really see a mechanism that would support such 
>> behavior (i.e. give me the state store for key n from the operator?)
>> 
>> @Test
>> fun `Verify that instances with different keys retain separate watermarks`() 
>> {
>> // Arrange
>> val logs = listOf(
>> StreamRecord(TestGenerator.generateLog(tenant = "A")),
>> StreamRecord(TestGenerator.generateLog(tenant = "B")),
>> )
>> 
>> // Act
>> magicWindowHarness
>> .processElements(logs)
>> 
>> // Assert (I'd like to access the state by key for each here)
>> assert(magicWindowHarness.getStateForKey("A"), ...)
>> assert(magicWindowHarness.getStateForKey("B"), ...)
>> }
>> 
>> Is something like this possible or is there a better way to access the 
>> underlying state store? It seemed to work as expected when only a single key 
>> was involved, but when multiple keys were involved, things seemed to fall 
>> apart. The current testing documentation [0] is fantastic, however I think 
>> this might qualify as a more advanced task than it covered.
>> 
>> At present all of the state stores of the underlying function are privately 
>> declared, which may/may not be relevant:
>> 
>> @Transient private lateinit var watermark: ValueState
>> @Transient private lateinit var scheduledEvictions: MapState
>> 
>> Any recommendations or advice would be greatly appreciated and I'll be happy 
>> to provide any additional context/details as needed.
>> 
>> Thanks a lot!
>> 
>> Rion
>> 
>> [0]: 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> 


Unit Testing State Stores in KeyedProcessFunctions

2021-03-03 Thread Rion Williams
Hi all!

Is it possible to apply assertions against the underlying state stores
within a KeyedProcessFunction using the existing
KeyedOneInputStreamOperatorTestHarness class within unit tests? Basically I
wanted to ensure that if I passed in two elements each with unique keys
that I would be able to query the underlying state stores to ensure they
were working as expected. I don’t really see a mechanism that would support
such behavior (i.e. give me the state store for key n from the operator?)

@Test
fun `Verify that instances with different keys retain separate
watermarks`() {
// Arrange
val logs = listOf(
StreamRecord(TestGenerator.generateLog(tenant = "A")),
StreamRecord(TestGenerator.generateLog(tenant = "B")),
)

// Act
magicWindowHarness
.processElements(logs)

// Assert (I'd like to access the state by key for each here)
assert(magicWindowHarness.getStateForKey("A"), ...)
assert(magicWindowHarness.getStateForKey("B"), ...)
}

Is something like this possible or is there a better way to access the
underlying state store? It seemed to work as expected when only a single
key was involved, but when multiple keys were involved, things seemed to
fall apart. The current testing documentation [0] is fantastic, however I
think this might qualify as a more advanced task than it covered.

At present all of the state stores of the underlying function are privately
declared, which may/may not be relevant:

@Transient private lateinit var watermark: ValueState
@Transient private lateinit var scheduledEvictions: MapState

Any recommendations or advice would be greatly appreciated and I'll be
happy to provide any additional context/details as needed.

Thanks a lot!

Rion

[0]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html


Re: Handling Data Separation / Watermarking from Kafka in Flink

2021-03-01 Thread Rion Williams
Hey David et all,

I had one follow up question for this as I've been putting together some
integration/unit tests to verify that things are working as expected with
finite datasets (e.g. a text file with several hundred records that are
serialized, injected into Kafka, and processed through the pipeline). I'm
wondering if there's a good strategy to handle these finite sets (i.e. when
I'm done reading through all of the records that I care about, I'd need to
trigger something to explicitly flush the windows / evict messages. I'm not
sure what a great approach would be to handle here? I don't think there's
an easy way to simulate processing time delays outside of an explicit
Thread.sleep() call prior to injecting some messages into the running
pipeline asynchronously.

Any recommendations for handling something like this? I must imagine that
it's a fairly common use-case for testing, but maybe not?

Thanks much!

Rion

On Sat, Feb 27, 2021 at 10:56 AM Rion Williams 
wrote:

> Thanks David,
>
> I figured that the correct approach would obviously be to adopt a keying
> strategy upstream to ensure the same data that I used as a key downstream
> fell on the same partition (ensuring the ordering guarantees I’m looking
> for).
>
> I’m guessing implementation-wise, when I would normally evict a window
> after some event time and allowed lateness, I could set a timer or just
> explicitly keep the window open for some additional time to allow for out
> of order data to make its way into the window.
>
> Either way - I think the keying is probably the right approach, but I
> wanted to consider any other options should that become an issue upstream.
>
> Thanks!
>
> Rion
>
> On Feb 27, 2021, at 10:21 AM, David Anderson  wrote:
>
> 
> Rion,
>
> If you can arrange for each tenant's events to be in only one kafka
> partition, that should be the best way to simplify the processing you need
> to do. Otherwise, a simple change that may help would be to increase the
> bounded delay you use in calculating your own per-tenant watermarks,
> thereby making late events less likely.
>
> David
>
> On Sat, Feb 27, 2021 at 3:29 AM Rion Williams 
> wrote:
>
>> David and Timo,
>>
>> Firstly, thank you both so much for your contributions and advice. I
>> believe I’ve implemented things along the lines that you both detailed and
>> things appear to work just as expected (e.g. I can see things arriving,
>> being added to windows, discarding late records, and ultimately writing out
>> files as expected).
>>
>> With that said, I have one question / issue that I’ve run into with
>> handling the data coming my Kafka topic. Currently, my tenant/source (i.e.
>> my key) may be distributed across the 10 partitions of my Kafka topic. With
>> the way that I’m consuming from this topic (with a Kafka Consumer), it
>> looks like my data is arriving in a mixed order which seems to be causing
>> my own watermarks (those stored in my ValueState) to process as later data
>> may arrive earlier than other data and cause my windows to be evicted.
>>
>> I’m currently using the `withNoWatermarks()` along with a custom
>> timestamp assigned to handle all of my timestamping, but is there a
>> mechanism to handle the mixed ordering across partitions in this scenario
>> at the Flink level?
>>
>> I know the answer here likely lies with Kafka and adopting a better
>> keying strategy to ensure the same tenant/source (my key) lands on the same
>> partition, which by definition ensures ordering. I’m just wondering if
>> there’s some mechanism to accomplish this post-reading from Kafka in Flink
>> within my pipeline to handle things in a similar fashion?
>>
>> Again - thank you both so much, I’m loving the granularity and control
>> that Flink has been providing me over other streaming technologies I’ve
>> used in the past. I’m totally sold on it and am looking forward to doing
>> more incredible things with it.
>>
>> Best regards,
>>
>> Rion
>>
>> On Feb 26, 2021, at 4:36 AM, David Anderson  wrote:
>>
>> 
>> Yes indeed, Timo is correct -- I am proposing that you not use timers at
>> all. Watermarks and event-time timers go hand in hand -- and neither
>> mechanism can satisfy your requirements.
>>
>> You can instead put all of the timing logic in the processElement method
>> -- effectively emulating what you would get if Flink were to offer per-key
>> watermarking.
>>
>> The reason that the PseudoWindow example is using MapState is that for
>> each key/tenant, more than one window can be active simultaneously. This
>> occurs because the event stream is out-of-order with 

Re: Using Prometheus Client Metrics in Flink

2021-02-28 Thread Rion Williams
It looks like I was finally able to get the expected labeling behavior that
I was looking for by simply storing a reference to the underlying
MetricGroup and then keeping track of any new metrics that I needed to
dynamically create and use downstream:

class MagicMetricRegistry(private val metricGroup: MetricGroup):
Serializable {
// Reference for all of the registered metrics
private val registeredMetrics: HashMap = hashMapOf()

// Increments a given metric by key
fun inc(metric: String, tenant: String, source: String, amount: Long =
1) {
// Store a key
val key = "$metric-$tenant-$source"
if (!registeredMetrics.containsKey(key)){
registeredMetrics[key] = metricGroup
.addGroup("tenant", tenant)
.addGroup("source", source)
.counter(metric)
}

// Update the metric by a given amount
registeredMetrics[key]!!.inc(amount)
}
}

And then simply within the open function call in my KeyedProcessFunction, I
stored a reference to it and registered any new, in this case tenant/source
combinations, as they came in:

class MagicWindowFunction: KeyedProcessFunction<...>() {
@Transient private lateinit var metrics: MagicMetricRegistry

override fun open(parameters: Configuration) {
metrics = MagicMetricRegistry(runtimeContext.metricGroup)
}

override fun processElement(...) {
// Omitted for brevity

metrics.inc("logs_seen", "my-tenant", "my-source")
}

// Omitted for brevity
}

This appears to be working as expected as far as I can tell at this point.
I can see all of the expected labels appearing within Prometheus and
further downstream in Grafana!

Thanks again,

Rion

On Sun, Feb 28, 2021 at 8:15 AM Rion Williams  wrote:

> Thanks Dylan,
>
> Totally understandable. I already have the appropriate exporters /
> monitors in place for scraping metrics from Flink, including custom ones,
> into Prometheus. The labeling challenge is really the big one as while I
> see lots of labels for the metrics being exported (e.g. job id, worker,
> etc.) I didn’t see a mechanism to inject my own into those coming from
> Flink.
>
> Additionally, in my specific use case I’m dealing with a multi-tenant
> pipeline (I.e. reading messages from a single multi-tenant Kafka topic),
> which is where the labeling comes in. I’d love to be able to have a counter
> (among other types of metrics) with their appropriate labels for each
> tenant.
>
> I suppose I could implement a custom counter or series of counters (one
> for each tenant) that would each be responsible for keeping track of their
> own respective tenant values. In my case I’m dealing with a
> KeyedProcessFunction, so I only have access to the key (tenant) within the
> processElement function as opposed to when the function is initially
> opened, where I understand you would typically register a metric.
>
> Sorry for the somewhat convoluted response, I’m still getting accustomed
> to some of the Flink APIs, specifically around metrics.
>
> Thanks,
>
> Rion
>
> On Feb 28, 2021, at 8:02 AM, Meissner, Dylan <
> dylan.t.meiss...@nordstrom.com> wrote:
>
> 
> Hi Rion,
>
> Regarding the question about adding Prometheus labels out of the box. This
> is common ask of all exporters, but Prometheus philosophy sees this as an
> "anti-pattern" as the metrics source can often be ambivalent about context.
> See [0] for example of such a discussion.
>
> Instead, we can establish context during service discovery. If, for
> example, we run clusters for tenants on Kubernetes, then within the
> kubernetes_sd_config [1] labelling rules we can instruct Prometheus to add
> the Kubernetes labels from the pods, such as "tenant-id: foo" and
> "environment: staging" to each incoming metric it processes.
>
> This isn't limited to Kubernetes; each of the service discovery configs
> designed to accomodate translating metadata from context into metric labels.
>
> If this doesn't work for you, then consider encoding tenant identifier
> into job names, and extract this identifier in a metric_relabel_config [2]
>
> [0]: https://github.com/prometheus/node_exporter/issues/319
> [1]:
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
> [2]:
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs
>
>
> --
> *From:* Rion Williams 
> *Sent:* Sunday, February 28, 2021 12:46 AM
> *To:* Prasanna kumar 
> *Cc:* user 
> *Subject:* Re: Using Prometheus Client Metrics in Flink
>
> Hi Prassana,
>
> Thanks for that. It’s what I was doing pr

Re: Using Prometheus Client Metrics in Flink

2021-02-28 Thread Rion Williams
Thanks Dylan,

Totally understandable. I already have the appropriate exporters / monitors in 
place for scraping metrics from Flink, including custom ones, into Prometheus. 
The labeling challenge is really the big one as while I see lots of labels for 
the metrics being exported (e.g. job id, worker, etc.) I didn’t see a mechanism 
to inject my own into those coming from Flink.

Additionally, in my specific use case I’m dealing with a multi-tenant pipeline 
(I.e. reading messages from a single multi-tenant Kafka topic), which is where 
the labeling comes in. I’d love to be able to have a counter (among other types 
of metrics) with their appropriate labels for each tenant.

I suppose I could implement a custom counter or series of counters (one for 
each tenant) that would each be responsible for keeping track of their own 
respective tenant values. In my case I’m dealing with a KeyedProcessFunction, 
so I only have access to the key (tenant) within the processElement function as 
opposed to when the function is initially opened, where I understand you would 
typically register a metric.

Sorry for the somewhat convoluted response, I’m still getting accustomed to 
some of the Flink APIs, specifically around metrics.

Thanks,

Rion

> On Feb 28, 2021, at 8:02 AM, Meissner, Dylan  
> wrote:
> 
> 
> Hi Rion,
> 
> Regarding the question about adding Prometheus labels out of the box. This is 
> common ask of all exporters, but Prometheus philosophy sees this as an 
> "anti-pattern" as the metrics source can often be ambivalent about context. 
> See [0] for example of such a discussion.
> 
> Instead, we can establish context during service discovery. If, for example, 
> we run clusters for tenants on Kubernetes, then within the 
> kubernetes_sd_config [1] labelling rules we can instruct Prometheus to add 
> the Kubernetes labels from the pods, such as "tenant-id: foo" and 
> "environment: staging" to each incoming metric it processes.
> 
> This isn't limited to Kubernetes; each of the service discovery configs 
> designed to accomodate translating metadata from context into metric labels.
> 
> If this doesn't work for you, then consider encoding tenant identifier into 
> job names, and extract this identifier in a metric_relabel_config [2]
> 
> [0]: https://github.com/prometheus/node_exporter/issues/319
> [1]: 
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
> [2]: 
> https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs
> 
> 
> From: Rion Williams 
> Sent: Sunday, February 28, 2021 12:46 AM
> To: Prasanna kumar 
> Cc: user 
> Subject: Re: Using Prometheus Client Metrics in Flink
>  
> Hi Prassana,
> 
> Thanks for that. It’s what I was doing previously as a workaround however I 
> was just curious if there was any Flink-specific functionality to handle this 
> prior to Prometheus.
> 
> Additionally from the docs on metrics [0], it seems that there’s a pattern in 
> place to use supported third-party metrics such as those from 
> CodeHale/DropWizard via a Maven package (flink-metrics-dropwizard). I do see 
> a similarly named package for Prometheus which may be what I’m looking for as 
> it’s similarly named (flink-metrics-prometheus), so I may give that a try.
> 
> Thanks,
> 
> Rion
> 
> [0]: https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html
> 
>>> On Feb 28, 2021, at 12:20 AM, Prasanna kumar 
>>>  wrote:
>>> 
>> 
>> Rion,
>> 
>> Regarding the second question , you can aggregate by using sum function  
>> sum(metric_name{jobb_name="JOBNAME"}) .  This works is you are using the 
>> metric counter.
>> 
>> Prasanna.
>> 
>> On Sat, Feb 27, 2021 at 9:01 PM Rion Williams  wrote:
>> Hi folks,
>> 
>> I’ve just recently started working with Flink and I was in the process of 
>> adding some metrics through my existing pipeline with the hopes of building 
>> some Grafana dashboards with them to help with observability.
>> 
>> Initially I looked at the built-in Flink metrics that were available, but I 
>> didn’t see an easy mechanism for setting/using labels with them. 
>> Essentially, I have two properties for my messages coming through the 
>> pipeline that I’d like to be able to keep track of (tenant/source) across 
>> several metrics (e.g. total_messages with tenant / source labels, etc.). I 
>> didn’t see an easy way to adjust this out of the box, or wasn’t aware of a 
>> good pattern for handling these.
>> 
>> I had previously used the Prometheus Client metrics [0] to accomplish this 
>> in the past but I wasn’t entirely s

Re: Using Prometheus Client Metrics in Flink

2021-02-28 Thread Rion Williams
Hi Prassana,

Thanks for that. It’s what I was doing previously as a workaround however I was 
just curious if there was any Flink-specific functionality to handle this prior 
to Prometheus.

Additionally from the docs on metrics [0], it seems that there’s a pattern in 
place to use supported third-party metrics such as those from 
CodeHale/DropWizard via a Maven package (flink-metrics-dropwizard). I do see a 
similarly named package for Prometheus which may be what I’m looking for as 
it’s similarly named (flink-metrics-prometheus), so I may give that a try.

Thanks,

Rion

[0]: https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html

> On Feb 28, 2021, at 12:20 AM, Prasanna kumar  
> wrote:
> 
> 
> Rion,
> 
> Regarding the second question , you can aggregate by using sum function  
> sum(metric_name{jobb_name="JOBNAME"}) .  This works is you are using the 
> metric counter.
> 
> Prasanna.
> 
>> On Sat, Feb 27, 2021 at 9:01 PM Rion Williams  wrote:
>> Hi folks,
>> 
>> I’ve just recently started working with Flink and I was in the process of 
>> adding some metrics through my existing pipeline with the hopes of building 
>> some Grafana dashboards with them to help with observability.
>> 
>> Initially I looked at the built-in Flink metrics that were available, but I 
>> didn’t see an easy mechanism for setting/using labels with them. 
>> Essentially, I have two properties for my messages coming through the 
>> pipeline that I’d like to be able to keep track of (tenant/source) across 
>> several metrics (e.g. total_messages with tenant / source labels, etc.). I 
>> didn’t see an easy way to adjust this out of the box, or wasn’t aware of a 
>> good pattern for handling these.
>> 
>> I had previously used the Prometheus Client metrics [0] to accomplish this 
>> in the past but I wasn’t entirely sure how it would/could mesh with Flink. 
>> Does anyone have experience in working with these or know if they are 
>> supported?
>> 
>> Secondly, when using the Flink metrics, I noticed I was receiving a separate 
>> metric for each task that was being spun up. Is there an “easy button” to 
>> handle aggregating these to ensure that a single metric (e.g. 
>> total_messages) reflects the total processed across all of the tasks instead 
>> of each individual one?
>> 
>> Any recommendations / resources / advice would be greatly appreciated!
>> 
>> Thanks,
>> 
>> Rion
>> 
>> [0] : https://prometheus.io/docs/instrumenting/clientlibs/


Re: Handling Data Separation / Watermarking from Kafka in Flink

2021-02-27 Thread Rion Williams
Thanks David,

I figured that the correct approach would obviously be to adopt a keying 
strategy upstream to ensure the same data that I used as a key downstream fell 
on the same partition (ensuring the ordering guarantees I’m looking for).

I’m guessing implementation-wise, when I would normally evict a window after 
some event time and allowed lateness, I could set a timer or just explicitly 
keep the window open for some additional time to allow for out of order data to 
make its way into the window.

Either way - I think the keying is probably the right approach, but I wanted to 
consider any other options should that become an issue upstream.

Thanks!

Rion

> On Feb 27, 2021, at 10:21 AM, David Anderson  wrote:
> 
> 
> Rion,
> 
> If you can arrange for each tenant's events to be in only one kafka 
> partition, that should be the best way to simplify the processing you need to 
> do. Otherwise, a simple change that may help would be to increase the bounded 
> delay you use in calculating your own per-tenant watermarks, thereby making 
> late events less likely.
> 
> David
> 
>> On Sat, Feb 27, 2021 at 3:29 AM Rion Williams  wrote:
>> David and Timo,
>> 
>> Firstly, thank you both so much for your contributions and advice. I believe 
>> I’ve implemented things along the lines that you both detailed and things 
>> appear to work just as expected (e.g. I can see things arriving, being added 
>> to windows, discarding late records, and ultimately writing out files as 
>> expected).
>> 
>> With that said, I have one question / issue that I’ve run into with handling 
>> the data coming my Kafka topic. Currently, my tenant/source (i.e. my key) 
>> may be distributed across the 10 partitions of my Kafka topic. With the way 
>> that I’m consuming from this topic (with a Kafka Consumer), it looks like my 
>> data is arriving in a mixed order which seems to be causing my own 
>> watermarks (those stored in my ValueState) to process as later data may 
>> arrive earlier than other data and cause my windows to be evicted.
>> 
>> I’m currently using the `withNoWatermarks()` along with a custom timestamp 
>> assigned to handle all of my timestamping, but is there a mechanism to 
>> handle the mixed ordering across partitions in this scenario at the Flink 
>> level?
>> 
>> I know the answer here likely lies with Kafka and adopting a better keying 
>> strategy to ensure the same tenant/source (my key) lands on the same 
>> partition, which by definition ensures ordering. I’m just wondering if 
>> there’s some mechanism to accomplish this post-reading from Kafka in Flink 
>> within my pipeline to handle things in a similar fashion?
>> 
>> Again - thank you both so much, I’m loving the granularity and control that 
>> Flink has been providing me over other streaming technologies I’ve used in 
>> the past. I’m totally sold on it and am looking forward to doing more 
>> incredible things with it.
>> 
>> Best regards,
>> 
>> Rion
>> 
>>>> On Feb 26, 2021, at 4:36 AM, David Anderson  wrote:
>>>> 
>>> 
>>> Yes indeed, Timo is correct -- I am proposing that you not use timers at 
>>> all. Watermarks and event-time timers go hand in hand -- and neither 
>>> mechanism can satisfy your requirements.
>>> 
>>> You can instead put all of the timing logic in the processElement method -- 
>>> effectively emulating what you would get if Flink were to offer per-key 
>>> watermarking.
>>> 
>>> The reason that the PseudoWindow example is using MapState is that for each 
>>> key/tenant, more than one window can be active simultaneously. This occurs 
>>> because the event stream is out-of-order with respect to time, so events 
>>> for the "next window" are probably being processed before "the previous" 
>>> window is complete. And if you want to accommodate allowed lateness, the 
>>> requirement to have several windows open at once becomes even more 
>>> important. 
>>> 
>>> MapState gives you a per-tenant hashmap, where each entry in that map 
>>> corresponds to an open window for some particular tenant, where the map's 
>>> key is the timestamp for a window, and the value is whatever state you want 
>>> that window to hold.
>>> 
>>> Best regards,
>>> David
>>> 
>>> 
>>> 
>>> 
>>>> On Fri, Feb 26, 2021 at 9:44 AM Timo Walther  wrote:
>>>> Hi Rion,
>>>> 
>>>> I think what David was refering to is that you do the entire time 
>>>> 

Using Prometheus Client Metrics in Flink

2021-02-27 Thread Rion Williams
Hi folks,

I’ve just recently started working with Flink and I was in the process of 
adding some metrics through my existing pipeline with the hopes of building 
some Grafana dashboards with them to help with observability.

Initially I looked at the built-in Flink metrics that were available, but I 
didn’t see an easy mechanism for setting/using labels with them. Essentially, I 
have two properties for my messages coming through the pipeline that I’d like 
to be able to keep track of (tenant/source) across several metrics (e.g. 
total_messages with tenant / source labels, etc.). I didn’t see an easy way to 
adjust this out of the box, or wasn’t aware of a good pattern for handling 
these.

I had previously used the Prometheus Client metrics [0] to accomplish this in 
the past but I wasn’t entirely sure how it would/could mesh with Flink. Does 
anyone have experience in working with these or know if they are supported?

Secondly, when using the Flink metrics, I noticed I was receiving a separate 
metric for each task that was being spun up. Is there an “easy button” to 
handle aggregating these to ensure that a single metric (e.g. total_messages) 
reflects the total processed across all of the tasks instead of each individual 
one?

Any recommendations / resources / advice would be greatly appreciated!

Thanks,

Rion

[0] : https://prometheus.io/docs/instrumenting/clientlibs/

Re: Handling Data Separation / Watermarking from Kafka in Flink

2021-02-26 Thread Rion Williams
David and Timo,

Firstly, thank you both so much for your contributions and advice. I believe 
I’ve implemented things along the lines that you both detailed and things 
appear to work just as expected (e.g. I can see things arriving, being added to 
windows, discarding late records, and ultimately writing out files as expected).

With that said, I have one question / issue that I’ve run into with handling 
the data coming my Kafka topic. Currently, my tenant/source (i.e. my key) may 
be distributed across the 10 partitions of my Kafka topic. With the way that 
I’m consuming from this topic (with a Kafka Consumer), it looks like my data is 
arriving in a mixed order which seems to be causing my own watermarks (those 
stored in my ValueState) to process as later data may arrive earlier than other 
data and cause my windows to be evicted.

I’m currently using the `withNoWatermarks()` along with a custom timestamp 
assigned to handle all of my timestamping, but is there a mechanism to handle 
the mixed ordering across partitions in this scenario at the Flink level?

I know the answer here likely lies with Kafka and adopting a better keying 
strategy to ensure the same tenant/source (my key) lands on the same partition, 
which by definition ensures ordering. I’m just wondering if there’s some 
mechanism to accomplish this post-reading from Kafka in Flink within my 
pipeline to handle things in a similar fashion?

Again - thank you both so much, I’m loving the granularity and control that 
Flink has been providing me over other streaming technologies I’ve used in the 
past. I’m totally sold on it and am looking forward to doing more incredible 
things with it.

Best regards,

Rion

> On Feb 26, 2021, at 4:36 AM, David Anderson  wrote:
> 
> 
> Yes indeed, Timo is correct -- I am proposing that you not use timers at all. 
> Watermarks and event-time timers go hand in hand -- and neither mechanism can 
> satisfy your requirements.
> 
> You can instead put all of the timing logic in the processElement method -- 
> effectively emulating what you would get if Flink were to offer per-key 
> watermarking.
> 
> The reason that the PseudoWindow example is using MapState is that for each 
> key/tenant, more than one window can be active simultaneously. This occurs 
> because the event stream is out-of-order with respect to time, so events for 
> the "next window" are probably being processed before "the previous" window 
> is complete. And if you want to accommodate allowed lateness, the requirement 
> to have several windows open at once becomes even more important. 
> 
> MapState gives you a per-tenant hashmap, where each entry in that map 
> corresponds to an open window for some particular tenant, where the map's key 
> is the timestamp for a window, and the value is whatever state you want that 
> window to hold.
> 
> Best regards,
> David
> 
> 
> 
> 
>> On Fri, Feb 26, 2021 at 9:44 AM Timo Walther  wrote:
>> Hi Rion,
>> 
>> I think what David was refering to is that you do the entire time 
>> handling yourself in process function. That means not using the 
>> `context.timerService()` or `onTimer()` that Flink provides but calling 
>> your own logic based on the timestamps that enter your process function 
>> and the stored state.
>> 
>> Regards,
>> Timo
>> 
>> 
>> On 26.02.21 00:29, Rion Williams wrote:
>> > 
>> > Hi David,
>> > 
>> > Thanks for your prompt reply, it was very helpful and the PseudoWindow 
>> > example is excellent. I believe it closely aligns with an approach that 
>> > I was tinkering with but seemed to be missing a few key pieces. In my 
>> > case, I'm essentially going to want to be aggregating the messages that 
>> > are coming into the window (a simple string-concatenation aggregation 
>> > would work). Would I need another form of state to hold that, as looking 
>> > through this example with naive eyes, it seems that this function is 
>> > currently storing multiple windows in state via the MapState provided:
>> > 
>> > // Keyed, managed state, with an entry for each window, keyed by the 
>> > window's end time.
>> > // There is a separate MapState object for each driver.
>> > private transient MapState sumOfTips;
>> > 
>> > If I wanted to perform an aggregation for each key/tenant, would a 
>> > MapState be appropriate? Such as a MapState if I was doing 
>> > a string aggregation, so that within my processElement function I could 
>> > use something similar for building these aggregations and ultimately 
>> > triggering them:
>> > 
>> > // Keep track of a tenant/source specific watermark
&

Re: Handling Data Separation / Watermarking from Kafka in Flink

2021-02-25 Thread Rion Williams
e [1] showing how to implement your own 
> tumbling event time windows with a process function. That implementation 
> assumes you can rely on watermarks for triggering the windows; you'll have to 
> do that differently. 
> 
> What you can do instead is to track, in ValueState, the largest timestamp 
> you've seen so far (for each key/tenant). Whenever that advances, you can 
> subtract the bounded-out-of-orderness duration from that timestamp, and then 
> check to see if the resulting value is now large enough to trigger any of the 
> windows for that key/tenant. 
> 
> Handling allowed lateness should be pretty straightforward. 
> 
> Hope this helps,
> David
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example
> 
>> On Thu, Feb 25, 2021 at 9:05 PM Rion Williams  wrote:
>> Hey folks, I have a somewhat high-level/advice question regarding Flink and 
>> if it has the mechanisms in place to accomplish what I’m trying to do. I’ve 
>> spent a good bit of time using Apache Beam, but recently pivoted over to 
>> native Flink simply because some of the connectors weren’t as mature or 
>> didn’t support some of the functionality that I needed.
>> 
>> Basically - I have a single Kafka topic with 10 partitions that I’m 
>> consuming from. This is a multi-tenant topic containing data that comes in 
>> at various times from various tenants and is not at all guaranteed to be in 
>> order, at least with regards to “event time”, which is what I care about. 
>> 
>> What I’m trying to accomplish is this: Given a multi-tenant topic with 
>> records eventually distributed across partitions, is it possible to consume 
>> and window each of these records independently of one another without one 
>> tenant potentially influencing another and write out to separate files per 
>> tenant/source (i.e. some other defined property on the records)?”
>> 
>> My pipeline currently looks something like this:
>> 
>> @JvmStatic
>> fun main(args: Array) {
>> val pipeline = StreamExecutionEnvironment
>> .getExecutionEnvironment()
>> //.createLocalEnvironmentWithWebUI(Configuration())
>> 
>> val properties = buildPropertiesFromArgs(args)
>> val stream = pipeline
>> .addSource(readFromKafka("events", properties))
>> .assignTimestampsAndWatermarks(
>> WatermarkStrategy
>> .forBoundedOutOfOrderness(Duration.ofSeconds(...))
>> .withTimestampAssigner { event: Event, _: Long ->
>> // Assign the created timestamp as the event timestamp
>> Instant(event.createdTimestamp).millis
>> }
>> )
>> 
>> // There are multiple data sources that each have their own windows and 
>> allowed lateness
>> // so ensure that each source only handles records for it
>> DataSources.forEach { source ->
>> stream
>> .filter { event ->
>> event.source == source.name
>> }
>> .keyBy { event ->
>> //print("Keying record with id ${record.`id$1`} by tenant 
>> ${record.`source$1`.tenantName}")
>> event.tenant
>> }
>> .window(
>> 
>> TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
>> )
>> .allowedLateness(
>> Time.minutes(source.allowedLateness)
>> )
>> .process(
>> // This just contains some logic to take the existing 
>> windows and construct a file
>> // using the window range and keys (tenant/source) with the 
>> values being 
>> // an aggregation of all of the records
>> WindowedEventProcessFunction(source.name)
>> )
>> .map { summary ->
>> // This would be a sink to write to a file
>> }
>> }
>> pipeline.execute("event-processor")
>> }
>> 
>> My overarching question is really - Can I properly separate the data with 
>> custom watermark strategies and ensure that keying (or some other construct) 
>> is enough to allow each tenant/source combination to be treated as it’s own 
>> stream with it’s own watermarking? I know I could possibly break the single 
>> topic up into multiple disparate topics, however that level of granularity 
>> would likely result in several thousand (7000+) topics so I'm hoping that 
>> some of the constructs available within Flink may help with this 
>> (WatermarkStrategies, etc.)
>> 
>> Any recommendations / advice would be extremely helpful as I'm quite new to 
>> the Flink world, however I have quite a bit of experience in Apache Beam, 
>> Kafka Streams, and a smattering of other streaming technologies.
>> 
>> Thanks much,
>> 
>> Rion


Handling Data Separation / Watermarking from Kafka in Flink

2021-02-25 Thread Rion Williams
Hey folks, I have a somewhat high-level/advice question regarding Flink and
if it has the mechanisms in place to accomplish what I’m trying to do. I’ve
spent a good bit of time using Apache Beam, but recently pivoted over to
native Flink simply because some of the connectors weren’t as mature or
didn’t support some of the functionality that I needed.

Basically - I have a single Kafka topic with 10 partitions that I’m
consuming from. This is a multi-tenant topic containing data that comes in
at various times from various tenants and is not at all guaranteed to be in
order, at least with regards to “event time”, which is what I care about.

What I’m trying to accomplish is this:
*Given a multi-tenant topic with records eventually distributed across
partitions, is it possible to consume and window each of these records
independently of one another without one tenant potentially influencing
another and write out to separate files per tenant/source (i.e. some other
defined property on the records)?”*
My pipeline currently looks something like this:

@JvmStatic
fun main(args: Array) {
val pipeline = StreamExecutionEnvironment
.getExecutionEnvironment()
//.createLocalEnvironmentWithWebUI(Configuration())

val properties = buildPropertiesFromArgs(args)
val stream = pipeline
.addSource(readFromKafka("events", properties))
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(...))
.withTimestampAssigner { event: Event, _: Long ->
// Assign the created timestamp as the event timestamp
Instant(event.createdTimestamp).millis
}
)

// There are multiple data sources that each have their own windows and
allowed lateness
// so ensure that each source only handles records for it
DataSources.forEach { source ->
stream
.filter { event ->
event.source == source.name
}
.keyBy { event ->
//print("Keying record with id ${record.`id$1`} by tenant
${record.`source$1`.tenantName}")
event.tenant
}
.window(

TumblingEventTimeWindows.of(Time.minutes(source.windowDuration))
)
.allowedLateness(
Time.minutes(source.allowedLateness)
)
.process(
// This just contains some logic to take the existing
windows and construct a file
// using the window range and keys (tenant/source) with the
values being
// an aggregation of all of the records
WindowedEventProcessFunction(source.name)
)
.map { summary ->
// This would be a sink to write to a file
}
}
pipeline.execute("event-processor")
}

My overarching question is really - *Can I properly separate the data with
custom watermark strategies and ensure that keying (or some other
construct) is enough to allow each tenant/source combination to be treated
as it’s own stream with it’s own watermarking? *I know I could possibly
break the single topic up into multiple disparate topics, however that
level of granularity would likely result in several thousand (7000+) topics
so I'm hoping that some of the constructs available within Flink may help
with this (WatermarkStrategies, etc.)

Any recommendations / advice would be extremely helpful as I'm quite new to
the Flink world, however I have quite a bit of experience in Apache Beam,
Kafka Streams, and a smattering of other streaming technologies.

Thanks much,

Rion