Re: Using Prometheus Client Metrics in Flink

2021-02-27 Thread Prasanna kumar
Rion,

Regarding the second question, you can aggregate by using the sum function
sum(metric_name{job_name="JOBNAME"}). This works if you are using the
counter metric.
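For the label part of the first question, here is a minimal sketch (the Message type and its getTenant()/getSource() accessors are assumptions for illustration) of exposing tenant/source through user-scope metric groups, which the PrometheusReporter exports as labels:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Message is a hypothetical event type with getTenant()/getSource().
// With the PrometheusReporter, each addGroup(key, value) pair is exported
// as a label on the resulting metric.
public class MessageCountingMapper extends RichMapFunction<Message, Message> {

    // one counter per (tenant, source) combination seen by this subtask
    private transient Map<String, Counter> counters;

    @Override
    public void open(Configuration parameters) {
        counters = new HashMap<>();
    }

    @Override
    public Message map(Message msg) {
        String key = msg.getTenant() + "/" + msg.getSource();
        Counter counter = counters.computeIfAbsent(key, k ->
                getRuntimeContext().getMetricGroup()
                        .addGroup("tenant", msg.getTenant())  // becomes label tenant="..."
                        .addGroup("source", msg.getSource())  // becomes label source="..."
                        .counter("total_messages"));
        counter.inc();
        return msg;
    }
}

Across parallel subtasks the per-task series can then be summed in PromQL, e.g. sum by (tenant, source) (flink_taskmanager_job_task_operator_total_messages); the exact exported metric name depends on the reporter's scope configuration.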

Prasanna.

On Sat, Feb 27, 2021 at 9:01 PM Rion Williams  wrote:

> Hi folks,
>
> I’ve just recently started working with Flink and I was in the process of
> adding some metrics through my existing pipeline with the hopes of building
> some Grafana dashboards with them to help with observability.
>
> Initially I looked at the built-in Flink metrics that were available, but
> I didn’t see an easy mechanism for setting/using labels with them.
> Essentially, I have two properties for my messages coming through the
> pipeline that I’d like to be able to keep track of (tenant/source) across
> several metrics (e.g. total_messages with tenant / source labels, etc.). I
> didn’t see an easy way to adjust this out of the box, or wasn’t aware of a
> good pattern for handling these.
>
> I had previously used the Prometheus Client metrics [0] to accomplish this
> in the past but I wasn’t entirely sure how it would/could mesh with Flink.
> Does anyone have experience in working with these or know if they are
> supported?
>
> Secondly, when using the Flink metrics, I noticed I was receiving a
> separate metric for each task that was being spun up. Is there an “easy
> button” to handle aggregating these to ensure that a single metric (e.g.
> total_messages) reflects the total processed across all of the tasks
> instead of each individual one?
>
> Any recommendations / resources / advice would be greatly appreciated!
>
> Thanks,
>
> Rion
>
> [0] : https://prometheus.io/docs/instrumenting/clientlibs/
>


Re: Producer Configuration

2021-02-27 Thread Alexey Trenikhun
They are picked up, otherwise you would not be able to write any messages at all. 
I believe the page you are referring to is not for displaying Kafka properties (in my 
case it is empty as well, but Kafka works). Check the logs.
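For reference, a minimal sketch (broker address and timeout values are placeholders; it reuses the SimpleStringGenerator source from this thread) of passing timeout-related properties to the producer. Note that recent Kafka clients require delivery.timeout.ms to be at least request.timeout.ms + linger.ms, so raising only request.timeout.ms may not remove the TimeoutException; the values that were actually picked up are logged by the task managers as ProducerConfig values.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class ProducerTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");  // placeholder broker
        properties.setProperty("request.timeout.ms", "60000");          // single-request timeout
        properties.setProperty("delivery.timeout.ms", "120000");        // >= request.timeout.ms + linger.ms
        properties.setProperty("retries", "20");

        // SimpleStringGenerator is the user-defined source from this thread.
        DataStream<String> stream = env.addSource(new SimpleStringGenerator());
        stream.addSink(new FlinkKafkaProducer<>("flink-test", new SimpleStringSchema(), properties));

        env.execute("producer-timeout-example");
    }
}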


From: Claude M 
Sent: Saturday, February 27, 2021 4:00:23 PM
To: Alexey Trenikhun 
Cc: user 
Subject: Re: Producer Configuration

Yes, the flink job also works in producing messages.  It's just that after a 
short period of time, it fails w/ a timeout.  That is why I'm trying to set a 
longer timeout period but it doesn't seem like the properties are being picked 
up.

On Sat, Feb 27, 2021 at 1:17 PM Alexey Trenikhun wrote:
Can you produce messages using the Kafka console producer, connecting with the same 
properties?


From: Claude M
Sent: Saturday, February 27, 2021 8:05 AM
To: Alexey Trenikhun
Cc: user
Subject: Re: Producer Configuration

Thanks for your reply, yes it was specified.  Sorry I forgot to include it:
 properties.setProperty("bootstrap.servers", "localhost:9092");

On Fri, Feb 26, 2021 at 7:56 PM Alexey Trenikhun wrote:
I believe bootstrap.servers is a mandatory Kafka property, but it looks like you 
didn’t set it


From: Claude M
Sent: Friday, February 26, 2021 12:02:10 PM
To: user
Subject: Producer Configuration

Hello,

I created a simple Producer and when the job ran, it was getting the following 
error:
Caused by: org.apache.kafka.common.errors.TimeoutException

I read about increasing the request.timeout.ms.   
Thus, I added the following properties.

Properties properties = new Properties();
properties.setProperty("request.timeout.ms", "3");
properties.setProperty("retries", "20");
DataStream<String> stream = env.addSource(new SimpleStringGenerator());
stream.addSink(new FlinkKafkaProducer<>("flink-test", new SimpleStringSchema(), properties));

However, after the job is submitted, the User Configuration is empty, please 
see attached.
Therefore, it seems these properties are not taking effect, since I still have 
the same problem.
Any help on these issues is appreciated, thanks.


Re: Producer Configuration

2021-02-27 Thread Claude M
Yes, the flink job also works in producing messages.  It's just that after
a short period of time, it fails w/ a timeout.  That is why I'm trying to
set a longer timeout period but it doesn't seem like the properties are
being picked up.

On Sat, Feb 27, 2021 at 1:17 PM Alexey Trenikhun  wrote:

> Can you produce messages using the Kafka console producer, connecting with the
> same properties?
>
> --
> *From:* Claude M 
> *Sent:* Saturday, February 27, 2021 8:05 AM
> *To:* Alexey Trenikhun 
> *Cc:* user 
> *Subject:* Re: Producer Configuration
>
> Thanks for your reply, yes it was specified.  Sorry I forgot to include
> it:
>  properties.setProperty("bootstrap.servers", "localhost:9092");
>
> On Fri, Feb 26, 2021 at 7:56 PM Alexey Trenikhun  wrote:
>
> I believe bootstrap.servers is a mandatory Kafka property, but it looks like
> you didn’t set it
>
> --
> *From:* Claude M 
> *Sent:* Friday, February 26, 2021 12:02:10 PM
> *To:* user 
> *Subject:* Producer Configuration
>
> Hello,
>
> I created a simple Producer and when the job ran, it was getting the
> following error:
> Caused by: org.apache.kafka.common.errors.TimeoutException
>
> I read about increasing the request.timeout.ms.   Thus, I added the
> following properties.
>
> Properties properties = new Properties();
> properties.setProperty("request.timeout.ms", "3");
> properties.setProperty("retries", "20");
> DataStream<String> stream = env.addSource(new SimpleStringGenerator());
> stream.addSink(new FlinkKafkaProducer<>("flink-test", new
> SimpleStringSchema(), properties));
>
> However, after the job is submitted, the User Configuration is empty,
> please see attached.
> Therefore, it seems these properties are not taking effect, since I still
> have the same problem.
> Any help on these issues is appreciated, thanks.
>
>


Re: Does the Kafka source perform retractions on Key?

2021-02-27 Thread Rex Fenley
Hi Arvid,

>If you are not rereading the topics, why do you compact them?
We are rereading the topics, at any time we might want a completely
different materialized view for a different web service for some new
application feature. Other jobs / new jobs need to read all the up-to-date
rows from the databases.

>correctness depends on compaction < downtime
I still don't see how this is the case if everything just needs to be
overwritten by primary key. To re-emphasize, we do not care about
historical data.

>Again, a cloud-native key/value store would perform much better and be
much cheaper with better SLAs
Is there a cloud-native key/value store which can read from a Postgres WAL
or MySQL binlog and then keep an up-to-date read marker for any
materialization consumers downstream *besides* Kafka + Debezium?

Appreciate all the feedback, though hopefully we can get closer to the same
mental model. If there's really a better alternative here I'm all for it!


On Sat, Feb 27, 2021 at 11:50 AM Arvid Heise  wrote:

> Hi Rex,
>
> Your initial question was about the impact of compaction on your CDC
> application logic. I have been (unsuccessfully) trying to tell you that you
> do not need compaction and it's counterproductive.
>
> If you are not rereading the topics, why do you compact them? It's lost
> compute time and I/O on the Kafka brokers (which are both very valuable)
> and does not give you anything that an appropriate retention time wouldn't
> give you (=lower SSD usage). It makes the mental model more complicated. An
> aggressive compaction and a larger backlog (compaction time < application
> failure/restart/upgrade time) would lead to incorrect results (in the same
> way an inappropriate retention period may cause data loss for the same
> reason).
>
> The only use case for log compaction is if you're using a Kafka topic for
> a key/value store to serve a web application (in which case, it's usually
> better to take a real key/value store) but then you don't need retractions
> anymore but you'd simply overwrite the actual values or use tombstone
> records for deletions.
>
> If you consume the same topic both for web applications and Flink and
> don't want to use another technology for key/value store, then log
> compaction of retractions kinda makes sense to kill 2 birds with one stone.
> However, you have to live with the downsides on the Flink side (correctness
> depends on compaction < downtime) and on web application (deal with
> retractions even though they do not make any sense at that level). Again, a
> cloud-native key/value store would perform much better and be much cheaper
> with better SLAs and solve all issues on the Flink side (final note: it's
> independent of the technology, any stream processor will encounter the same
> issue as it's a conceptual mismatch).
>
> On Sat, Feb 27, 2021 at 8:24 PM Rex Fenley  wrote:
>
>> Hi Arvid,
>>
>> I really appreciate the thorough response but I don't think this
>> contradicts our use case. In servicing web applications we're doing nothing
>> more than taking data from giant databases we use, and performing joins and
>> denormalizing aggs strictly for performance reasons (joining across a lot
>> of stuff on query time is slow) and putting specified results into another
>> database connected to the specified web server. Our Flink jobs are purely
>> used for up-to-date materialized views. We don't care about historical
>> analysis, we only care about what the exact current state of the world is.
>>
>> This is why every row has a primary key, from beginning to end of the job
>> (even though Flink's table api can't seem to detect that after a lot of
>> joins in our plan, but it's logically true since then the join key will be
>> pk). This is also why all we need to do is retract the current row from the
>> Kafka source on the existing primary key that's being overwritten, have
>> that retract propagate downstream to throw away any data transformed from
>> that row, and then process the new row. We don't care what other data
>> changes may have happened in between, it's not applicable to our use case.
>>
>> We're using CDC for nothing more than a way to get the latest rows in
>> real time into Kafka so they can be read by various Flink jobs we hope to
>> build (starting with the one we're currently working on that has ~35
>> stateful operators) which then just transform and forward to another
>> database.
>>
>> 
>>
>> Reading the Upsert Kafka docs [1] "In the physical operator, we will use
>> state to know whether the key is the first time to be seen. The operator
>> will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for
>> the previous image, or produce DELETE rows with all columns filled with
>> values." This is how we thought the regular Kafka source actually worked,
>> that it had state on PKs it could retract on, because we weren't even
>> thinking of any other use case until it hit me that may not be true.
>> Luckily the doc 

Re: Does the Kafka source perform retractions on Key?

2021-02-27 Thread Arvid Heise
Hi Rex,

Your initial question was about the impact of compaction on your CDC
application logic. I have been (unsuccessfully) trying to tell you that you
do not need compaction and it's counterproductive.

If you are not rereading the topics, why do you compact them? It's lost
compute time and I/O on the Kafka brokers (which are both very valuable)
and does not give you anything that an appropriate retention time wouldn't
give you (=lower SSD usage). It makes the mental model more complicated. An
aggressive compaction and a larger backlog (compaction time < application
failure/restart/upgrade time) would lead to incorrect results (in the same
way an inappropriate retention period may cause data loss for the same
reason).

The only use case for log compaction is if you're using a Kafka topic for a
key/value store to serve a web application (in which case, it's usually
better to take a real key/value store) but then you don't need retractions
anymore but you'd simply overwrite the actual values or use tombstone
records for deletions.

If you consume the same topic both for web applications and Flink and don't
want to use another technology for key/value store, then log compaction of
retractions kinda makes sense to kill 2 birds with one stone. However, you
have to live with the downsides on the Flink side (correctness depends on
compaction < downtime) and on web application (deal with retractions even
though they do not make any sense at that level). Again, a cloud-native
key/value store would perform much better and be much cheaper with better
SLAs and solve all issues on the Flink side (final note: it's independent
of the technology, any stream processor will encounter the same issue as
it's a conceptual mismatch).

On Sat, Feb 27, 2021 at 8:24 PM Rex Fenley  wrote:

> Hi Arvid,
>
> I really appreciate the thorough response but I don't think this
> contradicts our use case. In servicing web applications we're doing nothing
> more than taking data from giant databases we use, and performing joins and
> denormalizing aggs strictly for performance reasons (joining across a lot
> of stuff on query time is slow) and putting specified results into another
> database connected to the specified web server. Our Flink jobs are purely
> used for up-to-date materialized views. We don't care about historical
> analysis, we only care about what the exact current state of the world is.
>
> This is why every row has a primary key, from beginning to end of the job
> (even though Flink's table api can't seem to detect that after a lot of
> joins in our plan, but it's logically true since then the join key will be
> pk). This is also why all we need to do is retract the current row from the
> Kafka source on the existing primary key that's being overwritten, have
> that retract propagate downstream to throw away any data transformed from
> that row, and then process the new row. We don't care what other data
> changes may have happened in between, it's not applicable to our use case.
>
> We're using CDC for nothing more than a way to get the latest rows in real
> time into Kafka so they can be read by various Flink jobs we hope to build
> (starting with the one we're currently working on that has ~35 stateful
> operators) which then just transform and forward to another database.
>
> 
>
> Reading the Upsert Kafka docs [1] "In the physical operator, we will use
> state to know whether the key is the first time to be seen. The operator
> will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for
> the previous image, or produce DELETE rows with all columns filled with
> values." This is how we thought the regular Kafka source actually worked,
> that it had state on PKs it could retract on, because we weren't even
> thinking of any other use case until it hit me that may not be true.
> Luckily the doc also provides an example of simply forwarding from DBZ
> Kafka to Upsert Kafka, even if DBZ Kafka source data is compacted it won't
> matter since now everything in the actual job reading from Upsert Kafka
> should function by PK like we need. On that note, I think it may be helpful
> to edit the documentation to indicate that if you need stateful PK based
> Kafka consumption it must be via Upsert Kafka.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector
>
> Again, thanks for the thorough reply, this really helped my understanding!
>
> On Sat, Feb 27, 2021 at 4:02 AM Arvid Heise  wrote:
>
>> Hi Rex,
>>
>> imho log compaction and CDC for historic processes are incompatible on
>> conceptual level. Let's take this example:
>>
>> topic: party membership
>> +(1, Dem, 2000)
>> -(1, Dem, 2009)
>> +(1, Gop, 2009)
>> Where 1 is the id of a real person.
>>
>> Now, let's consider you want to count memberships retroactively each year.
>> You'd get 2000-2009, 1 Dem and 0 GOP and 2009+ 1 GOP and 0 Dem.
>>
>> Now, consider you have log compaction with a compaction 

Re: Does the Kafka source perform retractions on Key?

2021-02-27 Thread Rex Fenley
Hi Arvid,

I really appreciate the thorough response but I don't think this
contradicts our use case. In servicing web applications we're doing nothing
more than taking data from giant databases we use, and performing joins and
denormalizing aggs strictly for performance reasons (joining across a lot
of stuff on query time is slow) and putting specified results into another
database connected to the specified web server. Our Flink jobs are purely
used for up-to-date materialized views. We don't care about historical
analysis, we only care about what the exact current state of the world is.

This is why every row has a primary key, from beginning to end of the job
(even though Flink's table api can't seem to detect that after a lot of
joins in our plan, but it's logically true since then the join key will be
pk). This is also why all we need to do is retract the current row from the
Kafka source on the existing primary key that's being overwritten, have
that retract propagate downstream to throw away any data transformed from
that row, and then process the new row. We don't care what other data
changes may have happened in between, it's not applicable to our use case.

We're using CDC for nothing more than a way to get the latest rows in real
time into Kafka so they can be read by various Flink jobs we hope to build
(starting with the one we're currently working on that has ~35 stateful
operators) which then just transform and forward to another database.



Reading the Upsert Kafka docs [1] "In the physical operator, we will use
state to know whether the key is the first time to be seen. The operator
will produce INSERT rows, or additionally generate UPDATE_BEFORE rows for
the previous image, or produce DELETE rows with all columns filled with
values." This is how we thought the regular Kafka source actually worked,
that it had state on PKs it could retract on, because we weren't even
thinking of any other use case until it hit me that may not be true.
Luckily the doc also provides an example of simply forwarding from DBZ
Kafka to Upsert Kafka; even if the DBZ Kafka source data is compacted it won't
matter, since everything in the actual job reading from Upsert Kafka should
then function by PK like we need. On that note, I think it may be helpful
to edit the documentation to indicate that if you need stateful PK based
Kafka consumption it must be via Upsert Kafka.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector
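For reference, a minimal sketch of that forwarding step (table names, topics, and the broker address are placeholders; it assumes the Flink 1.12 'kafka' connector with the debezium-json format and the upsert-kafka connector), expressed through the Table API:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DbzToUpsertKafka {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Debezium changelog as it arrives from the CDC topic.
        tEnv.executeSql(
            "CREATE TABLE users_dbz (" +
            "  id BIGINT," +
            "  name STRING" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'dbz.users'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'debezium-json'" +
            ")");

        // Keyed, upsert view of the same data; downstream jobs read this topic.
        tEnv.executeSql(
            "CREATE TABLE users_upsert (" +
            "  id BIGINT," +
            "  name STRING," +
            "  PRIMARY KEY (id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'upsert-kafka'," +
            "  'topic' = 'users.upsert'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'key.format' = 'json'," +
            "  'value.format' = 'json'" +
            ")");

        // Forward the changelog; the upsert-kafka sink keys records by the primary key.
        tEnv.executeSql("INSERT INTO users_upsert SELECT id, name FROM users_dbz");
    }
}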

Again, thanks for the thorough reply, this really helped my understanding!

On Sat, Feb 27, 2021 at 4:02 AM Arvid Heise  wrote:

> Hi Rex,
>
> imho log compaction and CDC for historic processes are incompatible on
> conceptual level. Let's take this example:
>
> topic: party membership
> +(1, Dem, 2000)
> -(1, Dem, 2009)
> +(1, Gop, 2009)
> Where 1 is the id of a real person.
>
> Now, let's consider you want to count memberships retroactively each year.
> You'd get 2000-2009, 1 Dem and 0 GOP and 2009+ 1 GOP and 0 Dem.
>
> Now, consider you have log compaction with a compaction period <1 year.
> You'd get 2000-2009, 0 Dem and 0 GOP and only the real result for 2009+
> (or in general the time at the latest change).
>
> Let's take another example:
> +(2, Dem, 2000)
> -(2, Dem, 2009)
>
> With log compaction, you'd get -1/0/1 Dem and 0 GOP for 2009+ depending
> on how well your application can deal with incomplete logs. Let's say your
> application is simply adding and subtracting retractions, you'd get -1. If
> your application is ignoring deletions without insertions (needs to be
> tracked for each person), you'd get 0. If your application is not looking
> at the retraction type, you'd get 1.
>
> As you can see, you need to be really careful to craft your application
> correctly. The correct result will only be achieved through the most
> complex application (aggregating state for each person and dealing with
> incomplete information). This is completely independent of Kafka, Debezium,
> or Flink.
>
> ---
>
> However, as Jan pointed out: If you don't process data before compaction,
> then your application is correct. Now, then the question is what's the
> benefit of having data in the topic older than the compaction? The value is
> close to 0 as you can't really use it for CDC processing (again independent
> of Flink).
>
> Consequently, instead of compaction, I'd go with a lower retention policy
> and offload the data to s3 for historic (re)processing (afaik the cloud
> offering of confluent finally has automatic offloading but you can also
> build it yourself). Then you only need to ensure that your application is
> never accessing data that is deleted because of the retention time. In
> general, it's better to choose a technology such as Pulsar with tiered
> storage that gives you exactly what you want with low overhead: you need
> unlimited retention without compaction but without holding much data in
> expensive storage (SSD) by offloading 

Re: Producer Configuration

2021-02-27 Thread Alexey Trenikhun
Can you produce messages using the Kafka console producer, connecting with the same 
properties?


From: Claude M 
Sent: Saturday, February 27, 2021 8:05 AM
To: Alexey Trenikhun 
Cc: user 
Subject: Re: Producer Configuration

Thanks for your reply, yes it was specified.  Sorry I forgot to include it:
 properties.setProperty("bootstrap.servers", "localhost:9092");

On Fri, Feb 26, 2021 at 7:56 PM Alexey Trenikhun wrote:
I believe bootstrap.servers is a mandatory Kafka property, but it looks like you 
didn’t set it


From: Claude M
Sent: Friday, February 26, 2021 12:02:10 PM
To: user
Subject: Producer Configuration

Hello,

I created a simple Producer and when the job ran, it was getting the following 
error:
Caused by: org.apache.kafka.common.errors.TimeoutException

I read about increasing the request.timeout.ms.   
Thus, I added the following properties.

Properties properties = new Properties();
properties.setProperty("request.timeout.ms", "3");
properties.setProperty("retries", "20");
DataStream<String> stream = env.addSource(new SimpleStringGenerator());
stream.addSink(new FlinkKafkaProducer<>("flink-test", new SimpleStringSchema(), properties));

However, after the job is submitted, the User Configuration is empty, please 
see attached.
Therefore, it seems these properties are not taking effect, since I still have 
the same problem.
Any help on these issues is appreciated, thanks.


Job downgrade

2021-02-27 Thread Alexey Trenikhun
Hello,
Let's say version 1 of my job uses keyed state with name "a" and type A, which is 
some Avro-generated class. Then I upgrade to version 2, which in addition uses 
keyed state "b" of type B (another concrete Avro-generated class). I take a 
savepoint with version 2 and decide to downgrade to version 1 and start from the 
taken savepoint. Can I do it? On one hand, version 1 doesn't have state "b", 
but it seems Flink still tries to call restoreSerializer and to read the 
runtimeType (`class B`), which is not available in version 1.

Thanks,
Alexey


Stateful functions 2.2 and stop with savepoint

2021-02-27 Thread Meissner, Dylan
I have an embedded function with a SinkFunction as an egress, implemented as 
this pseudo-code:

val serializationSchema = KafkaSchemaSerializationSchema(... props required to use a Confluent Schema Registry with Avro, auth etc ...)
return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema, props, AT_LEAST_ONCE))

Checkpointing and taking a savepoint without stopping work as expected.

However, when I run "flink stop " or even "flink stop --drain 
", the operation never completes, reporting IN_PROGRESS until I hit the 
"failure-cause: org.apache.flink.runtime.checkpoint.CheckpointException: 
Checkpoint expired before completing" CompletedException.

In the "Checkpoint History" it shows only 2 of my 3 operators completed their 
work:

Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | 
end-to-end duration: 638ms | data-size 1.38 KB
feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0% | 
end-to-end duration: n/a | data-size: n/a
feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B

I've been unable to gain any insights from logs so far. Thoughts?


Re: Handling Data Separation / Watermarking from Kafka in Flink

2021-02-27 Thread Rion Williams
Thanks David,

I figured that the correct approach would obviously be to adopt a keying 
strategy upstream to ensure the same data that I used as a key downstream fell 
on the same partition (ensuring the ordering guarantees I’m looking for).

I’m guessing implementation-wise, when I would normally evict a window after 
some event time and allowed lateness, I could set a timer or just explicitly 
keep the window open for some additional time to allow for out of order data to 
make its way into the window.

Either way - I think the keying is probably the right approach, but I wanted to 
consider any other options should that become an issue upstream.

Thanks!

Rion

> On Feb 27, 2021, at 10:21 AM, David Anderson  wrote:
> 
> 
> Rion,
> 
> If you can arrange for each tenant's events to be in only one kafka 
> partition, that should be the best way to simplify the processing you need to 
> do. Otherwise, a simple change that may help would be to increase the bounded 
> delay you use in calculating your own per-tenant watermarks, thereby making 
> late events less likely.
> 
> David
> 
>> On Sat, Feb 27, 2021 at 3:29 AM Rion Williams  wrote:
>> David and Timo,
>> 
>> Firstly, thank you both so much for your contributions and advice. I believe 
>> I’ve implemented things along the lines that you both detailed and things 
>> appear to work just as expected (e.g. I can see things arriving, being added 
>> to windows, discarding late records, and ultimately writing out files as 
>> expected).
>> 
>> With that said, I have one question / issue that I’ve run into with handling 
>> the data coming from my Kafka topic. Currently, my tenant/source (i.e. my key) 
>> may be distributed across the 10 partitions of my Kafka topic. With the way 
>> that I’m consuming from this topic (with a Kafka Consumer), it looks like my 
>> data is arriving in a mixed order which seems to be causing my own 
>> watermarks (those stored in my ValueState) to process as later data may 
>> arrive earlier than other data and cause my windows to be evicted.
>> 
>> I’m currently using the `withNoWatermarks()` along with a custom timestamp 
>> assigner to handle all of my timestamping, but is there a mechanism to 
>> handle the mixed ordering across partitions in this scenario at the Flink 
>> level?
>> 
>> I know the answer here likely lies with Kafka and adopting a better keying 
>> strategy to ensure the same tenant/source (my key) lands on the same 
>> partition, which by definition ensures ordering. I’m just wondering if 
>> there’s some mechanism to accomplish this post-reading from Kafka in Flink 
>> within my pipeline to handle things in a similar fashion?
>> 
>> Again - thank you both so much, I’m loving the granularity and control that 
>> Flink has been providing me over other streaming technologies I’ve used in 
>> the past. I’m totally sold on it and am looking forward to doing more 
>> incredible things with it.
>> 
>> Best regards,
>> 
>> Rion
>> 
 On Feb 26, 2021, at 4:36 AM, David Anderson  wrote:
 
>>> 
>>> Yes indeed, Timo is correct -- I am proposing that you not use timers at 
>>> all. Watermarks and event-time timers go hand in hand -- and neither 
>>> mechanism can satisfy your requirements.
>>> 
>>> You can instead put all of the timing logic in the processElement method -- 
>>> effectively emulating what you would get if Flink were to offer per-key 
>>> watermarking.
>>> 
>>> The reason that the PseudoWindow example is using MapState is that for each 
>>> key/tenant, more than one window can be active simultaneously. This occurs 
>>> because the event stream is out-of-order with respect to time, so events 
>>> for the "next window" are probably being processed before "the previous" 
>>> window is complete. And if you want to accommodate allowed lateness, the 
>>> requirement to have several windows open at once becomes even more 
>>> important. 
>>> 
>>> MapState gives you a per-tenant hashmap, where each entry in that map 
>>> corresponds to an open window for some particular tenant, where the map's 
>>> key is the timestamp for a window, and the value is whatever state you want 
>>> that window to hold.
>>> 
>>> Best regards,
>>> David
>>> 
>>> 
>>> 
>>> 
 On Fri, Feb 26, 2021 at 9:44 AM Timo Walther  wrote:
 Hi Rion,
 
 I think what David was referring to is that you do the entire time 
 handling yourself in process function. That means not using the 
 `context.timerService()` or `onTimer()` that Flink provides but calling 
 your own logic based on the timestamps that enter your process function 
 and the stored state.
 
 Regards,
 Timo
 
 
 On 26.02.21 00:29, Rion Williams wrote:
 > 
 > Hi David,
 > 
 > Thanks for your prompt reply, it was very helpful and the PseudoWindow 
 > example is excellent. I believe it closely aligns with an approach that 
 > I was tinkering with but seemed to be missing a few key pieces. In my 
 > 

Re: Handling Data Separation / Watermarking from Kafka in Flink

2021-02-27 Thread David Anderson
Rion,

If you can arrange for each tenant's events to be in only one kafka
partition, that should be the best way to simplify the processing you need
to do. Otherwise, a simple change that may help would be to increase the
bounded delay you use in calculating your own per-tenant watermarks,
thereby making late events less likely.

David
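For illustration, a minimal sketch of the second idea combined with the per-key "pseudo window" pattern discussed further down in this thread (the Event type, the one-minute windows, and the 30-second bounded delay are assumptions; the stream is keyed by tenant/source):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Per-tenant event-time windows without Flink watermarks or timers: the
// "watermark" is tracked per key as the max timestamp seen minus a bounded
// delay. Event is a hypothetical POJO with getTimestamp() (epoch millis).
public class PseudoWindowPerTenant extends KeyedProcessFunction<String, Event, String> {

    private static final long WINDOW_SIZE = 60_000L;   // 1-minute windows (illustrative)
    private static final long BOUNDED_DELAY = 30_000L; // allowed out-of-orderness (illustrative)

    private transient ValueState<Long> perKeyWatermark;        // per-tenant watermark
    private transient MapState<Long, Long> countsByWindowEnd;  // one entry per open window

    @Override
    public void open(Configuration parameters) {
        perKeyWatermark = getRuntimeContext().getState(
                new ValueStateDescriptor<>("perKeyWatermark", Long.class));
        countsByWindowEnd = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("countsByWindowEnd", Long.class, Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
        long ts = event.getTimestamp();
        long windowEnd = ts - (ts % WINDOW_SIZE) + WINDOW_SIZE;
        long watermark = perKeyWatermark.value() == null ? Long.MIN_VALUE : perKeyWatermark.value();

        if (windowEnd <= watermark) {
            return; // this tenant's window was already emitted; drop the late event
        }

        // accumulate into the window the event belongs to
        Long count = countsByWindowEnd.get(windowEnd);
        countsByWindowEnd.put(windowEnd, count == null ? 1L : count + 1L);

        // advance the per-tenant watermark and emit/evict every window it has passed
        long newWatermark = Math.max(watermark, ts - BOUNDED_DELAY);
        perKeyWatermark.update(newWatermark);

        List<Long> closed = new ArrayList<>();
        for (Long end : countsByWindowEnd.keys()) {
            if (end <= newWatermark) {
                out.collect(ctx.getCurrentKey() + ": window ending " + end
                        + " saw " + countsByWindowEnd.get(end) + " events");
                closed.add(end);
            }
        }
        for (Long end : closed) {
            countsByWindowEnd.remove(end);
        }
    }
}

Keying by tenant/source upstream still matters; the more reordering there is across Kafka partitions, the larger BOUNDED_DELAY has to be.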

On Sat, Feb 27, 2021 at 3:29 AM Rion Williams  wrote:

> David and Timo,
>
> Firstly, thank you both so much for your contributions and advice. I
> believe I’ve implemented things along the lines that you both detailed and
> things appear to work just as expected (e.g. I can see things arriving,
> being added to windows, discarding late records, and ultimately writing out
> files as expected).
>
> With that said, I have one question / issue that I’ve run into with
> handling the data coming from my Kafka topic. Currently, my tenant/source (i.e.
> my key) may be distributed across the 10 partitions of my Kafka topic. With
> the way that I’m consuming from this topic (with a Kafka Consumer), it
> looks like my data is arriving in a mixed order which seems to be causing
> my own watermarks (those stored in my ValueState) to process as later data
> may arrive earlier than other data and cause my windows to be evicted.
>
> I’m currently using the `withNoWatermarks()` along with a custom timestamp
> assigner to handle all of my timestamping, but is there a mechanism to
> handle the mixed ordering across partitions in this scenario at the Flink
> level?
>
> I know the answer here likely lies with Kafka and adopting a better keying
> strategy to ensure the same tenant/source (my key) lands on the same
> partition, which by definition ensures ordering. I’m just wondering if
> there’s some mechanism to accomplish this post-reading from Kafka in Flink
> within my pipeline to handle things in a similar fashion?
>
> Again - thank you both so much, I’m loving the granularity and control
> that Flink has been providing me over other streaming technologies I’ve
> used in the past. I’m totally sold on it and am looking forward to doing
> more incredible things with it.
>
> Best regards,
>
> Rion
>
> On Feb 26, 2021, at 4:36 AM, David Anderson  wrote:
>
> 
> Yes indeed, Timo is correct -- I am proposing that you not use timers at
> all. Watermarks and event-time timers go hand in hand -- and neither
> mechanism can satisfy your requirements.
>
> You can instead put all of the timing logic in the processElement method
> -- effectively emulating what you would get if Flink were to offer per-key
> watermarking.
>
> The reason that the PseudoWindow example is using MapState is that for
> each key/tenant, more than one window can be active simultaneously. This
> occurs because the event stream is out-of-order with respect to time, so
> events for the "next window" are probably being processed before "the
> previous" window is complete. And if you want to accommodate allowed
> lateness, the requirement to have several windows open at once becomes even
> more important.
>
> MapState gives you a per-tenant hashmap, where each entry in that map
> corresponds to an open window for some particular tenant, where the map's
> key is the timestamp for a window, and the value is whatever state you want
> that window to hold.
>
> Best regards,
> David
>
>
>
>
> On Fri, Feb 26, 2021 at 9:44 AM Timo Walther  wrote:
>
>> Hi Rion,
>>
>> I think what David was referring to is that you do the entire time
>> handling yourself in process function. That means not using the
>> `context.timerService()` or `onTimer()` that Flink provides but calling
>> your own logic based on the timestamps that enter your process function
>> and the stored state.
>>
>> Regards,
>> Timo
>>
>>
>> On 26.02.21 00:29, Rion Williams wrote:
>> > 
>> > Hi David,
>> >
>> > Thanks for your prompt reply, it was very helpful and the PseudoWindow
>> > example is excellent. I believe it closely aligns with an approach that
>> > I was tinkering with but seemed to be missing a few key pieces. In my
>> > case, I'm essentially going to want to be aggregating the messages that
>> > are coming into the window (a simple string-concatenation aggregation
>> > would work). Would I need another form of state to hold that, as
>> looking
>> > through this example with naive eyes, it seems that this function is
>> > currently storing multiple windows in state via the MapState provided:
>> >
>> > // Keyed, managed state, with an entry for each window, keyed by the
>> > window's end time.
>> > // There is a separate MapState object for each driver.
>> > private transient MapState<Long, Float> sumOfTips;
>> >
>> > If I wanted to perform an aggregation for each key/tenant, would a
>> > MapState be appropriate? Such as a MapState if I was
>> doing
>> > a string aggregation, so that within my processElement function I could
>> > use something similar for building these aggregations and ultimately
>> > triggering them:
>> >
>> > // Keep track of a tenant/source specific watermark
>> > 

Re: Producer Configuration

2021-02-27 Thread Claude M
Thanks for your reply, yes it was specified.  Sorry I forgot to include it:
 properties.setProperty("bootstrap.servers", "localhost:9092");

On Fri, Feb 26, 2021 at 7:56 PM Alexey Trenikhun  wrote:

> I believe bootstrap.servers is mandatory Kafka property, but it looks like
> you didn’t set it
>
> --
> *From:* Claude M 
> *Sent:* Friday, February 26, 2021 12:02:10 PM
> *To:* user 
> *Subject:* Producer Configuration
>
> Hello,
>
> I created a simple Producer and when the job ran, it was getting the
> following error:
> Caused by: org.apache.kafka.common.errors.TimeoutException
>
> I read about increasing the request.timeout.ms.   Thus, I added the
> following properties.
>
> Properties properties = new Properties();
> properties.setProperty("request.timeout.ms", "3");
> properties.setProperty("retries", "20");
> DataStream stream = env.addSource(new SimpleStringGenerator());
> stream.addSink(new FlinkKafkaProducer<>("flink-test", new
> SimpleStringSchema(), properties));
>
> However, after the job is submitted, the User Configuration is empty,
> please see attached.
> Therefore, it seems these properties are taking into effect since I still
> have the same problem.
> Any help on these issues are appreciated, thanks.
>


Using Prometheus Client Metrics in Flink

2021-02-27 Thread Rion Williams
Hi folks,

I’ve just recently started working with Flink and I was in the process of 
adding some metrics through my existing pipeline with the hopes of building 
some Grafana dashboards with them to help with observability.

Initially I looked at the built-in Flink metrics that were available, but I 
didn’t see an easy mechanism for setting/using labels with them. Essentially, I 
have two properties for my messages coming through the pipeline that I’d like 
to be able to keep track of (tenant/source) across several metrics (e.g. 
total_messages with tenant / source labels, etc.). I didn’t see an easy way to 
adjust this out of the box, or wasn’t aware of a good pattern for handling 
these.

I had previously used the Prometheus Client metrics [0] to accomplish this in 
the past but I wasn’t entirely sure how it would/could mesh with Flink. Does 
anyone have experience in working with these or know if they are supported?

Secondly, when using the Flink metrics, I noticed I was receiving a separate 
metric for each task that was being spun up. Is there an “easy button” to 
handle aggregating these to ensure that a single metric (e.g. total_messages) 
reflects the total processed across all of the tasks instead of each individual 
one?

Any recommendations / resources / advice would be greatly appreciated!

Thanks,

Rion

[0] : https://prometheus.io/docs/instrumenting/clientlibs/

Re: Does the Kafka source perform retractions on Key?

2021-02-27 Thread Arvid Heise
Hi Rex,

imho log compaction and CDC for historic processes are incompatible on a
conceptual level. Let's take this example:

topic: party membership
+(1, Dem, 2000)
-(1, Dem, 2009)
+(1, Gop, 2009)
Where 1 is the id of a real person.

Now, let's consider you want to count memberships retroactively each year.
You'd get 2000-2009, 1 Dem and 0 GOP and 2009+ 1 GOP and 0 Dem.

Now, consider you have log compaction with a compaction period <1 year.
You'd get 2000-2009, 0 Dem and 0 GOP and only the real result for 2009+ (or
in general the time at the latest change).

Let's take another example:
+(2, Dem, 2000)
-(2, Dem, 2009)

With log compaction, you'd get -1/0/1 Dem and 0 GOP for 2009+ depending on
how well your application can deal with incomplete logs. Let's say your
application is simply adding and subtracting retractions, you'd get -1. If
your application is ignoring deletions without insertions (needs to be
tracked for each person), you'd get 0. If your application is not looking
at the retraction type, you'd get 1.

As you can see, you need to be really careful to craft your application
correctly. The correct result will only be achieved through the most
complex application (aggregating state for each person and dealing with
incomplete information). This is completely independent of Kafka, Debezium,
or Flink.
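To make this concrete, here is a small self-contained simulation (plain Java 16+, no Kafka or Flink involved; it models the naive add/subtract consumer from the second example) that replays the two changelogs above with and without key-based compaction:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy replay of the party-membership changelog, comparing retroactive counts
// computed from the full log vs. a log compacted by person id.
public class CompactionEffect {

    record Change(int op, int personId, String party, int year) {} // op: +1 insert, -1 delete

    static Map<String, Integer> membersAsOf(List<Change> log, int year) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Change c : log) {
            if (c.year() <= year) {
                counts.merge(c.party(), c.op(), Integer::sum); // naive add/subtract of retractions
            }
        }
        return counts;
    }

    static List<Change> compactByPerson(List<Change> log) {
        Map<Integer, Change> latest = new LinkedHashMap<>();
        for (Change c : log) {
            latest.put(c.personId(), c); // keep only the last record per key, like log compaction
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Change> log = List.of(
                new Change(+1, 1, "Dem", 2000),
                new Change(-1, 1, "Dem", 2009),
                new Change(+1, 1, "Gop", 2009),
                new Change(+1, 2, "Dem", 2000),
                new Change(-1, 2, "Dem", 2009));

        System.out.println("full log, as of 2005:      " + membersAsOf(log, 2005));                  // {Dem=2}
        System.out.println("compacted log, as of 2005: " + membersAsOf(compactByPerson(log), 2005)); // {} -> memberships lost
        System.out.println("full log, as of 2010:      " + membersAsOf(log, 2010));                  // {Dem=0, Gop=1}
        System.out.println("compacted log, as of 2010: " + membersAsOf(compactByPerson(log), 2010)); // {Gop=1, Dem=-1}
    }
}

The compacted replay loses both 2000 memberships and leaves person 2 as a dangling -1, which is exactly the divergence described above.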

---

However, as Jan pointed out: If you don't process data before compaction,
then your application is correct. Now, then the question is what's the
benefit of having data in the topic older than the compaction? The value is
close to 0 as you can't really use it for CDC processing (again independent
of Flink).

Consequently, instead of compaction, I'd go with a lower retention policy
and offload the data to s3 for historic (re)processing (afaik the cloud
offering of confluent finally has automatic offloading but you can also
build it yourself). Then you only need to ensure that your application is
never accessing data that is deleted because of the retention time. In
general, it's better to choose a technology such as Pulsar with tiered
storage that gives you exactly what you want with low overhead: you need
unlimited retention without compaction but without holding much data in
expensive storage (SSD) by offloading automatically to cold storage.

If this is not working for you, then please share your requirements with me
why you'd need compaction + a different retention for source/intermediate
topics.

For the final topic, from my experience, a real key/value store works much
better than log compacted topics for serving web applications. Confluent's
marketing is strongly pushing that Kafka can be used as a database and as a
key/value store while in reality, it's "just" a good distributed log. I
can provide pointers that discuss the limitations if there is interest.
Also note that the final topic should not be in CDC format anymore (so no
retractions). It should just contain the current state. For both examples
together it would be
1, Gop, 2009
and no record for person 2.


On Sat, Feb 27, 2021 at 3:33 AM Rex Fenley  wrote:

> Digging around, it looks like Upsert Kafka which requires a Primary Key
> will actually do what I want and uses compaction, but it doesn't look
> compatible with Debezium format? Is this on the roadmap?
>
> In the meantime, we're considering consuming from Debezium Kafka (still
> compacted) and then writing directly to an Upsert Kafka sink and then
> reading right back out of a corresponding Upsert Kafka source. Since that
> little roundabout will key all changes by primary key it should give us a
> compacted topic to start with initially. Once we get that working we can
> probably do the same thing with intermediate flink jobs too.
>
> Would appreciate any feedback on this approach, thanks!
>
> On Fri, Feb 26, 2021 at 10:52 AM Rex Fenley  wrote:
>
>> Does this also imply that it's not safe to compact the initial topic
>> where data is coming from Debezium? I'd think that Flink's Kafka source
>> would emit retractions on any existing data with a primary key as new data
>> with the same pk arrived (in our case all data has primary keys). I guess
>> that goes back to my original question still however, is this not what the
>> Kafka source does? Is there no way to make that happen?
>>
>> We really can't live with the record amplification, it's sometimes
>> nonlinear and randomly kills RocksDB performance.
>>
>> On Fri, Feb 26, 2021 at 2:16 AM Arvid Heise  wrote:
>>
>>> Just to clarify, intermediate topics should in most cases not be
>>> compacted for exactly the reasons if your application depends on all
>>> intermediate data. For the final topic, it makes sense. If you also consume
>>> intermediate topics for web application, one solution is to split it into
>>> two topics (like topic-raw for Flink and topic-compacted for applications)
>>> and live with some amplification.
>>>
>>> On Thu, Feb 25, 2021 at 12:11 AM Rex Fenley  wrote:
>>>
 All of our Flink jobs are 

Re: Re: Re: Question about a Flink SQL application

2021-02-27 Thread 邮件帮助中心
Thank you for your reply. After seeing it just now, I urgently connected remotely and ran it again, and stdout really did produce data. I will test it more thoroughly during work hours on Monday. Many thanks!














On 2021-02-27 19:08:25, "xg...@126.com" wrote:
>1503,61,15811,1614405166858
>1504,61,15813,1614405333871
>1505,61,15814,1614405544862
>1506,61,15814,1614405673863
>Just these few records, and the parallelism is set to 1
>
>
> 
>From: yinghua...@163.com
>Sent: 2021-02-27 14:23
>To: user-zh
>Subject: Re: Question about a Flink SQL application
>I'm not sure whether this is what's causing the problem: on Flink's web UI monitoring page, neither the source nor the
> sink task shows a watermark value; the source's Watermarks tab shows No Data and the sink shows No Watermark.
>My SQL statements are as follows:
>CREATE TABLE t_stock_match_p_1(
>  id VARCHAR, 
>  stkcode INT,
>  volume INT,
>  matchtime BIGINT,
>  ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')),
>  WATERMARK  FOR ts AS ts - INTERVAL '1' SECOND
>) WITH (
>  'connector' = 'kafka-0.10',
>  'topic' = 'stock_match_p_zyh',
>  'scan.startup.mode' = 'latest-offset',
>  'properties.group.id' = 'stock_match_p_zyh',
>  'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668',
>  'properties.security.protocol' = 'SASL_SDP',
>  'properties.sasl.mechanism' = 'SDP',
>  'properties.key.deserializer' = 
> 'org.apache.kafka.common.serialization.LongDeserializer',
>  'properties.value.deserializer' = 
> 'org.apache.kafka.common.serialization.StringDeserializer',
>  'format' = 'csv',
>  'csv.field-delimiter' = ','
>);
> 
>CREATE TABLE t_stock_match_1 (
>  stkcode int,
>  pd TIMESTAMP,
>  volume  INT 
>) WITH (
>'connector' = 'print'
>);
> 
>INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) 
>as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' 
>MINUTE),stkcode; 
> 
>Then I manually fed the following records into the Kafka topic stock_match_p_zyh, but there is no output in the job's stdout in the Flink web UI and no errors in the logs. I don't know where the problem is.
>1503,61,15811,1614405166858
>1504,61,15813,1614405333871
>1505,61,15814,1614405544862
>1506,61,15814,1614405673863
> 
> 
> 
> 
> 
>> On Feb 26, 2021 at 15:02, Smile wrote:
>> 
>> Hi,
>> 
>> Regarding the metrics question, you can open each specific operator's Metrics page and look at its numRecordsIn and
>> numRecordsOut to see which operator is the first one with input but no output.
>> The metrics pasted above look like they come from the Overview page, and the metrics shown there are computed for the whole operator chain as a unit.
>> 
>> GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$,
>> matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime],
>> select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS
>> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
>> -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume])
>> -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1],
>> fields=[stkcode, pd, volume])
>> 
>> For this chain, Records Received is actually the number of records received by the GroupWindowAggregate operator,
>> and Records Sent is the number of records the Sink sends downstream. Note that this "sending" does not refer to how
>> many times the invoke method is executed, but to sends between upstream and downstream operators inside the Flink
>> framework, like collector.collect(). So the sink operator's Records Sent should always be 0, whether or not data was
>> successfully written to MySQL. What we usually care about more is whether the Sink operator has any numRecordsIn
>> (Records Received). If it does, the upstream operators are fine and the writes to MySQL are failing, so check the
>> MySQL-related configuration; if the Sink operator's numRecordsIn (Records Received) is also 0, then the problem is
>> upstream: no data is being sent to the Sink, and you can check the preceding operators one by one.
>> 
>> For example, here is a screenshot of my job (first time inserting an image, not sure whether it will display; if not
>> I'll find another way...). The Sink is print and it keeps printing data, but the metrics on the Overview page stay at 0.
>>  
>>  
>> 
>> Best wishes,
>> Smile
>> 
>> 
>> 
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Question about a Flink SQL application

2021-02-27 Thread xg...@126.com
1503,61,15811,1614405166858
1504,61,15813,1614405333871
1505,61,15814,1614405544862
1506,61,15814,1614405673863
Just these few records, and the parallelism is set to 1


 
From: yinghua...@163.com
Sent: 2021-02-27 14:23
To: user-zh
Subject: Re: Question about a Flink SQL application
I'm not sure whether this is what's causing the problem: on Flink's web UI monitoring page, neither the source nor the
sink task shows a watermark value; the source's Watermarks tab shows No Data and the sink shows No Watermark.
My SQL statements are as follows:
CREATE TABLE t_stock_match_p_1(
  id VARCHAR, 
  stkcode INT,
  volume INT,
  matchtime BIGINT,
  ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'yyyy-MM-dd HH:mm:ss')),
  WATERMARK  FOR ts AS ts - INTERVAL '1' SECOND
) WITH (
  'connector' = 'kafka-0.10',
  'topic' = 'stock_match_p_zyh',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'stock_match_p_zyh',
  'properties.bootstrap.servers' = 'sdp-10-88-100-101:6668',
  'properties.security.protocol' = 'SASL_SDP',
  'properties.sasl.mechanism' = 'SDP',
  'properties.key.deserializer' = 
'org.apache.kafka.common.serialization.LongDeserializer',
  'properties.value.deserializer' = 
'org.apache.kafka.common.serialization.StringDeserializer',
  'format' = 'csv',
  'csv.field-delimiter' = ','
);
 
CREATE TABLE t_stock_match_1 (
  stkcode int,
  pd TIMESTAMP,
  volume  INT 
) WITH (
'connector' = 'print'
);
 
INSERT INTO t_stock_match_1 SELECT stkcode,TUMBLE_END(ts, INTERVAL '1' MINUTE) 
as pd, sum(volume) FROM t_stock_match_p_1 GROUP BY TUMBLE(ts, INTERVAL '1' 
MINUTE),stkcode; 
 
Then I manually fed the following records into the Kafka topic stock_match_p_zyh, but there is no output in the job's stdout in the Flink web UI and no errors in the logs. I don't know where the problem is.
1503,61,15811,1614405166858
1504,61,15813,1614405333871
1505,61,15814,1614405544862
1506,61,15814,1614405673863
 
 
 
 
 
> On Feb 26, 2021 at 15:02, Smile wrote:
> 
> Hi,
> 
> Regarding the metrics question, you can open each specific operator's Metrics page and look at its numRecordsIn and
> numRecordsOut to see which operator is the first one with input but no output.
> The metrics pasted above look like they come from the Overview page, and the metrics shown there are computed for the whole operator chain as a unit.
> 
> GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$,
> matchtime, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime],
> select=[stkcode, SUM(volume) AS EXPR$2, start('w$) AS w$start, end('w$) AS
> w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
> -> Calc(select=[stkcode, CAST(w$end) AS pd, EXPR$2 AS volume])
> -> Sink: Sink(table=[cpp_flink_catalog.default.t_stock_match_1],
> fields=[stkcode, pd, volume])
> 
> For this chain, Records Received is actually the number of records received by the GroupWindowAggregate operator,
> and Records Sent is the number of records the Sink sends downstream. Note that this "sending" does not refer to how
> many times the invoke method is executed, but to sends between upstream and downstream operators inside the Flink
> framework, like collector.collect(). So the sink operator's Records Sent should always be 0, whether or not data was
> successfully written to MySQL. What we usually care about more is whether the Sink operator has any numRecordsIn
> (Records Received). If it does, the upstream operators are fine and the writes to MySQL are failing, so check the
> MySQL-related configuration; if the Sink operator's numRecordsIn (Records Received) is also 0, then the problem is
> upstream: no data is being sent to the Sink, and you can check the preceding operators one by one.
> 
> For example, here is a screenshot of my job (first time inserting an image, not sure whether it will display; if not
> I'll find another way...). The Sink is print and it keeps printing data, but the metrics on the Overview page stay at 0.
>  
>  
> 
> Best wishes,
> Smile
> 
> 
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Question about slow Flink checkpoint speed

2021-02-27 Thread 熊云昆
Is your checkpoint using rocksdb or filesystem? A different disk can indeed affect checkpoint speed.
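For reference, a minimal sketch of the two setups being asked about, using Flink 1.12 APIs and a placeholder HDFS path: RocksDB with incremental checkpoints versus the heap/filesystem backend, both checkpointing to HDFS.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(30_000); // 30s interval, as in the thread

        // Option 1: RocksDB state backend with incremental checkpoints;
        // only the diff since the last checkpoint is written to HDFS.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // Option 2: heap state with checkpoints written to HDFS as full snapshots.
        // env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // ... build and execute the job ...
    }
}

With the RocksDB backend, enabling incremental checkpoints (also configurable as state.backend.incremental: true in flink-conf.yaml) limits each checkpoint to the changes since the previous one, which reduces how much has to be written to HDFS.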


熊云昆
Email: xiongyun...@163.com

On Feb 27, 2021 at 14:14, Jacob wrote:
Hi All,

In production there is a job that runs stably on Hadoop cluster A, and its checkpoints are fast (the checkpoint interval is 30s, each checkpoint is a few tens of KB, and a checkpoint takes on the order of milliseconds).

The same job, with no code changes at all, was migrated to another Hadoop cluster B, and checkpoints became very slow: a single checkpoint takes more than ten minutes, which paralyzes the job; most of the time and resources go into checkpointing instead of processing our business logic.
  

   
As far as I know, the only difference between the two Hadoop clusters is that cluster A uses SSDs while the machines in cluster B use mechanical hard drives.


The job's checkpoints are stored in HDFS. Could disk performance be the reason checkpoints are so slow on cluster B? Should I use memory as the checkpoint storage instead? Please advise.



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/