Re: I/O reactor status: STOPPED after moving to elasticsearch7 connector

2021-10-15 Thread Oran Shuster
The cluster is not really overloaded, and I also couldn't find any ES error logs
(at least nothing that repeats).
The job IS very busy (100% on the sink and backpressure), but I think that's due to
the reactor exceptions being thrown.

what do you mean by incorrect batching?

On 2021/10/13 07:43:38, Itamar Syn-Hershko  wrote: 
> Hi Oran, can you check your ES logs / metrics?
> 
> Most issues we see with the ES sink are around incorrect batching and/or
> overloaded clusters. Could it be your ES write queue is building up?
> 
> On Wed, Oct 13, 2021 at 1:06 AM Oran Shuster  wrote:
> 
> > Flink version 1.13.1
> > ES Version 7.12.0
> > Flink deployment type - session cluster
> > Scala version 2.12
> >
> > Same job used to run with elasticsearch6 connector with about the same
> > load and had no issues
> > Since moving to elasticsearch7 more and more exceptions were being thrown
> > about reactor being stopped in our logs
> >
> > [IllegalStateException] with message [Request cannot be executed; I/O
> > reactor status: STOPPED] with rest status -1
> >
> > Looking online for this type of error gave me some results from people
> > instantiating the ES client on their own, which is irrelevant for our case.
> > I also found that it might be caused by an uncaught exception in the failure
> > handler, so I wrapped all the code in try..catch but found no uncaught
> > exceptions.
> >
> > The next thing I tried was to force the ES REST client to the same version as
> > my cluster, 7.12. That didn't seem to fix anything.
> >
> > Problem is that once this happens it does not fix itself. That instance of
> > the sink will continuously get those exceptions until we restart the job. I
> > tried to look for any logs before that but couldn't find anything
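(For the failure-handler angle above: a minimal sketch, in Java, of a handler that logs
every bulk failure and rethrows, so nothing escapes unnoticed. It assumes the
flink-connector-elasticsearch7 ActionRequestFailureHandler interface; the class name is
illustrative, and it would be registered via ElasticsearchSink.Builder#setFailureHandler.)

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical handler: log the failed action, then rethrow so the job fails fast
// instead of silently retrying against a stopped I/O reactor.
public class LoggingFailureHandler implements ActionRequestFailureHandler {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingFailureHandler.class);

    @Override
    public void onFailure(ActionRequest action, Throwable failure, int restStatusCode,
                          RequestIndexer indexer) throws Throwable {
        LOG.error("Elasticsearch action failed (rest status {}): {}", restStatusCode, action, failure);
        throw failure;
    }
}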
> >
> 
> 
> -- 
> [image: logo] 
> Itamar Syn-Hershko
> CTO, Founder
> Email: ita...@bigdataboutique.com
> Website: https://bigdataboutique.com
>    
> 
> 
> 


Re: Any issues with reinterpretAsKeyedStream when scaling partitions?

2021-10-15 Thread Dan Hill
Thanks Thias and JING ZHANG!

Here's a Google Drive folder link with the execution plan and two screenshots from
the job graph.

I'm guessing I violate the "The second operator needs to be single-input
(i.e. no TwoInputOp nor union() before)" part.

After I do a transformation on a KeyedStream (so it goes back to a
SingleOutputStreamOperator), even if I do a simple map, it usually
disallows operator chaining.  Even with reinterpretAsKeyedStream, it
doesn't work.
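For readers following the chaining preconditions quoted below, here is a minimal sketch
of the pattern under discussion, assuming the experimental
DataStreamUtils.reinterpretAsKeyedStream API; the record type and key are simplified
placeholders rather than the actual View/TinyView classes.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReinterpretChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<Long, String>> views =
                env.fromElements(Tuple2.of(120L, "user-a"), Tuple2.of(7L, "user-b"));

        // keyBy introduces a hash shuffle; everything after it must keep the same key.
        KeyedStream<Tuple2<Long, String>, Long> keyed = views.keyBy(v -> v.f0, Types.LONG);

        // A map that does not change the key. Keeping the same parallelism (and no
        // rebalance/rescale in between) is what allows it to chain via a ForwardPartitioner.
        DataStream<Tuple2<Long, String>> mapped =
                keyed.map(v -> Tuple2.of(v.f0, v.f1.toUpperCase()))
                     .returns(Types.TUPLE(Types.LONG, Types.STRING))
                     .setParallelism(keyed.getParallelism());

        // Reinterpreting is only safe if records truly stayed in their key groups,
        // i.e. no network shuffle happened between the keyBy and this point.
        KeyedStream<Tuple2<Long, String>, Long> reKeyed =
                DataStreamUtils.reinterpretAsKeyedStream(mapped, v -> v.f0, Types.LONG);

        reKeyed.print();
        env.execute("reinterpretAsKeyedStream sketch");
    }
}

If anything between the keyBy and the reinterpret call changes the partitioning (a
rebalance, rescale, union, or parallelism change), records can land on a subtask whose
key-group range does not contain their key, which is exactly the
registerEventTimeTimer error described here.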


On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG  wrote:

> Hi Dan,
> Sorry for the typos, I meant to provide the code to reproduce the problem. If
> the current program is complex and confidential, maybe you could try to simplify
> the code.
> Besides, Matthias's guess is very reasonable. Could you please check whether
> there is a network shuffle between your two operators? Were those two
> operators chained into one vertex?
>
> Best,
> JING ZHANG
>
> Schwalbe Matthias  于2021年10月15日周五 下午2:57写道:
>
>> … didn’t mean to hit the send button so soon 😊
>>
>>
>>
>> I guess we are getting closer to a solution
>>
>>
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>>
>>
>> *From:* Schwalbe Matthias
>> *Sent:* Freitag, 15. Oktober 2021 08:49
>> *To:* 'Dan Hill' ; user 
>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling
>> partitions?
>>
>>
>>
>> Hi Dan again 😊,
>>
>>
>>
>> I had a second look … from what I see in your call stack, I conclude
>> that indeed you have a network shuffle between your two operators,
>>
>> In which case reinterpretAsKeyedStream wouldn’t work
>>
>>
>>
>> ($StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277
>> indicates that the two operators are not chained)
>>
>>
>>
>>
>>
>> … just as a double-check could you please share both your
>>
>>- Execution plan (call println(env.getExecutionPlan) right before
>>your call env.execute) (json), and
>>- Your job plan (screenshot from flink dashboard)
>>
>>
>>
>> There are a number of preconditions before two operators get chained, and
>> probably one of them fails (see [1]):
>>
>>- The two operators need to allow chaining the resp. other (see [2] …
>>chaining strategy)
>>- We need a ForwardPartitioner in between
>>- We need to be in streaming mode
>>- Both operators need the same parallelism
>>- Chaining needs to be enabled for the streaming environment
>>- The second operator needs to be single-input (i.e. no TwoInputOp
>>nor union() before)
>>
>>
>>
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>
>> [2]
>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>>
>>
>>
>>
>>
>> *From:* Dan Hill 
>> *Sent:* Donnerstag, 14. Oktober 2021 17:50
>> *To:* user 
>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
>> partitions?
>>
>>
>>
>> I have a job that uses reinterpretAsKeyedStream across a simple map to
>> avoid a shuffle.  When changing the number of partitions, I'm hitting an
>> issue with registerEventTimeTimer complaining that "key group from 110 to
>> 119 does not contain 186".  I'm using Flink v1.12.3.
>>
>>
>>
>> Any thoughts on this?  I don't know if there is a known issue
>> with reinterpretAsKeyedStream.
>>
>>
>>
>> Rough steps:
>>
>> 1. I have a raw input stream of View records.  I keyBy the View using
>> Tuple2(platform_id, log_user_id).
>>
>> 2. I do a small transformation of View to a TinyView.  I
>> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
>> The keys are the same.
>>
>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>
>>
>>
>> When I savepoint and start again with a different number of partitions,
>> my KeyedCoProcessFunction hits an issue with registerEventTimeTimer and
>> complains that "key group from 110 to 119 does not contain 186".  I
>> verified that the key does not change and that we use Tuple2 with
>> primitives Long and String.
>>
>>
>>
>>
>>
>>
>>
>> 2021-10-14 08:17:07
>>
>> java.lang.IllegalArgumentException: view x insertion issue with
>> registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174),
>> flat=platform_id: 120
>>
>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>>
>> log_timestamp: 1634224329606
>>
>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>
>>
>>
>> at
>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>
>> at
>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>
>> at
>> ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTi

Programmatically configuring S3 settings

2021-10-15 Thread Pavel Penkov
Apparently Flink 1.14.0 doesn't correctly translate S3 options when they
are set programmatically. I'm creating a local environment like this to
connect to local MinIO instance:

  val flinkConf = new Configuration()
  flinkConf.setString("s3.endpoint", "http://127.0.0.1:9000")
  flinkConf.setString("s3.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

  val env =
    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf)

Then StreamingFileSink fails with a huge stack trace, the most relevant part being:

Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials
provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider
InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to
connect to service endpoint:

This means that Hadoop tried to enumerate all of the credential providers
instead of using the one set in the configuration. What am I doing wrong?
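One avenue worth trying, sketched below in Java: initialize the file systems explicitly
with the same configuration before building the environment, so the s3 plugin definitely
sees the options rather than relying on the local environment to forward them. This is a
sketch only, not verified against 1.14.0; the endpoint and credentials provider mirror
the snippet above, and FileSystem.initialize/PluginUtils are the standard Flink classes.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalMinioSetup {
    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        flinkConf.setString("s3.endpoint", "http://127.0.0.1:9000");
        flinkConf.setString("s3.path.style.access", "true"); // MinIO typically needs path-style access
        flinkConf.setString("s3.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider");

        // Hand the configuration to the file system factories (and the s3 plugin) directly.
        // Requires the flink-s3-fs-hadoop plugin to be discoverable (FLINK_PLUGINS_DIR).
        FileSystem.initialize(flinkConf, PluginUtils.createPluginManagerFromRootFolder(flinkConf));

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf);
        // ... build the pipeline with the StreamingFileSink as before ...
    }
}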


Display time in UTC on the UI

2021-10-15 Thread Shilpa Shankar
Is there a way we can update flink UI to display UTC time instead of local
time?

[image: image.png]

Thanks,
Shilpa


Re: [External] : Timeout settings for Flink jobs?

2021-10-15 Thread Fuyao Li
Hi Sharon,

I think for the DataStream API, you can override the isEndOfStream() method of the
DeserializationSchema to make the input source stop and thus let the
workflow finish.

Thanks,
Fuyao
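A minimal sketch of that idea in Java, assuming a Kafka source that reads UTF-8 strings
through the legacy FlinkKafkaConsumer (the newer KafkaSource does not honor
isEndOfStream); the deadline handling is illustrative only, not a complete job-timeout
mechanism.

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Hypothetical schema: stop consuming once a wall-clock deadline has passed.
public class DeadlineAwareSchema implements DeserializationSchema<String> {
    private final long deadlineEpochMillis;

    public DeadlineAwareSchema(long deadlineEpochMillis) {
        this.deadlineEpochMillis = deadlineEpochMillis;
    }

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Returning true tells the consumer to stop reading; once all sources are
        // finished, the job can terminate on its own.
        return System.currentTimeMillis() >= deadlineEpochMillis;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

It would be passed to the FlinkKafkaConsumer constructor in place of the usual
SimpleStringSchema.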

From: Sharon Xie 
Date: Monday, October 11, 2021 at 12:43
To: user@flink.apache.org 
Subject: [External] : Timeout settings for Flink jobs?
Hi there,

We have a use case where we want to terminate a job when a time limit is 
reached. Is there a Flink setting that we can use for this use case?


Thanks,
Sharon


Re: Disable KafkaSourceReaderMetrics logs

2021-10-15 Thread Preston Price
There is an open bug for this here:
https://issues.apache.org/jira/browse/FLINK-24497
For log4j2 these settings worked for me:

# mute obnoxious warnings due to this bug: https://issues.apache.org/jira/browse/FLINK-24497
logger.flink_annoying_mute.name = org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics
logger.flink_annoying_mute.level = error


On Fri, Oct 15, 2021 at 8:08 AM Denis Nutiu  wrote:

> Hi,
>
> My Flink (1.14.0) job seems to output a lot of error messages with the
> following text:
>
> 16:46:38,562 WARN
> org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics []
> - Error when getting Kafka consumer metric "records-lag" for partition
> "lambada.events-0". Metric "pendingBytes" may not be reported correctly.
> java.lang.IllegalStateException: Cannot find Kafka metric matching current
> filter.
> at
> org.apache.flink.connector.kafka.MetricUtil.lambda$getKafkaMetric$1(MetricUtil.java:63)
> ~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at java.util.Optional.orElseThrow(Optional.java:408) ~[?:?]
> at
> org.apache.flink.connector.kafka.MetricUtil.getKafkaMetric(MetricUtil.java:61)
> ~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.getRecordsLagMetric(KafkaSourceReaderMetrics.java:304)
> ~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.lambda$maybeAddRecordsLagMetric$4(KafkaSourceReaderMetrics.java:229)
> ~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
> [?:?]
> at
> org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.maybeAddRecordsLagMetric(KafkaSourceReaderMetrics.java:228)
> [flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:187)
> [flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> [flink-connector-base-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> [flink-connector-base-1.14.0.jar:1.14.0]
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> [flink-connector-base-1.14.0.jar:1.14.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> [?:?]
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
> at java.lang.Thread.run(Thread.java:829) [?:?]
>
> I tried to disable the logs by adding the following line to log4j2.properties
> but it did not work.
>
>
> log4j.logger.org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics=OFF
>
> Is there any other way to disable the messages?
> --
> Best,
> Denis
>


Disable KafkaSourceReaderMetrics logs

2021-10-15 Thread Denis Nutiu
Hi,

My Flink (1.14.0) job seems to output a lot of error messages with the
following text:

16:46:38,562 WARN
org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics []
- Error when getting Kafka consumer metric "records-lag" for partition
"lambada.events-0". Metric "pendingBytes" may not be reported correctly.
java.lang.IllegalStateException: Cannot find Kafka metric matching current
filter.
at
org.apache.flink.connector.kafka.MetricUtil.lambda$getKafkaMetric$1(MetricUtil.java:63)
~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
at java.util.Optional.orElseThrow(Optional.java:408) ~[?:?]
at
org.apache.flink.connector.kafka.MetricUtil.getKafkaMetric(MetricUtil.java:61)
~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
at
org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.getRecordsLagMetric(KafkaSourceReaderMetrics.java:304)
~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
at
org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.lambda$maybeAddRecordsLagMetric$4(KafkaSourceReaderMetrics.java:229)
~[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
at
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
[?:?]
at
org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.maybeAddRecordsLagMetric(KafkaSourceReaderMetrics.java:228)
[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
at
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:187)
[flink-connector-kafka_2.11-1.14.0.jar:1.14.0]
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
[flink-connector-base-1.14.0.jar:1.14.0]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
[flink-connector-base-1.14.0.jar:1.14.0]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
[flink-connector-base-1.14.0.jar:1.14.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]

I tried to disable the logs by adding the following line to log4j2.properties
but it did not work.

log4j.logger.org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics=OFF

Is there any other way to disable the messages?
-- 
Best,
Denis


Re: [DISCUSS] Creating an external connector repository

2021-10-15 Thread Chesnay Schepler
My opinion of splitting the Flink repositories hasn't changed; I'm still 
in favor of it.


While it would technically be possible to release individual connectors 
even if they are part of the Flink repo,
it is quite a hassle to do so and error prone due to the current branch 
structure.

A split would also force us to watch out much more for API stability.

I'm gonna assume that we will move out all connectors:

What I'm concerned about, and which we never really covered in past 
discussions about split repositories, are

a) ways to share infrastructure (e.g., CI/release utilities/codestyle)
b) testing
c) documentation integration

Particularly for b) we still lack any real public utilities.
Even fundamental things such as the MiniClusterResource are not 
annotated in any way.

I would argue that we need to sort this out before a split can happen.
We've seen with the flink-benchmarks repo and recent discussions how 
easily things can break.


Related to that, there is the question of how Flink is then supposed to 
ensure that things don't break. My impression is that we heavily rely on 
the connector tests to that end at the moment.
Similarly, what connector (version) would be used for examples (like the 
WordCount which reads from Kafka) or (e2e) tests that want to read 
something other than a file? You end up with this circular dependency 
which are always troublesome.


As for the repo structure, I would think that a single one could 
work quite well (because having 10+ connector repositories is just a 
mess), but currently I wouldn't set it up as a single project.
I would rather have something like N + 1 projects (one for each 
connector + a shared testing project) which are released individually 
as required, without any snapshot dependencies in-between.
Then 1 branch for each major Flink version (again, no snapshot 
dependencies). Individual connectors can be released at any time against 
any of the latest bugfix releases, which due to lack of binaries (and 
python releases) would be a breeze.


I don't like the idea of moving existing connectors out of the Apache 
organization. At the very least, not all of them. While some are 
certainly ill-maintained (e.g., Cassandra) where it would be neat if 
external projects could maintain them, others (like Kafka) are not and 
quite fundamental to actually using Flink.


On 15/10/2021 14:47, Arvid Heise wrote:

Dear community,

Today I would like to kickstart a series of discussions around creating an
external connector repository. The main idea is to decouple the release
cycle of Flink from the release cycles of the connectors. This is a common
approach in other big data analytics projects and seems to scale better
than the current approach. In particular, it will yield the following
changes.


- Faster releases of connectors: New features can be added more quickly,
  bugs can be fixed immediately, and we can have faster security patches in
  case of direct or indirect (through dependencies) security flaws.
- New features can be added to old Flink versions: If the connector API
  didn’t change, the same connector jar may be used with different Flink
  versions. Thus, new features can also immediately be used with older Flink
  versions. A compatibility matrix on each connector page will help users to
  find suitable connector versions for their Flink versions.
- More activity and contributions around connectors: If we ease the
  contribution and development process around connectors, we will see faster
  development and also more connectors. Since that heavily depends on the
  chosen approach discussed below, more details will be shown there.
- An overhaul of the connector page: In the future, all known connectors
  will be shown on the same page in a similar layout independent of where
  they reside. They could be hosted on external project pages (e.g., Iceberg
  and Hudi), on some company page, or may stay within the main Flink
  repository. Connectors may receive some sort of quality seal such that users
  can quickly assess the production-readiness, and we could also add which
  community/company promises which kind of support.
- If we take (some) connectors out of Flink, Flink CI will be faster
  and Flink devs will experience fewer build instabilities (which mostly come
  from connectors). That would also speed up Flink development.


Now I’d first like to collect your viewpoints on the ideal state. Let’s
first recap which approaches, we currently have:


- We have half of the connectors in the main Flink repository. Relatively
  few of them have received updates in the past couple of months.
- Another large chunk of connectors are in Apache Bahir. It recently has
  seen the first release in 3 years.
- There are a few other (Apache) projects that maintain a Flink connector,
  such as Apache Iceberg, Apache Hudi, and Pravega.

Re: [DISCUSS] Creating an external connector repository

2021-10-15 Thread Ingo Bürk
Hi Arvid,

In general I think breaking up the big repo would be a good move with many
benefits (which you have outlined already). One concern would be how to
proceed with our docs / examples if we were to really separate out all
connectors.

1. More real-life examples would essentially now depend on external
projects. Particularly if hosted outside the ASF, this would feel somewhat
odd. Or to put it differently, if flink-connector-foo is not part of Flink
itself, should the Flink Docs use it for any examples?
2. Generation of documentation (config options) wouldn't be possible unless
the docs depend on these external projects, which would create weird
version dependency cycles (Flink 1.X's docs depend on flink-connector-foo
1.X which depends on Flink 1.X).
3. Documentation would inevitably be much less consistent when split across
many repositories.

As for your approaches, how would (A) allow hosting personal / company
projects if only Flink committers can write to it?

> Connectors may receive some sort of quality seal

This sounds like a lot of work and process, and could easily become a
source of frustration.


Best
Ingo

On Fri, Oct 15, 2021 at 2:47 PM Arvid Heise  wrote:

> Dear community,
>
> Today I would like to kickstart a series of discussions around creating an
> external connector repository. The main idea is to decouple the release
> cycle of Flink from the release cycles of the connectors. This is a common
> approach in other big data analytics projects and seems to scale better
> than the current approach. In particular, it will yield the following
> changes.
>
>
> - Faster releases of connectors: New features can be added more quickly,
>   bugs can be fixed immediately, and we can have faster security patches in
>   case of direct or indirect (through dependencies) security flaws.
> - New features can be added to old Flink versions: If the connector API
>   didn’t change, the same connector jar may be used with different Flink
>   versions. Thus, new features can also immediately be used with older Flink
>   versions. A compatibility matrix on each connector page will help users to
>   find suitable connector versions for their Flink versions.
> - More activity and contributions around connectors: If we ease the
>   contribution and development process around connectors, we will see faster
>   development and also more connectors. Since that heavily depends on the
>   chosen approach discussed below, more details will be shown there.
> - An overhaul of the connector page: In the future, all known connectors
>   will be shown on the same page in a similar layout independent of where
>   they reside. They could be hosted on external project pages (e.g., Iceberg
>   and Hudi), on some company page, or may stay within the main Flink
>   repository. Connectors may receive some sort of quality seal such that users
>   can quickly assess the production-readiness, and we could also add which
>   community/company promises which kind of support.
> - If we take (some) connectors out of Flink, Flink CI will be faster
>   and Flink devs will experience fewer build instabilities (which mostly come
>   from connectors). That would also speed up Flink development.
>
>
> Now I’d first like to collect your viewpoints on the ideal state. Let’s
> first recap which approaches, we currently have:
>
>
> - We have half of the connectors in the main Flink repository.
>   Relatively few of them have received updates in the past couple of months.
> - Another large chunk of connectors are in Apache Bahir. It recently has
>   seen the first release in 3 years.
> - There are a few other (Apache) projects that maintain a Flink
>   connector, such as Apache Iceberg, Apache Hudi, and Pravega.
> - A few connectors are listed on company-related repositories, such as
>   Apache Pulsar on StreamNative and CDC connectors on Ververica.
>
>
> My personal observation is that having a repository per connector seems to
> increase the activity on a connector as it’s easier to maintain. For
> example, in Apache Bahir all connectors are built against the same Flink
> version, which may not be desirable when certain APIs change; for example,
> SinkFunction will be eventually deprecated and removed but new Sink
> interface may gain more features.
>
> Now, I'd like to outline different approaches. All approaches will allow
> you to host your connector on any kind of personal, project, or company
> repository. We still want to provide a default place where users can
> contribute their connectors and hopefully grow a community around it. The
> approaches are:
>
>
> 1. Create a mono-repo under the Apache umbrella where all connectors will
>    reside, for example, github.com/apache/flink-connectors. That
>    repository needs to follow its rules: No GitHub issues, no Dependabot or
>    similar tools, and a strict manual

[DISCUSS] Creating an external connector repository

2021-10-15 Thread Arvid Heise
Dear community,

Today I would like to kickstart a series of discussions around creating an
external connector repository. The main idea is to decouple the release
cycle of Flink from the release cycles of the connectors. This is a common
approach in other big data analytics projects and seems to scale better
than the current approach. In particular, it will yield the following
changes.


   - Faster releases of connectors: New features can be added more quickly,
     bugs can be fixed immediately, and we can have faster security patches in
     case of direct or indirect (through dependencies) security flaws.
   - New features can be added to old Flink versions: If the connector API
     didn’t change, the same connector jar may be used with different Flink
     versions. Thus, new features can also immediately be used with older Flink
     versions. A compatibility matrix on each connector page will help users to
     find suitable connector versions for their Flink versions.
   - More activity and contributions around connectors: If we ease the
     contribution and development process around connectors, we will see faster
     development and also more connectors. Since that heavily depends on the
     chosen approach discussed below, more details will be shown there.
   - An overhaul of the connector page: In the future, all known connectors
     will be shown on the same page in a similar layout independent of where
     they reside. They could be hosted on external project pages (e.g., Iceberg
     and Hudi), on some company page, or may stay within the main Flink
     repository. Connectors may receive some sort of quality seal such that users
     can quickly assess the production-readiness, and we could also add which
     community/company promises which kind of support.
   - If we take (some) connectors out of Flink, Flink CI will be faster
     and Flink devs will experience fewer build instabilities (which mostly come
     from connectors). That would also speed up Flink development.


Now I’d first like to collect your viewpoints on the ideal state. Let’s
first recap which approaches, we currently have:


   - We have half of the connectors in the main Flink repository. Relatively
     few of them have received updates in the past couple of months.
   - Another large chunk of connectors are in Apache Bahir. It recently has
     seen the first release in 3 years.
   - There are a few other (Apache) projects that maintain a Flink connector,
     such as Apache Iceberg, Apache Hudi, and Pravega.
   - A few connectors are listed on company-related repositories, such as
     Apache Pulsar on StreamNative and CDC connectors on Ververica.


My personal observation is that having a repository per connector seems to
increase the activity on a connector as it’s easier to maintain. For
example, in Apache Bahir all connectors are built against the same Flink
version, which may not be desirable when certain APIs change; for example,
SinkFunction will be eventually deprecated and removed but new Sink
interface may gain more features.

Now, I'd like to outline different approaches. All approaches will allow
you to host your connector on any kind of personal, project, or company
repository. We still want to provide a default place where users can
contribute their connectors and hopefully grow a community around it. The
approaches are:


   1. Create a mono-repo under the Apache umbrella where all connectors will
      reside, for example, github.com/apache/flink-connectors. That repository
      needs to follow its rules: No GitHub issues, no Dependabot or similar
      tools, and a strict manual release process. It would be under the Flink
      community, such that Flink committers can write to that repository but
      no-one else.
   2. Create a GitHub organization with small repositories, for example
      github.com/flink-connectors. Since it’s not under the Apache umbrella,
      we are free to use whatever process we deem best (up to a future
      discussion). Each repository can have a shared list of maintainers +
      connector specific committers. We can provide more automation. We may even
      allow different licenses to incorporate things like a connector to Oracle
      that cannot be released under ASL.
   3. ??? <- please provide your additional approaches


In both cases, we will provide opinionated module/repository templates
based on a connector testing framework and guidelines. Depending on the
approach, we may need to enforce certain things.

I’d like to first focus on what the community would ideally seek and
minimize the discussions around legal issues, which we would discuss later.
For now, I’d also like to postpone the discussion if we move all or only a
subset of connectors from Flink to the new default place as it seems to be
orthogonal to the fundamental discussion.

PS: If the external repository for connectors is successful, I’d also like
to move out other things like formats, filesystems, and metri

Re: Securing Stateful Functions

2021-10-15 Thread Igal Shilman
Hi Mark,

For communicating with remote functions we use the default trust manager,
so I believe that if you add the self-signed certificate to
the container, the JVM will pick it up automatically.
I haven't done it myself, but I've found this blog post that explains how
to do it [1]
Let me know if that works for you, otherwise I'll dig deeper :-)

Good luck,
Igal.

[1]
https://medium.com/@codebyamir/the-java-developers-guide-to-ssl-certificates-b78142b3a0fc


On Fri, Oct 15, 2021 at 12:34 PM mark  wrote:

> Hello,
> My team needs to apply (currently self-signed) HTTPS certificates to our
> Stateful Function endpoints. Is this currently possible? I can't find any
> examples in the playground, or information in the online documentation.
>
> We are using Remote Stateful Functions and a custom, Java-based 
> `org.apache.flink.statefun.sdk.io.Router`.
> We need help please in knowing how to set up the Router's client
> connection with the necessary client-side certificates.
>
> Very many thanks in advance!
>
> Regards,
> Mark
>


Re: Synchronisation between different sources in flink

2021-10-15 Thread Akshay Agarwal
Hey,

The use case is as follows:
Kafka is the source for our frontend/impression events (any user
activity), and we are enriching that stream with values from
the backend, which is the source of certain slowly changing properties (values
that change/update over time). These get dumped as files from the backend
at regular intervals, like every hour or quarter of an hour.

*Akshay Agarwal*

On Fri, Oct 15, 2021 at 3:48 PM Martijn Visser 
wrote:

> Hi,
>
> Can you elaborate a bit more on what your use case is and what you're
> trying to achieve?
>
> Best regards,
>
> Martijn
>
> On Fri, 15 Oct 2021 at 11:25, Akshay Agarwal 
> wrote:
>
>> Hey,
>>
>> I have few doubts about HybridSource, it says:
>>
>>> To arrange multiple sources in a HybridSource, all sources except the
>>> last one need to be bounded. Therefore, the sources typically need to be
>>> assigned a start and end position.
>>
>> Both sources that I use in the job are unbounded, (file source and Kafka
>> source both get continuous updates). Further schema of both sources is
>> different since they give different data points. So wanted to clarify that
>> further you are suggesting converting Kafka source to hybrid source(file
>> source and Kafka source) and update state based on that and keep file
>> source as it is.
>>
>> *Akshay Agarwal*
>>
>>
>> On Fri, Oct 15, 2021 at 2:06 PM Akshay Agarwal <
>> akshay.agar...@grofers.com> wrote:
>>
>>> Nope, I haven't gone through that, will take a look, thanks for the
>>> prompt reply.
>>>
>>> *Akshay Agarwal*
>>>
>>>
>>>
>>> On Fri, Oct 15, 2021 at 2:00 PM Martijn Visser 
>>> wrote:
>>>
 Hi,

 Have you checked out the Hybrid Source? [1]

 Thanks, Martijn

 [1]
 https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/hybridsource/

 On Fri, 15 Oct 2021 at 10:22, Akshay Agarwal <
 akshay.agar...@grofers.com> wrote:

> Hi Team,
>
> I have a streaming job that creates an enriched stream from streams of
> 2 different sources like one from files in object storage and another one
> Kafka. But since both of these are from different sources, events from 
> them
> are read at different moments based on their performance. For example
> events from Kafka arrives first than from file storage events and even
> though I have used event watermarks, thus at the restart of jobs, enriched
> events are wrong and it gets corrected after file events start arriving. I
> tried to search for setting up synchronization between different sources 
> in
> flink but did not found any blog/material. It might be a noob question, 
> but
> if you guys have built something around this, could you let me know.
>
> Regards
> *Akshay Agarwal*
>
> [image: https://grofers.com] 


>> [image: https://grofers.com] 
>
>



Securing Stateful Functions

2021-10-15 Thread mark
Hello,
My team needs to apply (currently self-signed) HTTPS certificates to our
Stateful Function endpoints. Is this currently possible? I can't find any
examples in the playground, or information in the online documentation.

We are using Remote Stateful Functions and a custom, Java-based
`org.apache.flink.statefun.sdk.io.Router`.
We need help please in knowing how to set up the Router's client connection
with the necessary client-side certificates.

Very many thanks in advance!

Regards,
Mark


Re: Synchronisation between different sources in flink

2021-10-15 Thread Martijn Visser
Hi,

Can you elaborate a bit more on what your use case is and what you're
trying to achieve?

Best regards,

Martijn

On Fri, 15 Oct 2021 at 11:25, Akshay Agarwal 
wrote:

> Hey,
>
> I have few doubts about HybridSource, it says:
>
>> To arrange multiple sources in a HybridSource, all sources except the
>> last one need to be bounded. Therefore, the sources typically need to be
>> assigned a start and end position.
>
> Both sources that I use in the job are unbounded, (file source and Kafka
> source both get continuous updates). Further schema of both sources is
> different since they give different data points. So wanted to clarify that
> further you are suggesting converting Kafka source to hybrid source(file
> source and Kafka source) and update state based on that and keep file
> source as it is.
>
> *Akshay Agarwal*
>
>
> On Fri, Oct 15, 2021 at 2:06 PM Akshay Agarwal 
> wrote:
>
>> Nope, I haven't gone through that, will take a look, thanks for the
>> prompt reply.
>>
>> *Akshay Agarwal*
>>
>>
>>
>> On Fri, Oct 15, 2021 at 2:00 PM Martijn Visser 
>> wrote:
>>
>>> Hi,
>>>
>>> Have you checked out the Hybrid Source? [1]
>>>
>>> Thanks, Martijn
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>>>
>>> On Fri, 15 Oct 2021 at 10:22, Akshay Agarwal 
>>> wrote:
>>>
 Hi Team,

 I have a streaming job that creates an enriched stream from streams of
 2 different sources like one from files in object storage and another one
 Kafka. But since both of these are from different sources, events from them
 are read at different moments based on their performance. For example
 events from Kafka arrives first than from file storage events and even
 though I have used event watermarks, thus at the restart of jobs, enriched
 events are wrong and it gets corrected after file events start arriving. I
 tried to search for setting up synchronization between different sources in
 flink but did not found any blog/material. It might be a noob question, but
 if you guys have built something around this, could you let me know.

 Regards
 *Akshay Agarwal*

 [image: https://grofers.com] 
>>>
>>>
> [image: https://grofers.com] 


Re: Synchronisation between different sources in flink

2021-10-15 Thread Akshay Agarwal
Hey,

I have a few doubts about HybridSource; the docs say:

> To arrange multiple sources in a HybridSource, all sources except the
> last one need to be bounded. Therefore, the sources typically need to be
> assigned a start and end position.

Both sources that I use in the job are unbounded (the file source and the Kafka
source both get continuous updates). Furthermore, the schema of the two sources is
different, since they give different data points. So I wanted to clarify:
are you suggesting converting the Kafka source into a hybrid source (file
source plus Kafka source), updating state based on that, and keeping the file
source as it is?

*Akshay Agarwal*


On Fri, Oct 15, 2021 at 2:06 PM Akshay Agarwal 
wrote:

> Nope, I haven't gone through that, will take a look, thanks for the prompt
> reply.
>
> *Akshay Agarwal*
>
>
>
> On Fri, Oct 15, 2021 at 2:00 PM Martijn Visser 
> wrote:
>
>> Hi,
>>
>> Have you checked out the Hybrid Source? [1]
>>
>> Thanks, Martijn
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>>
>> On Fri, 15 Oct 2021 at 10:22, Akshay Agarwal 
>> wrote:
>>
>>> Hi Team,
>>>
>>> I have a streaming job that creates an enriched stream from streams of 2
>>> different sources like one from files in object storage and another one
>>> Kafka. But since both of these are from different sources, events from them
>>> are read at different moments based on their performance. For example
>>> events from Kafka arrives first than from file storage events and even
>>> though I have used event watermarks, thus at the restart of jobs, enriched
>>> events are wrong and it gets corrected after file events start arriving. I
>>> tried to search for setting up synchronization between different sources in
>>> flink but did not found any blog/material. It might be a noob question, but
>>> if you guys have built something around this, could you let me know.
>>>
>>> Regards
>>> *Akshay Agarwal*
>>>
>>> [image: https://grofers.com] 
>>
>>



Re: Synchronisation between different sources in flink

2021-10-15 Thread Akshay Agarwal
Nope, I haven't gone through that, will take a look, thanks for the prompt
reply.

*Akshay Agarwal*



On Fri, Oct 15, 2021 at 2:00 PM Martijn Visser 
wrote:

> Hi,
>
> Have you checked out the Hybrid Source? [1]
>
> Thanks, Martijn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
>
> On Fri, 15 Oct 2021 at 10:22, Akshay Agarwal 
> wrote:
>
>> Hi Team,
>>
>> I have a streaming job that creates an enriched stream from streams of 2
>> different sources like one from files in object storage and another one
>> Kafka. But since both of these are from different sources, events from them
>> are read at different moments based on their performance. For example
>> events from Kafka arrives first than from file storage events and even
>> though I have used event watermarks, thus at the restart of jobs, enriched
>> events are wrong and it gets corrected after file events start arriving. I
>> tried to search for setting up synchronization between different sources in
>> flink but did not found any blog/material. It might be a noob question, but
>> if you guys have built something around this, could you let me know.
>>
>> Regards
>> *Akshay Agarwal*
>>
>> [image: https://grofers.com] 
>
>



Re: Synchronisation between different sources in flink

2021-10-15 Thread Martijn Visser
Hi,

Have you checked out the Hybrid Source? [1]

Thanks, Martijn

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/hybridsource/
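For context, the typical shape of a HybridSource, as a minimal sketch in Java against
the Flink 1.14 APIs; the path, topic, and bootstrap servers are placeholders, and the
bounded file source has to come before the unbounded Kafka source.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HybridSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded file source: replays the historical dump first.
        // TextLineFormat lives in flink-connector-files (renamed in later releases).
        FileSource<String> fileSource =
                FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("s3://bucket/history/"))
                          .build();

        // Unbounded Kafka source: continues with the live stream afterwards.
        KafkaSource<String> kafkaSource =
                KafkaSource.<String>builder()
                           .setBootstrapServers("localhost:9092")
                           .setTopics("events")
                           .setValueOnlyDeserializer(new SimpleStringSchema())
                           .setStartingOffsets(OffsetsInitializer.earliest())
                           .build();

        // All sources except the last one must be bounded.
        HybridSource<String> hybridSource =
                HybridSource.builder(fileSource)
                            .addSource(kafkaSource)
                            .build();

        env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "files-then-kafka")
           .print();
        env.execute("HybridSource sketch");
    }
}

As the docs quoted elsewhere in this thread note, every source except the last must be
bounded, so this pattern does not directly cover two continuously updating sources.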

On Fri, 15 Oct 2021 at 10:22, Akshay Agarwal 
wrote:

> Hi Team,
>
> I have a streaming job that creates an enriched stream from streams of 2
> different sources like one from files in object storage and another one
> Kafka. But since both of these are from different sources, events from them
> are read at different moments based on their performance. For example
> events from Kafka arrives first than from file storage events and even
> though I have used event watermarks, thus at the restart of jobs, enriched
> events are wrong and it gets corrected after file events start arriving. I
> tried to search for setting up synchronization between different sources in
> flink but did not found any blog/material. It might be a noob question, but
> if you guys have built something around this, could you let me know.
>
> Regards
> *Akshay Agarwal*
>
> [image: https://grofers.com] 


Synchronisation between different sources in flink

2021-10-15 Thread Akshay Agarwal
Hi Team,

I have a streaming job that creates an enriched stream from streams of 2
different sources: one reading files in object storage and the other reading
Kafka. But since these are different sources, events from them
are read at different moments depending on their performance. For example,
events from Kafka arrive earlier than the file storage events, and even
though I have used event-time watermarks, at a restart of the job the enriched
events are wrong and only get corrected after the file events start arriving. I
tried to search for a way to set up synchronization between different sources in
Flink but did not find any blog/material. It might be a noob question, but
if you guys have built something around this, could you let me know?

Regards
*Akshay Agarwal*



Re: Removing metrics

2021-10-15 Thread JING ZHANG
Hi Mason,
I'm afraid there is no way for users to actively remove/unregister metrics.
This is handled automatically by the Flink engine after the job
finishes.

Here is a very hacky way to achieve this. Once you think the UDF that
registers the metrics has finished its work, you could
stop the job with a savepoint, update the UDF to remove the code that defines the
metrics, then restore the job from the savepoint. I admit the solution is very
hacky; however, I don't know of a better way yet.

BTW, Would you please explain why you need to remove/unregister metrics
actively?

Best,

JING ZHANG

Mason Chen  于2021年10月15日周五 上午8:43写道:

> Hi all,
>
> Suppose I have a short lived process within a UDF that defines metrics.
> After the process has completed, the underlying resources should be cleaned
> up. Is there an API to remove/unregister metrics?
>
> Best,
> Mason
>


Re: Any issues with reinterpretAsKeyedStream when scaling partitions?

2021-10-15 Thread JING ZHANG
Hi Dan,
Sorry for the typos, I meant to provide the code to reproduce the problem. If
the current program is complex and confidential, maybe you could try to simplify
the code.
Besides, Matthias's guess is very reasonable. Could you please check whether there
is a network shuffle between your two operators? Were those two operators
chained into one vertex?

Best,
JING ZHANG

Schwalbe Matthias  于2021年10月15日周五 下午2:57写道:

> … didn’t mean to hit the send button so soon 😊
>
>
>
> I guess we are getting closer to a solution
>
>
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Schwalbe Matthias
> *Sent:* Freitag, 15. Oktober 2021 08:49
> *To:* 'Dan Hill' ; user 
> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling
> partitions?
>
>
>
> Hi Dan again 😊,
>
>
>
> I had a second look … from what I see in your call stack, I conclude
> that indeed you have a network shuffle between your two operators,
>
> In which case reinterpretAsKeyedStream wouldn’t work
>
>
>
> ($StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277
> indicates that the two operators are not chained)
>
>
>
>
>
> … just as a double-check could you please share both your
>
>- Execution plan (call println(env.getExecutionPlan) right before your
>call env.execute) (json), and
>- Your job plan (screenshot from flink dashboard)
>
>
>
> There are a number of preconditions before two operators get chained, and
> probably one of them fails (see [1]):
>
>- The two operators need to allow chaining the resp. other (see [2] …
>chaining strategy)
>- We need a ForwardPartitioner in between
>- We need to be in streaming mode
>- Both operators need the same parallelism
>- Chaining needs to be enabled for the streaming environment
>- The second operator needs to be single-input (i.e. no TwoInputOp nor
>union() before)
>
>
>
>
>
> [1]
> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>
> [2]
> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>
>
>
>
>
> *From:* Dan Hill 
> *Sent:* Donnerstag, 14. Oktober 2021 17:50
> *To:* user 
> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
> partitions?
>
>
>
> I have a job that uses reinterpretAsKeyedStream across a simple map to
> avoid a shuffle.  When changing the number of partitions, I'm hitting an
> issue with registerEventTimeTimer complaining that "key group from 110 to
> 119 does not contain 186".  I'm using Flink v1.12.3.
>
>
>
> Any thoughts on this?  I don't know if there is a known issue
> with reinterpretAsKeyedStream.
>
>
>
> Rough steps:
>
> 1. I have a raw input stream of View records.  I keyBy the View using
> Tuple2(platform_id, log_user_id).
>
> 2. I do a small transformation of View to a TinyView.  I
> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
> The keys are the same.
>
> 3. I use the TinyView in a KeyedCoProcessFunction.
>
>
>
> When I savepoint and start again with a different number of partitions, my
> KeyedCoProcessFunction hits an issue with registerEventTimeTimer and
> complains that "key group from 110 to 119 does not contain 186".  I
> verified that the key does not change and that we use Tuple2 with
> primitives Long and String.
>
>
>
>
>
>
>
> 2021-10-14 08:17:07
>
> java.lang.IllegalArgumentException: view x insertion issue with
> registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174),
> flat=platform_id: 120
>
> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>
> log_timestamp: 1634224329606
>
> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>
>
>
> at
> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>
> at
> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>
> at
> ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>
> at
> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>
> at
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>
>

Re: Flink 1.11 loses track of event timestamps and watermarks after process function

2021-10-15 Thread Ahmad Alkilani
Thanks again Arvid,
I am getting closer to the culprit, as I've found some interesting
scenarios. Still no exact answer yet. We are indeed also using
.withIdleness to mitigate slow or idle partitions.
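For reference, the shape of such a watermark strategy with idleness handling, as a
minimal sketch in Java; the event type, timestamp field, and durations below are
placeholders rather than the actual job's values.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdleAwareWatermarks {
    // Placeholder record type for illustration.
    public static class Event {
        public long timestampMillis;
    }

    public static WatermarkStrategy<Event> strategy() {
        return WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, previousTimestamp) -> event.timestampMillis)
                // Without idleness handling, a single empty Kafka partition can hold the
                // downstream watermark at Long.MIN_VALUE after a keyBy.
                .withIdleness(Duration.ofMinutes(1));
    }
}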

I did have a few clarifying questions though w.r.t watermarks if you don't
mind.
*Watermark progression:*
The code you pointed out in [1] seems to indicate that watermarks are a
logical side effect that travels alongside events in the stream but can also
progress on its own? This continues to puzzle me. Here's a contrived
example to clarify: a process function receives data but never emits
anything (neither in processElement nor based on a timer), i.e., the
process function is just a black hole for event records passing through it.
Do watermarks still make it through to the next operator? If the
answer is yes, then this confirms my now-corrected understanding of how
this works. I always thought, and according to what I see in the literature,
that watermarks travelled with events in a stream.

*Using process time windows alongside event-time computation:*
While most of our processing uses event time, we do have a
"processing time" window towards the end that is essentially
responsible for grouping events together into a batch before sending them to an
external system; the receiving end prefers slightly larger batches of data
over one event at a time. This window uses processing time since we really
only care about how many records we send per second, not about the semantic
event-time meaning of each record. So, for example, we'd group events in a
2-second window, i.e., limiting downstream traffic to 1 request every 2 seconds
(assuming a parallelism of 1). The question here is: does this use of
processing-time windowing have any impact upstream, or downstream, on the
progression of watermarks? By impact downstream, the concern is whether the watermark
somehow gets "lost" due to the change of semantics in processing. FWIW,
this window precedes the Async I/O function. Both of these have been removed
to simplify troubleshooting for the original question, but they are part of
the bigger application once fully assembled.

Thank you!



On Thu, Oct 14, 2021 at 8:40 AM Arvid Heise  wrote:

> Hi Ahmad,
>
> The ProcessFunction is simply forwarding the Watermark [1]. So I don't
> have any explanation as to why it would not advance anymore as soon as you
> emit data. My assumption was that by emitting in the process function
> causes backpressure and thus halts the advancement of the watermark
> upstream.
>
> A usual suspect when not seeing good watermarks is that the custom
> watermark assigner is not working as expected. But you mentioned that with
> a no-op, the process function is actually showing the watermark and that
> leaves me completely puzzled.
>
> I would dump down your example even more to find the culprit.
>
> Another common issue is that if you have empty partitions in kafka, you
> wouldn't see advancing watermarks in the 2. process function after the
> keyBy. Since the watermark advances as the min watermark of all input
> subtasks, it will stay as MIN_VALUE in all operators after the keyBy if
> only one subtask sees no watermark advancement. See [2] for a solution.
>
> [1]
> https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java#L72-L72
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
>
> On Tue, Oct 12, 2021 at 10:22 PM Ahmad Alkilani  wrote:
>
>> Thanks Arvid.
>> Getting the easy stuff out of the way, I certainly wait for longer than
>> 10s (typically observe what happens over a few minutes) so the bounded
>> watermark issue isn't in play here.
>>
>> The Async IO as it stands today has timeouts so it doesn't run
>> indefinitely. With that said, I replaced the Async IO with a simple process
>> function and print statements in the body of the process function. The
>> process function simply emits what it received. I also removed the custom
>> sink (that has an external dependency) and replaced it with a simple lambda
>> that occasionally prints just to show progress.
>>
>> So now:
>> Kafka -> Flink Kafka source -> flatMap (map & filter) ->
>> assignTimestampsAndWaterMarks -> map Function -> *process function
>> (print watermarks) *-> Key By -> Keyed Process Function -> *process
>> function (print watermarks)* -> Simple Sink
>>
>> I am seeing similar problems. The watermarks don't seem to advance until
>> a checkpoint is triggered. I haven't been able to measure if they indeed
>> advance *regularly *post checkpoint but the watermark at the very least
>> is no longer Long.MinValue. Also the checkpoint could simply be a
>> red-herring, I also see some offset commit messages to Kafka around the
>> same time that I notice watermarks advancing. This is well into a job
>> execution (5-10 minutes)
>>
>> So now that the Async I/O is out of th