Re: Can we use custom serialization/deserialization for kafka sources and sinks through the table api?

2024-07-11 Thread Kevin Lam via user
Hi Gabriel,

You could consider overriding the value.serializer and value.deserializer
(and similarly for the key) in the consumer and producer configuration that
Flink sets, using the `properties.*` option in the Kafka Connector.

Your serializer and deserializer will have access to the headers and can
perform your integrity checks, while otherwise passing the byte[] through
so the formats' logic continues to handle SerDes.
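
To make that concrete on the producer/sink side, here is a minimal, hypothetical
sketch of a pass-through value serializer that stamps a checksum header while
leaving the payload untouched (the class name HeaderStampingSerializer and the
"x-integrity" header are assumptions, not an existing API). It could be wired in
via 'properties.value.serializer' = 'com.example.HeaderStampingSerializer', so
the configured table format still produces the byte[] payload:

```
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

/** Hypothetical pass-through serializer: keeps the payload bytes untouched,
 *  but records a CRC32 checksum in a Kafka header for integrity checking. */
public class HeaderStampingSerializer implements Serializer<byte[]> {

    @Override
    public byte[] serialize(String topic, byte[] data) {
        // Called only when no headers are available; nothing to stamp.
        return data;
    }

    @Override
    public byte[] serialize(String topic, Headers headers, byte[] data) {
        if (data != null) {
            CRC32 crc = new CRC32();
            crc.update(data);
            headers.add(
                    "x-integrity",
                    Long.toString(crc.getValue()).getBytes(StandardCharsets.UTF_8));
        }
        return data; // pass the bytes through unchanged
    }
}
```

A matching Kafka Deserializer could verify the header on the consumer side,
subject to the source-side limitation noted below.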

Flink uses the ByteArray(De|S)erializers by default in its source and sink.
It's currently not possible to override the source's deserializer, but that's
a work in progress via https://issues.apache.org/jira/browse/FLINK-35808.
Hoping to have it merged soon.

Alternatively, you can wait for first-class header support in Flink table
formats. There's some ongoing discussion and work via FLIP-454 and this
mailing list discussion.

On Thu, Jul 11, 2024 at 2:13 PM Gabriel Giussi 
wrote:

> Reading from a kafka topic with custom serialization/deserialization can
> be done using a KafkaSource configured with an implementation
> of KafkaRecordDeserializationSchema, which has access even to kafka headers
> which are used in my case for checking message integrity.
> How can we do the same but using the table API where you can just
> configure the value.format with a string to a predefined set of formats?
>
> Thanks.
>


Best Practices? Fault Isolation for Processing Large Number of Same-Shaped Input Kafka Topics in a Big Flink Job

2024-05-13 Thread Kevin Lam via user
Hi everyone,

I'm currently prototyping on a project where we need to process a large
number of Kafka input topics (say, a couple of hundred), all of which share
the same DataType/Schema.

Our objective is to run the same Flink SQL on all of the input topics, but
I am concerned about doing this in a single large Flink SQL application for
fault isolation purposes. We'd like to limit the "blast radius" in cases of
data issues or "poison pills" in any particular Kafka topic — meaning, if
one topic runs into a problem, it shouldn’t compromise or halt the
processing of the others.

At the same time, we are concerned about the operational toil associated
with managing hundreds of Flink jobs that are really one logical
application.

Has anyone here tackled a similar challenge? If so:

   1. How did you design your solution to handle a vast number of topics
   without creating a heavy management burden?
   2. What strategies or patterns have you found effective in isolating
   issues within a specific topic so that they do not affect the processing of
   others?
   3. Are there specific configurations or tools within the Flink ecosystem
   that you'd recommend to efficiently manage this scenario?

Any examples, suggestions, or references to relevant documentation would be
helpful. Thank you in advance for your time and help!


Re: Issue with HybridSource recovering from Savepoint

2022-05-04 Thread Kevin Lam
Following up on this, is there a good way to debug restoring from
savepoints locally? We currently have a set-up where we use IntelliJ to run
and test our pipelines locally, but would like an API to be able to specify
the savepoint to restore from, without needing to spin up a full cluster.

In IntelliJ we just use the build and run functionality, and don't have
access to the Flink CLI.
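
One option, sketched below under the assumption that local environments honor
execution.savepoint.path on the configuration they're created with, is to point
a local environment at a locally readable copy of the savepoint. The path below
is a placeholder:

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalSavepointRestore {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder: point this at a locally readable copy of the savepoint.
        conf.set(SavepointConfigOptions.SAVEPOINT_PATH, "file:///tmp/savepoints/savepoint-xxxx");
        // Optionally skip state that no longer maps to an operator.
        conf.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // Build the real pipeline here; a trivial stand-in keeps the sketch runnable.
        env.fromElements(1, 2, 3).print();
        env.execute("local-savepoint-restore");
    }
}
```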

On Tue, May 3, 2022 at 2:48 PM Kevin Lam  wrote:

> Hi,
>
> We're encountering an error using a HybridSource that is composed of a
> FileSource + KafkaSource, only when recovering from a savepoint [0]. This
> HybridSource is used to read from a Kafka topic's archives hosted on GCS
> via a bounded FileSource, and then automatically switch over to the data
> stream from the associated Kafka topic.
>
> Has anyone seen this error before?
>
> [0]
> ```
> 2022-05-03 09:47:57
> org.apache.flink.util.FlinkException: Global failure triggered by
> OperatorCoordinator for 'Source: ShopAppTrackingEventUpdate_V1' (operator
> afb3208349a953c47059c1994f800aa2).
> at
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
> at
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:358)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.lang.NullPointerException: Source for index=0 not available
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> at
> org.apache.flink.connector.base.source.hybrid.SwitchedSources.sourceOf(SwitchedSources.java:36)
> at
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:148)
> at
> org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:222)
> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
> at
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
> ... 3 more
> ```
>


Issue with HybridSource recovering from Savepoint

2022-05-03 Thread Kevin Lam
Hi,

We're encountering an error using a HybridSource that is composed of a
FileSource + KafkaSource, only when recovering from a savepoint [0]. This
HybridSource is used to read from a Kafka topic's archives hosted on GCS
via a bounded FileSource, and then automatically switch over to the data
stream from the associated Kafka topic.

Has anyone seen this error before?

[0]
```
2022-05-03 09:47:57
org.apache.flink.util.FlinkException: Global failure triggered by
OperatorCoordinator for 'Source: ShopAppTrackingEventUpdate_V1' (operator
afb3208349a953c47059c1994f800aa2).
at
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:545)
at
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:358)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NullPointerException: Source for index=0 not available
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at
org.apache.flink.connector.base.source.hybrid.SwitchedSources.sourceOf(SwitchedSources.java:36)
at
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:148)
at
org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:222)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
... 3 more
```


Reading FileSource Files in a particular order

2022-03-14 Thread Kevin Lam
Hi all,

We're interested in being able to use a FileSource to read from a Google
Cloud Storage (GCS) archive of messages from a Kafka topic, roughly in
order.

Our GCS archive is partitioned into folders by time, however, when we read
it using a FileSource, the messages are processed in a random order. We'd
like to be able to control what order the files are read in, and take
advantage of the clear ordering our GCS archive provides.

What is the best way to achieve this? Would it be possible to write a
custom FileEnumerator that sorts the directories and returns the splits in
order?
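
For what it's worth, a rough sketch of that idea is below, assuming the
FileEnumerator and FileSourceSplit interfaces of the 1.14 file connector and
that lexicographic path order matches the time-partitioned folder layout. Note
the enumerator only controls the order splits are handed out; with parallelism
above 1, strict record ordering across files still isn't guaranteed.

```
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
import org.apache.flink.core.fs.Path;

/** Sketch: enumerate files recursively, then order the splits by path so that
 *  time-partitioned folders (e.g. dt=2022-03-01/...) are assigned in order. */
public class SortedFileEnumerator implements FileEnumerator {

    private final NonSplittingRecursiveEnumerator delegate = new NonSplittingRecursiveEnumerator();

    @Override
    public Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits)
            throws IOException {
        List<FileSourceSplit> splits =
                new ArrayList<>(delegate.enumerateSplits(paths, minDesiredSplits));
        splits.sort(Comparator.comparing((FileSourceSplit split) -> split.path().toString()));
        return splits;
    }
}
```

It could then be plugged in with fileSourceBuilder.setFileEnumerator(SortedFileEnumerator::new).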

Any help would be greatly appreciated!

Thanks,
Kevin


Evolving Schemas with ParquetColumnarRowInputFormat

2022-03-09 Thread Kevin Lam
Hi all,

We're interested in using ParquetColumnarRowInputFormat or similar with
evolving Parquet schemas. Any advice or recommendations?

Specifically, the situation we are interested in is when the passed in
RowType projectedType contains a new field with Type.Repetition.OPTIONAL
that is not present in the Parquet file being read. In this case we want
that column to just be read as null.
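
For reference, the projected schema in that scenario might be assembled along
these lines (the column names are made up); whether the format then fills the
missing column with nulls is exactly the open question above and may depend on
the Flink version:

```
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class ProjectedSchema {
    // Existing columns plus a new nullable column ("new_col") that older Parquet files lack.
    public static final RowType PROJECTED = RowType.of(
            new LogicalType[] {
                new BigIntType(),                              // id, present in all files
                new VarCharType(VarCharType.MAX_LENGTH),       // name, present in all files
                new VarCharType(true, VarCharType.MAX_LENGTH)  // new_col, nullable (OPTIONAL)
            },
            new String[] {"id", "name", "new_col"});
}
```

The resulting PROJECTED type is what would be handed to
ParquetColumnarRowInputFormat as the projectedType.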

Thanks in advance for your help!


Re: Avro BulkFormat for the new FileSource API?

2022-02-08 Thread Kevin Lam
Sorry to revive this thread, I'm just circling back to this now. Is it
possible to use https://issues.apache.org/jira/browse/FLINK-24565 with the
DataStream API? I am not sure how to make use of AvroFileFormatFactory in
the DataStream API context, and couldn't find any examples.



On Mon, Jan 10, 2022 at 4:19 PM Kevin Lam  wrote:

> Hi David,
>
> Awesome, wasn't aware of FLINK-24565. That's the kind of thing we were
> looking for and will take a look at it. Thanks for sharing that!
>
>
>
> On Fri, Jan 7, 2022 at 2:05 PM David Morávek 
> wrote:
>
>> Hi Kevin,
>>
>> I'm not as familiar with initiatives around the new sources, but it seems
>> that the BulkFormat for Avro [1] has been added recently and will be
>> released with the Flink 1.15.x.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-24565
>>
>> Best,
>> D.
>>
>> On Fri, Jan 7, 2022 at 7:23 PM Kevin Lam  wrote:
>>
>>> Hi all,
>>>
>>> We're looking into using the new FileSource
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/connector/file/src/FileSource.html>
>>> API, we see that there is a BulkFormat
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/connector/file/src/reader/BulkFormat.html>
>>> for Parquet, via ParquetColumnarRowFormat
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormat.html>.
>>> Is there similar BulkFormat available or in the works for Avro files?
>>>
>>> I imagined it may be a common use-case in the community so wanted to
>>> check here before we invest time implementing our own.
>>>
>>> Thanks in advance!
>>>
>>


Re: Future support for custom FileEnumerator in FileSource?

2022-02-02 Thread Kevin Lam
Hi,

Totally missed that setFileEnumerator method. That definitely helps, I
checked it out and this does what we were looking for.

Thanks FG!

On Wed, Feb 2, 2022 at 3:07 AM Francesco Guardiani 
wrote:

> Hi,
> From what I see here
> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/connector/file/src/AbstractFileSource.AbstractFileSourceBuilder.html#setFileEnumerator-org.apache.flink.connector.file.src.enumerate.FileEnumerator.Provider-
> the file enumerator can be setup with the FileSourceBuilder:
>
> fileSourceBuilder.setFileEnumerator(new FileEnumerator.Provider() {
> @Override
> public FileEnumerator create() {
> // Do something
> return null;
> }
> })
>
>
> Hope it helps,
> FG
>


Future support for custom FileEnumerator in FileSource?

2022-02-01 Thread Kevin Lam
Hi all,

We're interested in being able to filter files using the new FileSource
API.
Are there plans to add it? If there's existing work, we would be happy to
help push this forward through contributions.

It seems like things are almost there. FileSource encapsulates filtering
functionality into FileEnumerator. However, the FileEnumerator is not
parametrizable; it's currently hard-coded. One potential way to enable
filtering files is to be able to pass a custom FileEnumerator.
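
As it turns out (see the reply elsewhere in this digest), the builder already
exposes setFileEnumerator, so one workaround sketch is to hand it a filtering
enumerator. This assumes NonSplittingRecursiveEnumerator offers a constructor
taking a file filter predicate; the bucket path and extension below are
placeholders:

```
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.Path;

public class FilteredFileSourceSketch {
    // `format` stands in for whatever StreamFormat the job already reads with.
    public static <T> FileSource<T> build(StreamFormat<T> format) {
        return FileSource
                .forRecordStreamFormat(format, new Path("gs://my-bucket/archive")) // placeholder path
                .setFileEnumerator(
                        () -> new NonSplittingRecursiveEnumerator(
                                path -> path.getName().endsWith(".avro")))         // keep only *.avro
                .build();
    }
}
```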

Thanks in advance,
Kevin


Re: Is FlinkKafkaProducer state compatible with KafkaSink sink? How to migrate?

2022-01-12 Thread Kevin Lam
Perfect, thanks for your consistently helpful and prompt responses Fabian!

On Wed, Jan 12, 2022 at 10:25 AM Fabian Paul  wrote:

> Hi Kevin,
>
> No, the state is not compatible but it is also not necessary because
> if the FlinkKafkaProducer is stopped with a savepoint all transactions
> are finalized and the new KafkaSink uses a different mechanism to
> track transaction ids. [1]
>
> It should be enough to recover from the savepoint with the KafkaSink
> and ignore the FlinkKafkaProducer state.
>
> Best,
> Fabian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143
>
> On Tue, Jan 11, 2022 at 9:58 PM Kevin Lam  wrote:
> >
> > Hi all,
> >
> > We're looking to migrate from FlinkKafkaProducer to the new KafkaSink
> for the new unified Sink API.
> >
> > Is the state compatible across the two Kafka sink APIs? If not, what's
> the best way to migrate from one to the other?
> >
> > Thanks in advance,
> > Kevin
>
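
For anyone following the migration above, a rough sketch of the replacement
KafkaSink on the 1.14 API is below. The broker list, topic, and transactional
id prefix are placeholders, and the builder method is spelled
setDeliverGuarantee in 1.14:

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KafkaSinkSketch {
    public static KafkaSink<String> build() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker-1:9092")                 // placeholder brokers
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")             // placeholder topic
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // Exactly-once uses Kafka transactions; the prefix must be unique per application.
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-app")                   // placeholder prefix
                .build();
    }
}
```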


Is FlinkKafkaProducer state compatible with KafkaSink sink? How to migrate?

2022-01-11 Thread Kevin Lam
Hi all,

We're looking to migrate from FlinkKafkaProducer to the new KafkaSink for
the new unified Sink API.

Is the state compatible across the two Kafka sink APIs? If not, what's the
best way to migrate from one to the other?

Thanks in advance,
Kevin


Re: Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-11 Thread Kevin Lam
Hi Fabian,

No problem, thanks for the clarification. In terms of its importance, we
have some Flink applications running using
StreamExecutionEnvironment.readFiles
<https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readFile-org.apache.flink.api.common.io.FileInputFormat-java.lang.String->,
so in order to adopt the new FileSource API, we would need to migrate those
applications. Ideally, we could migrate the state. If there isn't a way to
migrate state, it would be nice if there were some documentation or
guidance from the Flink community on how best to migrate.

Cheers,
Kevin

On Tue, Jan 11, 2022 at 10:19 AM Fabian Paul  wrote:

> Hi Kevin,
>
> Sorry for the misleading information. The FileSink is compatible with
> the predecessor but unfortunately, it is not the case for the
> FileSource. I updated the ticket accordingly. Perhaps there is a way
> to migrate the state but it would be a larger effort. Is this an
> important feature for you?
>
> Best,
> Fabian
>
> On Mon, Jan 10, 2022 at 3:58 PM Kevin Lam  wrote:
> >
> > Hi Fabian,
> >
> > Thanks for creating and sharing that ticket. I noticed the clause "The
> FileSource can already read the state of the previous version", a little
> off-topic from the original topic of this thread but I was wondering if you
> could elaborate on that. Can the new FileSource interoperate with the old
> .readFile operator state? Is there a smooth way to upgrade to the new
> FileSource API from the old one without losing state?
> >
> > Thanks!
> >
> > On Mon, Jan 10, 2022 at 7:20 AM Fabian Paul  wrote:
> >>
> >> Hi Kevin,
> >>
> >> I created a ticket to track the effort [1]. Unfortunately, we are
> >> already in the last few weeks of the release cycle for 1.15 so I
> >> cannot guarantee that someone can implement it until then.
> >>
> >> Best,
> >> Fabian
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-25591
> >>
> >> On Fri, Jan 7, 2022 at 5:07 PM Kevin Lam  wrote:
> >> >
> >> > Hi all,
> >> >
> >> > Are there any plans to update StreamExecutionEnvironment.readFiles to
> use the new FLIP-27 compatible FileSource?
> >> >
> >> > readFiles supports some features via its FileInputFormat like
> setNestedFileEnumeration and setFilesFilter that we'd be interested in
> continuing to use but it seems FileSource doesn't have that.
>


Re: Avro BulkFormat for the new FileSource API?

2022-01-10 Thread Kevin Lam
Hi David,

Awesome, wasn't aware of FLINK-24565. That's the kind of thing we were
looking for and will take a look at it. Thanks for sharing that!



On Fri, Jan 7, 2022 at 2:05 PM David Morávek 
wrote:

> Hi Kevin,
>
> I'm not as familiar with initiatives around the new sources, but it seems
> that the BulkFormat for Avro [1] has been added recently and will be
> released with the Flink 1.15.x.
>
> [1] https://issues.apache.org/jira/browse/FLINK-24565
>
> Best,
> D.
>
> On Fri, Jan 7, 2022 at 7:23 PM Kevin Lam  wrote:
>
>> Hi all,
>>
>> We're looking into using the new FileSource
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/connector/file/src/FileSource.html>
>> API, we see that there is a BulkFormat
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/connector/file/src/reader/BulkFormat.html>
>> for Parquet, via ParquetColumnarRowFormat
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormat.html>.
>> Is there similar BulkFormat available or in the works for Avro files?
>>
>> I imagined it may be a common use-case in the community so wanted to
>> check here before we invest time implementing our own.
>>
>> Thanks in advance!
>>
>


Re: Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-10 Thread Kevin Lam
Hi Fabian,

Thanks for creating and sharing that ticket. I noticed the clause "The
FileSource can already read the state of the previous version", a little
off-topic from the original topic of this thread but I was wondering if you
could elaborate on that. Can the new FileSource interoperate with the old
.readFile operator state? Is there a smooth way to upgrade to the new
FileSource API from the old one without losing state?

Thanks!

On Mon, Jan 10, 2022 at 7:20 AM Fabian Paul  wrote:

> Hi Kevin,
>
> I created a ticket to track the effort [1]. Unfortunately, we are
> already in the last few weeks of the release cycle for 1.15 so I
> cannot guarantee that someone can implement it until then.
>
> Best,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-25591
>
> On Fri, Jan 7, 2022 at 5:07 PM Kevin Lam  wrote:
> >
> > Hi all,
> >
> > Are there any plans to update StreamExecutionEnvironment.readFiles to
> use the new FLIP-27 compatible FileSource?
> >
> > readFiles supports some features via its FileInputFormat like
> setNestedFileEnumeration and setFilesFilter that we'd be interested in
> continuing to use but it seems FileSource doesn't have that.
>


Avro BulkFormat for the new FileSource API?

2022-01-07 Thread Kevin Lam
Hi all,

We're looking into using the new FileSource API. We see that there is a
BulkFormat for Parquet, via ParquetColumnarRowFormat. Is there a similar
BulkFormat available or in the works for Avro files?

I imagined it may be a common use-case in the community so wanted to check
here before we invest time implementing our own.

Thanks in advance!


Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-07 Thread Kevin Lam
Hi all,

Are there any plans to update StreamExecutionEnvironment.readFiles to use
the new FLIP-27 compatible FileSource?

readFiles supports some features via its FileInputFormat, like
setNestedFileEnumeration and setFilesFilter, that we'd be interested in
continuing to use, but it seems FileSource doesn't have that.
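
For context, a rough sketch of what the FileSource counterpart might look like
is below. The StreamFormat and path are stand-ins; recursive (nested)
enumeration is the default behaviour of the bundled enumerators, while a file
filter would need a custom FileEnumerator passed through the builder:

```
import java.time.Duration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.Path;

public class ContinuousFileSourceSketch {
    // `format` stands in for whatever StreamFormat the job reads (text, Avro, ...).
    public static <T> FileSource<T> build(StreamFormat<T> format) {
        return FileSource
                .forRecordStreamFormat(format, new Path("gs://my-bucket/input"))  // placeholder path
                // Roughly corresponds to readFile(..., PROCESS_CONTINUOUSLY, interval):
                .monitorContinuously(Duration.ofMinutes(1))
                .build();
    }
}
```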


Python Interop with Java defined org.apache.flink.api.common.functions.Function classes?

2021-12-14 Thread Kevin Lam
Hi all,

We currently operate several Flink applications using the Scala API, and
run on kubernetes in Application mode. We're interested in researching the
Python API and how we can support Python for application developers that
prefer to use Python.

We have a common library which implements a number of useful sources and
sinks, as well as some implementations
of org.apache.flink.api.common.functions.Function, eg. a MapFunction for
computing and reporting latency metrics. We'd like to continue to use the
common library, and make it available to Python developers.

We understand that Java sources and sinks can be used in the Python API. Is
there a way to call Java org.apache.flink.api.common.functions.Function
implementations (eg. MapFunction, ProcessFunction classes) from the Python
API [1]? If not, are there any plans to support this?

Thanks in advance!


[1] imagining something like this:

```
env.set_parallelism(1)
ds = env.add_source(MySource())
# process the data stream with a Java function
ds = (ds
    .map(JavaMapFunction("com.example.MyJavaMapFunction"))
)
```


Re: GCS/Object Storage Rate Limiting

2021-12-08 Thread Kevin Lam
Hey David,

Thanks for the response. The retry eventually succeeds, but I was wondering
if there was anything that people in the community have done to avoid
GCS/S3 rate-limiting issues. The retries do result in it taking longer for
all the task managers to recover and register.

On Mon, Dec 6, 2021 at 3:42 AM David Morávek  wrote:

> Hi Kevin,
>
> Flink comes with two schedulers for streaming:
> - Default
> - Adaptive (opt-in)
>
> Adaptive is still in experimental phase and doesn't support local recover.
> You're most likely using the first one, so you should be OK.
>
> Can you elaborate on this a bit? We aren't changing the parallelism when
>> restoring.
>>
>
> Splitting / merging of the rocksdb based operator checkpoint is currently
> an expensive operation. If the parallelism remains unchanged, you should be
> OK, the majority of time for the operator state restore will be spend on
> download of the rocksdb snapshot.
>
> Our checkpoint is about 900GB, and we have 256 TaskManagers with a
>> parallelism of 512.
>>
>
> This could definitely generate lot of concurrent requests when restoring
> the state.
>
> Does the restore operation fail, or the retry mechanism is sufficient to
> work around this?
>
> D.
>
> On Thu, Dec 2, 2021 at 7:54 PM Kevin Lam  wrote:
>
>> HI David,
>>
>> Thanks for your response.
>>
>> What's the DefaultScheduler you're referring to? Is that available in
>> Flink 1.13.1 (the version we are using)?
>>
>> How large is the state you're restoring from / how many TMs does the job
>>> consume / what is the parallelism?
>>
>>
>> Our checkpoint is about 900GB, and we have 256 TaskManagers with a
>> parallelism of 512.
>>
>> Also things could get even worse if the parallelism that has been used
>>> for taking the checkpoint is different from the one you're trying to
>>> restore with (especially with RocksDB).
>>>
>>
>> Can you elaborate on this a bit? We aren't changing the parallelism when
>> restoring.
>>
>> On Thu, Dec 2, 2021 at 10:48 AM David Morávek  wrote:
>>
>>> Hi Kevin,
>>>
>>> this happens only when the pipeline is started up from savepoint /
>>> retained checkpoint right? Guessing from the "path" you've shared it seems
>>> like a RockDB based retained checkpoint. In this case all task managers
>>> need to pull state files from the object storage in order to restore. This
>>> can indeed be a heavy operation especially when restore a large state with
>>> high parallelism.
>>>
>>> Recovery from failure should be faster (with DefaultScheduler) as we can
>>> re-use the local files that are already present on TaskManagers.
>>>
>>> How large is the state you're restoring from / how many TMs does the job
>>> consume / what is the parallelism?
>>>
>>> Also things could get even worse if the parallelism that has been used
>>> for taking the checkpoint is different from the one you're trying to
>>> restore with (especially with RocksDB).
>>>
>>> Best,
>>> D.
>>>
>>> On Thu, Dec 2, 2021 at 4:29 PM Kevin Lam  wrote:
>>>
>>>> Hi all,
>>>>
>>>> We're running a large (256 task managers with 4 task slots each) Flink
>>>> Cluster with High Availability enabled, on Kubernetes, and use Google Cloud
>>>> Storage (GCS) as our object storage for the HA metadata. In addition, our
>>>> Flink application writes out to GCS from one of its sinks via streaming
>>>> file sink + GCS connector.
>>>>
>>>> We observed the following types of errors when running our application:
>>>>
>>>> ```
>>>>
>>>> INFO: Encountered status code 429 when sending GET request to URL '
>>>> https://storage.googleapis.com/download/storage/v1/b//o/checkpoints%2F%2Fshared%2F13721c52-18d8-4782-80ab-1ed8a15d9ad5?alt=media&generation=1638448883568946'.
>>>> Delegating to response handler for possible retry. [CONTEXT
>>>> ratelimit_period="10 SECONDS [skipped: 8]" ]
>>>>
>>>> ```
>>>>
>>>> ```
>>>>  INFO: Encountered status code 503 when sending POST request to URL '
>>>> https://storage.googleapis.com/upload/storage/v1/b//o?uploadType=multipart'.
>>>> Delegating to response handler for possible retry.
>>>> ```
>>>>
>>>> They typically happen upon cluster start-up, when all the task managers
>>>> are registering with the jobmanager. We've also seen them occur as a result
>>>> of output from our sink operator as well.
>>>>
>>>> Has anyone else encountered similar issues? Any practices you can
>>>> suggest?
>>>>
>>>> Advice appreciated!
>>>>
>>>> Thanks
>>>>
>>>


Re: GCS/Object Storage Rate Limiting

2021-12-02 Thread Kevin Lam
HI David,

Thanks for your response.

What's the DefaultScheduler you're referring to? Is that available in Flink
1.13.1 (the version we are using)?

How large is the state you're restoring from / how many TMs does the job
> consume / what is the parallelism?


Our checkpoint is about 900GB, and we have 256 TaskManagers with a
parallelism of 512.

Also things could get even worse if the parallelism that has been used for
> taking the checkpoint is different from the one you're trying to restore
> with (especially with RocksDB).
>

Can you elaborate on this a bit? We aren't changing the parallelism when
restoring.

On Thu, Dec 2, 2021 at 10:48 AM David Morávek  wrote:

> Hi Kevin,
>
> this happens only when the pipeline is started up from savepoint /
> retained checkpoint right? Guessing from the "path" you've shared it seems
> like a RockDB based retained checkpoint. In this case all task managers
> need to pull state files from the object storage in order to restore. This
> can indeed be a heavy operation especially when restore a large state with
> high parallelism.
>
> Recovery from failure should be faster (with DefaultScheduler) as we can
> re-use the local files that are already present on TaskManagers.
>
> How large is the state you're restoring from / how many TMs does the job
> consume / what is the parallelism?
>
> Also things could get even worse if the parallelism that has been used for
> taking the checkpoint is different from the one you're trying to restore
> with (especially with RocksDB).
>
> Best,
> D.
>
> On Thu, Dec 2, 2021 at 4:29 PM Kevin Lam  wrote:
>
>> Hi all,
>>
>> We're running a large (256 task managers with 4 task slots each) Flink
>> Cluster with High Availability enabled, on Kubernetes, and use Google Cloud
>> Storage (GCS) as our object storage for the HA metadata. In addition, our
>> Flink application writes out to GCS from one of its sinks via streaming
>> file sink + GCS connector.
>>
>> We observed the following types of errors when running our application:
>>
>> ```
>>
>> INFO: Encountered status code 429 when sending GET request to URL '
>> https://storage.googleapis.com/download/storage/v1/b//o/checkpoints%2F%2Fshared%2F13721c52-18d8-4782-80ab-1ed8a15d9ad5?alt=media&generation=1638448883568946'.
>> Delegating to response handler for possible retry. [CONTEXT
>> ratelimit_period="10 SECONDS [skipped: 8]" ]
>>
>> ```
>>
>> ```
>>  INFO: Encountered status code 503 when sending POST request to URL '
>> https://storage.googleapis.com/upload/storage/v1/b//o?uploadType=multipart'.
>> Delegating to response handler for possible retry.
>> ```
>>
>> They typically happen upon cluster start-up, when all the task managers
>> are registering with the jobmanager. We've also seen them occur as a result
>> of output from our sink operator as well.
>>
>> Has anyone else encountered similar issues? Any practices you can
>> suggest?
>>
>> Advice appreciated!
>>
>> Thanks
>>
>


GCS/Object Storage Rate Limiting

2021-12-02 Thread Kevin Lam
Hi all,

We're running a large (256 task managers with 4 task slots each) Flink
Cluster with High Availability enabled, on Kubernetes, and use Google Cloud
Storage (GCS) as our object storage for the HA metadata. In addition, our
Flink application writes out to GCS from one of its sinks via streaming
file sink + GCS connector.

We observed the following types of errors when running our application:

```

INFO: Encountered status code 429 when sending GET request to URL '
https://storage.googleapis.com/download/storage/v1/b//o/checkpoints%2F%2Fshared%2F13721c52-18d8-4782-80ab-1ed8a15d9ad5?alt=media&generation=1638448883568946'.
Delegating to response handler for possible retry. [CONTEXT
ratelimit_period="10 SECONDS [skipped: 8]" ]

```

```
 INFO: Encountered status code 503 when sending POST request to URL '
https://storage.googleapis.com/upload/storage/v1/b//o?uploadType=multipart'.
Delegating to response handler for possible retry.
```

They typically happen upon cluster start-up, when all the task managers are
registering with the jobmanager. We've also seen them occur as a result of
output from our sink operator as well.

Has anyone else encountered similar issues? Any practices you can suggest?

Advice appreciated!

Thanks


Is there a way to update checkpoint configuration for a job "in-place"?

2021-11-02 Thread Kevin Lam
Hi all,

We run a Flink application on Kubernetes in Application Mode using Kafka
with exactly-once-semantics and high availability.

We are looking into a specific failure scenario: a flink job that has too
short a checkpoint timeout (execution.checkpointing.timeout) and at some
point during the job's execution, checkpoints begin to fail.

Is there a way to update the checkpoint timeout
(execution.checkpointing.timeout) of this job in-place, i.e. without
creating a new job or restoring from an old savepoint/checkpoint? Note:
one idea may be to take a savepoint and then restore from it with the new
configuration; however, this is not possible because when checkpoints are
timing out, so are savepoints, and thus a savepoint cannot be taken. Are
there any other ways to handle this situation?

We want to ensure exactly-once semantics are respected.

Thanks in advance!


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-06 Thread Kevin Lam
Hi Fabian,

Yes I can tell you a bit more about the job we are seeing the problem with.
I'll simplify things a bit but this captures the essence:

1. Input datastreams are from a few kafka sources that we intend to join.
2. We wrap the datastreams we want to join into a common container class
and key them on the join key.
3. Union and process the datastreams with a KeyedProcessFunction which
holds the latest value seen for each source in ValueStates, and emits an
output that is a function of the stored ValueStates each time a new value
comes in (a rough sketch of this pattern follows after the list).
4. We have to support arbitrarily late arriving data, so we don't window,
and just keep everything in ValueState.
5. The state we want to support is very large, on the order of several TBs.
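
To make steps 2-4 concrete, here is a minimal sketch of that pattern in Java,
with made-up source names and String payloads standing in for the real
container and record types:

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LatestValueJoin extends KeyedProcessFunction<String, LatestValueJoin.Wrapped, String> {

    /** Hypothetical container: which source a record came from, its join key, and its payload. */
    public static class Wrapped {
        public String source;   // e.g. "orders" or "customers" (made-up names)
        public String key;
        public String payload;
    }

    private transient ValueState<String> ordersState;
    private transient ValueState<String> customersState;

    @Override
    public void open(Configuration parameters) {
        ordersState = getRuntimeContext().getState(new ValueStateDescriptor<>("orders", String.class));
        customersState = getRuntimeContext().getState(new ValueStateDescriptor<>("customers", String.class));
    }

    @Override
    public void processElement(Wrapped in, Context ctx, Collector<String> out) throws Exception {
        // Keep only the latest value per source; no windowing, so state grows with the key space.
        if ("orders".equals(in.source)) {
            ordersState.update(in.payload);
        } else {
            customersState.update(in.payload);
        }
        // Emit a function of whatever has been seen so far for this key.
        out.collect(in.key + " -> " + ordersState.value() + " | " + customersState.value());
    }
}
```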

On Wed, Oct 6, 2021 at 10:50 AM Fabian Paul 
wrote:

> Hi Kevin,
>
> Since you are seeing the problem across multiple Flink versions and with
> the default RocksDb and custom configuration it might be related
>  to something else. A lot of different components can allocate direct
> memory i.e. some filesystem implementations, the connectors or some user
> grpc dependency.
>
>
> Can you tell use a bit more about the job you are seeing the problem with?
>
> Best,
> Fabian
>
>


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-06 Thread Kevin Lam
Hi Fabian,

Thanks for collecting feedback. Here's the answers to your questions:

1. Yes, we enabled incremental checkpoints for our job by setting
`state.backend.incremental` to true. As for whether the checkpoint we
recover from is incremental or not, I'm not sure how to determine that.
It's whatever Flink does by default with incremental checkpoints enabled.

2. Yes this was on purpose, we had tuned our job to work well on SSDs. We
have also run jobs with those parameters unset and using defaults, and
still have the same OOM issues.

Thanks for the pointer, yes we've been looking at the RocksDB metrics. They
haven't indicated to us what the issue is yet.

On Wed, Oct 6, 2021 at 3:21 AM Fabian Paul  wrote:

> Hi Kevin,
>
> Sorry for the late reply. I collected some feedback from other folks and
> have two more questions.
>
> 1. Did you enable incremental checkpoints for your job and is the
> checkpoint you recover from incremental?
>
> 2. I saw in your configuration that you set
> `state.backend.rocksdb.block.cache-size` and
> `state.backend.rocksdb.predefined.options` by doing
>  so you overwrite the values Flink automatically sets. Can you confirm
> that this is on purpose? The value for block.cache-size seems to be very
> small.
>
> You can also enable the native RocksDb metrics [1] to get a more detail
> view of the RocksDb memory consumption but be carefully because it may
> degrade the performance of your job.
>
> Best,
> Fabian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#rocksdb-native-metrics
>
>
>


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-05 Thread Kevin Lam
I was reading a bit about RocksDB and it seems the Java version is somewhat
particular about how it should be cleaned up to ensure all resources are
released:

https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#memory-management

   - "Many of the Java Objects used in the RocksJava API will be backed by
   C++ objects for which the Java Objects have ownership. As C++ has no notion
   of automatic garbage collection for its heap in the way that Java does, we
   must explicitly free the memory used by the C++ objects when we are
   finished with them."

Column families also have a specific close procedure:

https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families

   - "It is important to note that when working with Column Families in
   RocksJava, there is a very specific order of destruction that must be
   obeyed for the database to correctly free all resources and shutdown."

When a running job fails and a running TaskManager restores from
checkpoint, is the old Embedded RocksDb being cleaned up properly? I wasn't
really sure where to look in the Flink source code to verify this.

On Mon, Oct 4, 2021 at 4:56 PM Kevin Lam  wrote:

> We tried with 1.14.0, unfortunately we still run into the issue. Any
> thoughts or suggestions?
>
> On Mon, Oct 4, 2021 at 9:09 AM Kevin Lam  wrote:
>
>> Hi Fabian,
>>
>> We're using our own image built from the official Flink docker image, so
>> we should have the code to use jemalloc in the docker entrypoint.
>>
>> I'm going to give 1.14 a try and will let you know how it goes.
>>
>> On Mon, Oct 4, 2021 at 8:29 AM Fabian Paul 
>> wrote:
>>
>>> Hi Kevin,
>>>
>>> We bumped the RocksDb version with Flink 1.14 which we thought increases
>>> the memory control [1]. In the past we also saw problems with the allocator
>>> used of the OS. We switched to use jemalloc within our docker images which
>>> has a better memory fragmentation [2]. Are you using the official Flink
>>> docker image or did you build your own?
>>>
>>> I am also pulling in yun tang who is more familiar with Flink’s state
>>> backend. Maybe he has an immediate idea about your problem.
>>>
>>> Best,
>>> Fabian
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-14482
>>> [2]
>>> https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E
>>> <https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99@%3Cdev.flink.apache.org%3E>
>>>
>>>
>>>


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-04 Thread Kevin Lam
We tried with 1.14.0, unfortunately we still run into the issue. Any
thoughts or suggestions?

On Mon, Oct 4, 2021 at 9:09 AM Kevin Lam  wrote:

> Hi Fabian,
>
> We're using our own image built from the official Flink docker image, so
> we should have the code to use jemalloc in the docker entrypoint.
>
> I'm going to give 1.14 a try and will let you know how it goes.
>
> On Mon, Oct 4, 2021 at 8:29 AM Fabian Paul 
> wrote:
>
>> Hi Kevin,
>>
>> We bumped the RocksDb version with Flink 1.14 which we thought increases
>> the memory control [1]. In the past we also saw problems with the allocator
>> used of the OS. We switched to use jemalloc within our docker images which
>> has a better memory fragmentation [2]. Are you using the official Flink
>> docker image or did you build your own?
>>
>> I am also pulling in yun tang who is more familiar with Flink’s state
>> backend. Maybe he has an immediate idea about your problem.
>>
>> Best,
>> Fabian
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-14482
>> [2]
>> https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E
>> <https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99@%3Cdev.flink.apache.org%3E>
>>
>>
>>


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-04 Thread Kevin Lam
Hi Fabian,

We're using our own image built from the official Flink docker image, so we
should have the code to use jemalloc in the docker entrypoint.

I'm going to give 1.14 a try and will let you know how it goes.

On Mon, Oct 4, 2021 at 8:29 AM Fabian Paul  wrote:

> Hi Kevin,
>
> We bumped the RocksDb version with Flink 1.14 which we thought increases
> the memory control [1]. In the past we also saw problems with the allocator
> used of the OS. We switched to use jemalloc within our docker images which
> has a better memory fragmentation [2]. Are you using the official Flink
> docker image or did you build your own?
>
> I am also pulling in yun tang who is more familiar with Flink’s state
> backend. Maybe he has an immediate idea about your problem.
>
> Best,
> Fabian
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-14482
> [2]
> https://lists.apache.org/thread.html/r596a19f8cf7278bcf9e30c3060cf00562677d4be072050444a5caf99%40%3Cdev.flink.apache.org%3E
> 
>
>
>


Re: RocksDB: Spike in Memory Usage Post Restart

2021-10-01 Thread Kevin Lam
Hi Fabian,

Thanks for your response.

Sure, let me tell you a bit more about the job.

   - Flink version 1.13.1 (I also tried 1.13.2 because I saw FLINK-22886, but
   this didn't help)
   - We're running on kubernetes in an application cluster.
   taskmanager.memory.process.size = 16GB, but we give our task manager pods a
   memory limit of 20GB. Our full config is below [0]

We've followed the steps at
https://erikwramner.files.wordpress.com/2017/10/native-memory-leaks-in-java.pdf,
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr007.html,
and
https://technology.blog.gov.uk/2015/12/11/using-jemalloc-to-get-to-the-bottom-of-a-memory-leak/
to try and diagnose, but this didn't really give us something to go off of.

Notably, we baselined the jcmd memory profile (jcmd $(pgrep java)
VM.native_memory baseline) and then ran a diff before and after the
post-restart memory spike, and nothing in there reflects the few GB of
usage increase.

What was added to Flink 1.14? What other issues have you seen in the past?

Also I came across
https://medium.com/expedia-group-tech/solving-a-native-memory-leak-71fe4b6f9463
when researching rocksdb. It suggests that unclosed RocksDB iterators can
be a source of memory leaks. Is there any chance there are iterators being
left open post job restart?

[0]
```
jobmanager.memory.process.size: 16Gb

taskmanager.rpc.port: 6122
taskmanager.memory.process.size: 16Gb
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 4

high-availability.storageDir: 
kubernetes.cluster-id: 
kubernetes.namespace: 
high-availability.jobmanager.port: 50010
high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
restart-strategy: exponential-delay
resourcemanager.taskmanager-registration.timeout: 30 min

blob.server.port: 6124
queryable-state.proxy.ports: 6125

heartbeat.interval: 6
heartbeat.timeout: 12

web.timeout: 180
rest.flamegraph.enabled: true

state.backend: rocksdb
state.checkpoints.dir: 
state.savepoints.dir: 

state.backend.rocksdb.localdir: /rocksdb
state.backend.incremental: true
state.backend.fs.memory-threshold: 1m
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
state.backend.rocksdb.block.blocksize: 16KB
state.backend.rocksdb.block.cache-size: 64MB
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED

jobmanager.execution.failover-strategy: region

metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator

state.backend.rocksdb.metrics.actual-delayed-write-rate: true
state.backend.rocksdb.metrics.background-errors: true
state.backend.rocksdb.metrics.block-cache-capacity: true
state.backend.rocksdb.metrics.block-cache-pinned-usage: true
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.compaction-pending: true
state.backend.rocksdb.metrics.cur-size-active-mem-table: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.estimate-pending-compaction-bytes: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true
state.backend.rocksdb.metrics.is-write-stopped: true
state.backend.rocksdb.metrics.mem-table-flush-pending: true
state.backend.rocksdb.metrics.num-deletes-active-mem-table: true
state.backend.rocksdb.metrics.num-deletes-imm-mem-tables: true
state.backend.rocksdb.metrics.num-entries-active-mem-table: true
state.backend.rocksdb.metrics.num-entries-imm-mem-tables: true
state.backend.rocksdb.metrics.num-immutable-mem-table: true
state.backend.rocksdb.metrics.num-live-versions: true
state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.num-running-flushes: true
state.backend.rocksdb.metrics.num-snapshots: true
state.backend.rocksdb.metrics.size-all-mem-tables: true

env.java.opts: -Djavax.net.ssl.keyStore=/app/kafka/certs/certificate.jks
-Djavax.net.ssl.keyStorePassword=changeit -Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.local.only=false
-Dcom.sun.management.jmxremote.port=1099
-Dcom.sun.management.jmxremote.rmi.port=1099
-Djava.rmi.server.hostname=127.0.0.1 -XX:NativeMemoryTracking=detail
env.java.opts.taskmanager: -Dtaskmanager.host=10.12.72.181
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/rocksdb/memdump.hprof
-Djava.rmi.server.hostname=127.0.0.1 -XX:NativeMemoryTracking=detail
jobmanager.rpc.address: flink-jobmanager
query.server.port: 6125
```

On Fri, Oct 1, 2021 at 9:38 AM Fabian Paul  wrote:

> Hi Kevin,
>
> You are right RocksDB is probably responsible for the memory c

RocksDB: Spike in Memory Usage Post Restart

2021-09-30 Thread Kevin Lam
Hi all,

We're debugging an issue with OOMs that occurs on our jobs shortly after a
restore from checkpoint. Our application is running on kubernetes and uses
RocksDB as its state backend.

We reproduced the issue on a small cluster of 2 task managers. If we killed
a single task manager, we noticed that after restoring from checkpoint, the
untouched task manager has an elevated memory footprint (see the blue line
for the surviving task manager):

[image: image.png]
If we kill the newest TM (yellow line) again, after restoring the surviving
task manager gets OOM killed.

We looked at the OOMKiller Report and it seems that the memory is not
coming from the JVM but we're unsure of the source. It seems like something
is allocating native memory that the JVM is not aware of.

We're suspicious of RocksDB. Has anyone seen this kind of issue before? Is
it possible there's some kind of memory pressure or memory leak coming from
RocksDB that only presents itself when a job is restarted? Perhaps
something isn't cleaned up?

Any help would be appreciated.


Re: Not able to avoid Dynamic Class Loading

2021-09-22 Thread Kevin Lam
Sorry for the late reply here, I'm just returning to this now.

Interesting re: the avro version, we're using 1.10.0 in our application
jar. But maybe this is somehow being clobbered when we try to move it into
/lib vs. /usrlib to avoid dynamic class loading. Is it possible that's
happening?

On Fri, Aug 27, 2021 at 2:28 PM Arvid Heise  wrote:

> I guess the best option is to attach a debugger and set a breakpoint at
> the NotSerializableException. There definitively has to be a
> non-serializable component in that FlinkKafkaConsumer and it can only come
> from the DeserializationSchema or Properties.
> Maybe the consumer internally caches some values generated by your schema
> at some point but I couldn't think anything obvious. There is a high chance
> that it comes by your code and only activates on cluster.
> It would be nice to hear back from you when you have found that respective
> field. It should be 2 object references deep in FlinkKafkaConsumer (2
> writeObject0 before the first writeArray that most likely corresponds to
> your RecordSchema)
>
> Btw which Avro version are you using? It looks like Avro 1.10.X finally
> has serializable schema... Maybe this might also explain why it works in
> one submission and not in the other?
>
> On Fri, Aug 27, 2021 at 4:10 PM Kevin Lam  wrote:
>
>> There's no inner classes, and none of the fields
>> of DebeziumAvroRegistryDeserializationSchema have an Avro schema, even when
>> expanded, including KafkaClusterConfig. KafkaClusterConfig is just composed
>> of Strings and Booleans.
>>
>> DebeziumAvroRegistryDeserializationSchema has a field that initializes a
>> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient but
>> this is marked @transient and lazy in Scala, similarly the deserializer
>> uses that client to initialize a transient+lazy field which builds a
>> KafkaAvroDeserializer
>>
>>>


Re: TaskManagers OOM'ing for Flink App with very large state only when restoring from checkpoint

2021-09-13 Thread Kevin Lam
Thanks for your replies Alexis and Guowei.

We're using Flink version 1.13.1 and the DataStream API.

I'll try the savepoint, and take a look at that IO article, thank you.

Please let me know if anything else comes to mind!

On Mon, Sep 13, 2021 at 3:05 AM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> I'm not very knowledgeable when it comes to Linux memory management, but
> do note that Linux (and by extension Kubernetes) takes disk IO into account
> when deciding whether a process is using more memory than it's allowed to,
> see e.g.
> https://faun.pub/how-much-is-too-much-the-linux-oomkiller-and-used-memory-d32186f29c9d
>
> Regards,
> Alexis.
>
> --
> *From:* Guowei Ma 
> *Sent:* Monday, September 13, 2021 8:35 AM
> *To:* Kevin Lam 
> *Cc:* user 
> *Subject:* Re: TaskManagers OOM'ing for Flink App with very large state
> only when restoring from checkpoint
>
> Hi, Kevin
>
> 1. Could you give me some specific information, such as what version of
> Flink is you using, and is it using DataStream or SQL?
> 2. As far as I know, RocksDB will put state on disk, so it will not
> consume memory all the time and cause OOM in theory.
> So you can see if there are any object leaks by analyzing the Jmap of
> TaskManger after Failover.
> 3. There is another way, you can trigger a save point first, and then
> resume the job from the save point to see if there is still OOM,
>  if not, then it is likely to be related to your application code.
>
> Best,
> Guowei
>
>
> On Sat, Sep 11, 2021 at 2:01 AM Kevin Lam  wrote:
>
> Hi all,
>
> We've seen scenarios where TaskManagers will begin to OOM, shortly after a
> job restore from checkpoint. Our flink app has a very large state (100s of
> GB) and we use RocksDB as a backend.
>
> Our repro is something like this: run the job for an hour and let it
> accumulate state, kill a task manager. The job restores properly, but then
> minutes later task managers begin to be killed on K8S due to OOM, and this
> causes a degenerate state where the job restores and new OOMs cause the job
> to restore again and it never recovers.
>
> We've tried increasing the TaskManager memory (doubled), and observed that
> OOMs still happen even when the allocated k8s container memory is not maxed
> out.
>
> Can you shed some light on what happens during a restore process? How are
> checkpoints loaded, and how does this affect the memory pressure of task
> managers (that for eg. have had a task running, got it cancelled, and
> re-assigned a new task as part of restore)?
>
> Any help is appreciated!
>
>


TaskManagers OOM'ing for Flink App with very large state only when restoring from checkpoint

2021-09-10 Thread Kevin Lam
Hi all,

We've seen scenarios where TaskManagers will begin to OOM, shortly after a
job restore from checkpoint. Our flink app has a very large state (100s of
GB) and we use RocksDB as a backend.

Our repro is something like this: run the job for an hour and let it
accumulate state, kill a task manager. The job restores properly, but then
minutes later task managers begin to be killed on K8S due to OOM, and this
causes a degenerate state where the job restores and new OOMs cause the job
to restore again and it never recovers.

We've tried increasing the TaskManager memory (doubled), and observed that
OOMs still happen even when the allocated k8s container memory is not maxed
out.

Can you shed some light on what happens during a restore process? How are
checkpoints loaded, and how does this affect the memory pressure of task
managers (that for eg. have had a task running, got it cancelled, and
re-assigned a new task as part of restore)?

Any help is appreciated!


Re: Not able to avoid Dynamic Class Loading

2021-08-27 Thread Kevin Lam
There's no inner classes, and none of the fields
of DebeziumAvroRegistryDeserializationSchema have an Avro schema, even when
expanded, including KafkaClusterConfig. KafkaClusterConfig is just composed
of Strings and Booleans.

DebeziumAvroRegistryDeserializationSchema has a field that initializes a
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient but
this is marked @transient and lazy in Scala, similarly the deserializer
uses that client to initialize a transient+lazy field which builds a
KafkaAvroDeserializer

>


Re: Not able to avoid Dynamic Class Loading

2021-08-26 Thread Kevin Lam
I also tested serializing an instance of `OurSource` with
`org.apache.commons.lang3.SerializationUtils.clone`  and it worked fine.

On Thu, Aug 26, 2021 at 3:27 PM Kevin Lam  wrote:

> Hi Arvid,
>
> Got it, we don't use Avro.schema inside of
> DebeziumAvroRegistryDeserializationSchema, but I tried to test it with a
> unit test and `org.apache.commons.lang3.SerializationUtils.clone` runs
> successfully.
>
> I'm curious as to why things work (are serializable) when we use dynamic
> classloading, but do not work when we put this in lib/ and bypass the
> dynamic loading--any ideas there?
>
>
>
> On Thu, Aug 26, 2021 at 11:05 AM Arvid Heise  wrote:
>
>> Hi Kevin,
>>
>> the consumer needs to be serializable. Apparently you are also
>> serializing the Avro schema (probably as part of your
>> DebeziumAvroRegistryDeserializationSchema) and that fails. You may want to
>> copy our SerializableAvroSchema [1]
>>
>> Make sure that everything is serializable. You can check that in a unit
>> test by using org.apache.commons.lang3.SerializationUtils.clone.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L34-L34
>>
>> On Thu, Aug 26, 2021 at 3:16 PM Kevin Lam  wrote:
>>
>>> Hi!
>>>
>>> We're using 1.13.1. We have a class in our user code that extends
>>> FlinkKafkaConsumer, that's built for reading avro records from Kafka.
>>> However it doesn't hold any Schema objects as fields so I'm a little
>>> confused.
>>>
>>> Something like this:
>>>
>>> ```
>>> class OurSource[T <: ClassTag: TypeInformation: Decoder](
>>>   val name: String,
>>>   val topic: String,
>>>   val clusterConfig: KafkaClusterConfig,
>>>   sslConfig: KafkaSSLConfig,
>>>   offsetReset: Option[OffsetReset.Value] = None,
>>>   offsetProvider: Option[OffsetProvider] = None
>>> ) extends FlinkKafkaConsumer[Option[T]](
>>> topic,
>>> new DebeziumAvroRegistryDeserializationSchema[T](
>>>   clusterConfig,
>>>   KafkaConfiguration.sslProperties(sslConfig)
>>> ),
>>> CollectionUtil.mapToProperties(
>>>   KafkaConfiguration.consumerProperties(clusterConfig, name,
>>> isolationLevel = "read_committed") ++
>>> KafkaConfiguration.sslProperties(sslConfig)
>>> )
>>>   )
>>> ```
>>>
>>> I understand how classloading may change, but why would that change
>>> whether we hit this serialization issue or not?
>>>
>>> On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng 
>>> wrote:
>>>
>>>> Hi!
>>>>
>>>> What Flink version are you using? In current Flink code base
>>>> FlinkKafkaConsumer does not contain fields related to Avro.
>>>>
>>>> Jars in usrlib has a higher priority to be loaded than jars in lib. So
>>>> if there is another FlinkKafkaConsumer class in your user jar then it might
>>>> affect class loading and thus affect this issue.
>>>>
>>>> On Wed, Aug 25, 2021 at 11:18 PM Kevin Lam  wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm trying to avoid dynamic class loading my user code [0] due to a
>>>>> suspected classloading leak, but when I put my application jar into /lib
>>>>> instead of /usrlib, I run into the following error:
>>>>>
>>>>> ```
>>>>> The main method caused an error: The implementation of the
>>>>> FlinkKafkaConsumer is not serializable. The object probably contains or
>>>>> references non serializable fields.
>>>>> ```
>>>>>
>>>>> which specifically seems to be caused by
>>>>> ```
>>>>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>>>>> ```
>>>>>
>>>>> What's curious to me about this is the error does not occur when we
>>>>> use dynamic classloading and put our application jar into /usrlib.
>>>>>
>>>>> Any ideas what's going on? It would seem to us that the method of
>>>>> loading the classes shouldn't impact whether or not something is
>>>>> serialized.
>>>>>
>>>>> Appreciate any help, thanks!
>>>>>
>>>>> [0]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
>>>>>
>>>>


Re: Not able to avoid Dynamic Class Loading

2021-08-26 Thread Kevin Lam
Hi Arvid,

Got it, we don't use Avro.schema inside of
DebeziumAvroRegistryDeserializationSchema, but I tried to test it with a
unit test and `org.apache.commons.lang3.SerializationUtils.clone` runs
successfully.

I'm curious as to why things work (are serializable) when we use dynamic
classloading, but do not work when we put this in lib/ and bypass the
dynamic loading--any ideas there?



On Thu, Aug 26, 2021 at 11:05 AM Arvid Heise  wrote:

> Hi Kevin,
>
> the consumer needs to be serializable. Apparently you are also serializing
> the Avro schema (probably as part of your
> DebeziumAvroRegistryDeserializationSchema) and that fails. You may want to
> copy our SerializableAvroSchema [1]
>
> Make sure that everything is serializable. You can check that in a unit
> test by using org.apache.commons.lang3.SerializationUtils.clone.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java#L34-L34
>
> On Thu, Aug 26, 2021 at 3:16 PM Kevin Lam  wrote:
>
>> Hi!
>>
>> We're using 1.13.1. We have a class in our user code that extends
>> FlinkKafkaConsumer, that's built for reading avro records from Kafka.
>> However it doesn't hold any Schema objects as fields so I'm a little
>> confused.
>>
>> Something like this:
>>
>> ```
>> class OurSource[T <: ClassTag: TypeInformation: Decoder](
>>   val name: String,
>>   val topic: String,
>>   val clusterConfig: KafkaClusterConfig,
>>   sslConfig: KafkaSSLConfig,
>>   offsetReset: Option[OffsetReset.Value] = None,
>>   offsetProvider: Option[OffsetProvider] = None
>> ) extends FlinkKafkaConsumer[Option[T]](
>> topic,
>> new DebeziumAvroRegistryDeserializationSchema[T](
>>   clusterConfig,
>>   KafkaConfiguration.sslProperties(sslConfig)
>> ),
>> CollectionUtil.mapToProperties(
>>   KafkaConfiguration.consumerProperties(clusterConfig, name,
>> isolationLevel = "read_committed") ++
>> KafkaConfiguration.sslProperties(sslConfig)
>> )
>>   )
>> ```
>>
>> I understand how classloading may change, but why would that change
>> whether we hit this serialization issue or not?
>>
>> On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng 
>> wrote:
>>
>>> Hi!
>>>
>>> What Flink version are you using? In current Flink code base
>>> FlinkKafkaConsumer does not contain fields related to Avro.
>>>
>>> Jars in usrlib has a higher priority to be loaded than jars in lib. So
>>> if there is another FlinkKafkaConsumer class in your user jar then it might
>>> affect class loading and thus affect this issue.
>>>
>>> Kevin Lam  于2021年8月25日周三 下午11:18写道:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to avoid dynamic class loading my user code [0] due to a
>>>> suspected classloading leak, but when I put my application jar into /lib
>>>> instead of /usrlib, I run into the following error:
>>>>
>>>> ```
>>>> The main method caused an error: The implementation of the
>>>> FlinkKafkaConsumer is not serializable. The object probably contains or
>>>> references non serializable fields.
>>>> ```
>>>>
>>>> which specifically seems to be caused by
>>>> ```
>>>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>>>> ```
>>>>
>>>> What's curious to me about this is the error does not occur when we use
>>>> dynamic classloading and put our application jar into /usrlib.
>>>>
>>>> Any ideas what's going on? It would seem to us that the method of
>>>> loading the classes shouldn't impact whether or not something is
>>>> serialized.
>>>>
>>>> Appreciate any help, thanks!
>>>>
>>>> [0]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
>>>>
>>>


Re: Not able to avoid Dynamic Class Loading

2021-08-26 Thread Kevin Lam
Hi!

We're using 1.13.1. We have a class in our user code that extends
FlinkKafkaConsumer, that's built for reading avro records from Kafka.
However it doesn't hold any Schema objects as fields so I'm a little
confused.

Something like this:

```
class OurSource[T <: ClassTag: TypeInformation: Decoder](
  val name: String,
  val topic: String,
  val clusterConfig: KafkaClusterConfig,
  sslConfig: KafkaSSLConfig,
  offsetReset: Option[OffsetReset.Value] = None,
  offsetProvider: Option[OffsetProvider] = None
) extends FlinkKafkaConsumer[Option[T]](
topic,
new DebeziumAvroRegistryDeserializationSchema[T](
  clusterConfig,
  KafkaConfiguration.sslProperties(sslConfig)
),
CollectionUtil.mapToProperties(
  KafkaConfiguration.consumerProperties(clusterConfig, name,
isolationLevel = "read_committed") ++
KafkaConfiguration.sslProperties(sslConfig)
)
  )
```

I understand how classloading may change, but why would that change whether
we hit this serialization issue or not?

On Wed, Aug 25, 2021 at 10:17 PM Caizhi Weng  wrote:

> Hi!
>
> What Flink version are you using? In current Flink code base
> FlinkKafkaConsumer does not contain fields related to Avro.
>
> Jars in usrlib has a higher priority to be loaded than jars in lib. So if
> there is another FlinkKafkaConsumer class in your user jar then it might
> affect class loading and thus affect this issue.
>
> Kevin Lam  于2021年8月25日周三 下午11:18写道:
>
>> Hi all,
>>
>> I'm trying to avoid dynamic class loading my user code [0] due to a
>> suspected classloading leak, but when I put my application jar into /lib
>> instead of /usrlib, I run into the following error:
>>
>> ```
>> The main method caused an error: The implementation of the
>> FlinkKafkaConsumer is not serializable. The object probably contains or
>> references non serializable fields.
>> ```
>>
>> which specifically seems to be caused by
>> ```
>> java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
>> ```
>>
>> What's curious to me about this is the error does not occur when we use
>> dynamic classloading and put our application jar into /usrlib.
>>
>> Any ideas what's going on? It would seem to us that the method of loading
>> the classes shouldn't impact whether or not something is serialized.
>>
>> Appreciate any help, thanks!
>>
>> [0]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
>>
>


Not able to avoid Dynamic Class Loading

2021-08-25 Thread Kevin Lam
Hi all,

I'm trying to avoid dynamic class loading my user code [0] due to a
suspected classloading leak, but when I put my application jar into /lib
instead of /usrlib, I run into the following error:

```
The main method caused an error: The implementation of the
FlinkKafkaConsumer is not serializable. The object probably contains or
references non serializable fields.
```

which specifically seems to be caused by
```
java.io.NotSerializableException: org.apache.avro.Schema$LongSchema
```

What's curious to me about this is the error does not occur when we use
dynamic classloading and put our application jar into /usrlib.

Any ideas what's going on? It would seem to us that the method of loading
the classes shouldn't impact whether or not something is serialized.

Appreciate any help, thanks!

[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code


Re: Task Managers having trouble registering after restart

2021-08-24 Thread Kevin Lam
Thank you for pulling in Chesnay.

I haven't been able to confirm the issue doesn't happen yet, as I've found
it difficult to reproduce easily. I did have follow-up questions:

1/ If Kafka metrics are indeed the cause of the leak, is there a
workaround? We'd be interested in having these metrics available for
monitoring and alerting purposes.

2/ Do you have any tips on identifying/confirming where the leak is coming
from?



On Tue, Aug 24, 2021 at 3:48 AM Arvid Heise  wrote:

> Hi Kevin,
>
> The metrics are exposed similarly, so I expect the same issues as they
> come from Kafka's Consumer API itself.
>
> I'll pull in @Chesnay Schepler  who afaik debugged
> the leak a while ago.
>
> On Mon, Aug 23, 2021 at 9:24 PM Kevin Lam  wrote:
>
>> Actually, we are using the `FlinkKafkaConsumer` [0] rather than
>> `KafkaSource`. Is there a way to disable the consumer metrics using
>> `FlinkKafkaConsumer`? Do you expect that to have the same Metaspace issue?
>>
>>
>> [0]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html
>>
>> On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam  wrote:
>>
>>> Thanks Arvid! I will give this a try and report back.
>>>
>>> On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise  wrote:
>>>
>>>> Hi Kevin,
>>>>
>>>> "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
>>>> have been loaded. [1]
>>>> If you only see that after a while, it's indicating that there is a
>>>> classloader leak. I suspect that this is because of Kafka metrics. There
>>>> have been some reports in the past.
>>>> You can try to see what happens when you disable the forwarding of the
>>>> Kafka metrics with register.consumer.metrics: false [2].
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>>>>
>>>> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm observing an issue sometimes, and it's been hard to reproduce,
>>>>> where task managers are not able to register with the Flink cluster. We
>>>>> provision only the number of task managers required to run a given
>>>>> application, and so the absence of any of the task managers causes the job
>>>>> to enter a crash loop where it fails to get the required task slots.
>>>>>
>>>>> The failure occurs after a job has been running for a while, and when
>>>>> there have been job and task manager restarts. We run in kubernetes so pod
>>>>> disruptions occur from time to time, however we're running using the high
>>>>> availability setup [0]
>>>>>
>>>>> Has anyone encountered this before? Any suggestions?
>>>>>
>>>>> Below are some error messages pulled from the task managers failing to
>>>>> re-register.
>>>>>
>>>>> ```
>>>>> ] - Starting DefaultLeaderRetrievalService with
>>>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>>> 2021-08-16 13:15:10,112 INFO
>>>>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>>>>> Starting DefaultLeaderElectionService with
>>>>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>> streaming-sales-model-staging-restserver-leader.
>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>> streaming-sales-model-staging-resourcemanager-leader.
>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>> [] - Ne

Re: Task Managers having trouble registering after restart

2021-08-23 Thread Kevin Lam
Actually, we are using the `FlinkKafkaConsumer` [0] rather than
`KafkaSource`. Is there a way to disable the consumer metrics using
`FlinkKafkaConsumer`? Do you expect that to have the same Metaspace issue?


[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html

On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam  wrote:

> Thanks Arvid! I will give this a try and report back.
>
> On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise  wrote:
>
>> Hi Kevin,
>>
>> "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
>> have been loaded. [1]
>> If you only see that after a while, it's indicating that there is a
>> classloader leak. I suspect that this is because of Kafka metrics. There
>> have been some reports in the past.
>> You can try to see what happens when you disable the forwarding of the
>> Kafka metrics with register.consumer.metrics: false [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>>
>> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam  wrote:
>>
>>> Hi all,
>>>
>>> I'm observing an issue sometimes, and it's been hard to reproduce, where
>>> task managers are not able to register with the Flink cluster. We provision
>>> only the number of task managers required to run a given application, and
>>> so the absence of any of the task managers causes the job to enter a crash
>>> loop where it fails to get the required task slots.
>>>
>>> The failure occurs after a job has been running for a while, and when
>>> there have been job and task manager restarts. We run in kubernetes so pod
>>> disruptions occur from time to time, however we're running using the high
>>> availability setup [0]
>>>
>>> Has anyone encountered this before? Any suggestions?
>>>
>>> Below are some error messages pulled from the task managers failing to
>>> re-register.
>>>
>>> ```
>>> ] - Starting DefaultLeaderRetrievalService with
>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>> 2021-08-16 13:15:10,112 INFO
>>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>>> Starting DefaultLeaderElectionService with
>>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>> 2021-08-16 13:15:10,205 INFO
>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>> streaming-sales-model-staging-restserver-leader.
>>> 2021-08-16 13:15:10,205 INFO
>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>> streaming-sales-model-staging-resourcemanager-leader.
>>> 2021-08-16 13:15:10,205 INFO
>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>> streaming-sales-model-staging-dispatcher-leader.
>>> 2021-08-16 13:15:10,211 INFO
>>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>>> - Starting DefaultLeaderRetrievalService with
>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>>> 2021-08-16 13:16:26,103 WARN
>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>> - Error while retrieving the leader gateway. Retrying to connect to
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>> 2021-08-16 13:16:30,978 WARN
>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>> - Error while retrieving the leader gateway. Retrying to connect to
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>> ```
>>>
>>> ```
>>> 2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
>>>[] - Uncaught exception in thread
>>> 'kafka-producer-network-thread |
>>> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>>> java.lang.N

Re: Task Managers having trouble registering after restart

2021-08-23 Thread Kevin Lam
Thanks Arvid! I will give this a try and report back.

On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise  wrote:

> Hi Kevin,
>
> "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
> have been loaded. [1]
> If you only see that after a while, it's indicating that there is a
> classloader leak. I suspect that this is because of Kafka metrics. There
> have been some reports in the past.
> You can try to see what happens when you disable the forwarding of the
> Kafka metrics with register.consumer.metrics: false [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>
> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam  wrote:
>
>> Hi all,
>>
>> I'm observing an issue sometimes, and it's been hard to reproduce, where
>> task managers are not able to register with the Flink cluster. We provision
>> only the number of task managers required to run a given application, and
>> so the absence of any of the task managers causes the job to enter a crash
>> loop where it fails to get the required task slots.
>>
>> The failure occurs after a job has been running for a while, and when
>> there have been job and task manager restarts. We run in kubernetes so pod
>> disruptions occur from time to time, however we're running using the high
>> availability setup [0]
>>
>> Has anyone encountered this before? Any suggestions?
>>
>> Below are some error messages pulled from the task managers failing to
>> re-register.
>>
>> ```
>> ] - Starting DefaultLeaderRetrievalService with
>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>> 2021-08-16 13:15:10,112 INFO
>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>> Starting DefaultLeaderElectionService with
>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>> 2021-08-16 13:15:10,205 INFO
>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>> streaming-sales-model-staging-restserver-leader.
>> 2021-08-16 13:15:10,205 INFO
>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>> streaming-sales-model-staging-resourcemanager-leader.
>> 2021-08-16 13:15:10,205 INFO
>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>> streaming-sales-model-staging-dispatcher-leader.
>> 2021-08-16 13:15:10,211 INFO
>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>> - Starting DefaultLeaderRetrievalService with
>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>> 2021-08-16 13:16:26,103 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>> 2021-08-16 13:16:30,978 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>> ```
>>
>> ```
>> 2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
>>  [] - Uncaught exception in thread
>> 'kafka-producer-network-thread |
>> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>> at
>> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
>> ~[?:?]
>> at
>> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
>> ~[?:?]
>> at
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>> at
>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>> ~[?:?]
>> at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>> ~[?:?]
>> at java.lang.Thread.run

Re: Job manager sometimes doesn't restore job from checkpoint post TaskManager failure

2021-08-23 Thread Kevin Lam
Hi,

I was able to understand what was causing this. We were using the restart
strategy `fixed-delay` with a maximum number of restarts set to 10. Switching
to `exponential-delay` resolved the issue of the job restarting from a fresh
state.

On Thu, Aug 19, 2021 at 2:04 PM Chesnay Schepler  wrote:

> How do you deploy Flink on Kubernetes? Do you use the standalone
> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/>
> or native
> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/>
> mode?
>
> Is it really just task managers going down? It seems unlikely that the
> loss of a TM could have such an effect.
>
> Can you provide us with the JobManager logs at the time the TM crash
> occurred? They should contain some hints as to how Flink handled the TM
> failure.
>
> On 19/08/2021 16:06, Kevin Lam wrote:
>
> Hi all,
>
> I've noticed that sometimes when task managers go down--it looks like the
> job is not restored from checkpoint, but instead restarted from a fresh
> state (when I go to the job's checkpoint tab in the UI, I don't see the
> restore, and the numbers in the job overview all get reset). Under what
> circumstances does this happen?
>
> I've been trying to debug and we really want the job to restore from the
> checkpoint at all times for our use case.
>
> We're running Apache Flink 1.13 on Kubernetes in a high availability
> set-up.
>
> Thanks in advance!
>
>
>


Job manager sometimes doesn't restore job from checkpoint post TaskManager failure

2021-08-19 Thread Kevin Lam
Hi all,

I've noticed that sometimes when task managers go down--it looks like the
job is not restored from checkpoint, but instead restarted from a fresh
state (when I go to the job's checkpoint tab in the UI, I don't see the
restore, and the numbers in the job overview all get reset). Under what
circumstances does this happen?

I've been trying to debug and we really want the job to restore from the
checkpoint at all times for our use case.

We're running Apache Flink 1.13 on Kubernetes in a high availability
set-up.

Thanks in advance!


Task Managers having trouble registering after restart

2021-08-17 Thread Kevin Lam
Hi all,

I'm observing an issue sometimes, and it's been hard to reproduce, where
task managers are not able to register with the Flink cluster. We provision
only the number of task managers required to run a given application, and
so the absence of any of the task managers causes the job to enter a crash
loop where it fails to get the required task slots.

The failure occurs after a job has been running for a while, and when there
have been job and task manager restarts. We run in kubernetes so pod
disruptions occur from time to time, however we're running using the high
availability setup [0]

Has anyone encountered this before? Any suggestions?

Below are some error messages pulled from the task managers failing to
re-register.

```
] - Starting DefaultLeaderRetrievalService with
KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
2021-08-16 13:15:10,112 INFO
 org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Starting DefaultLeaderElectionService with
KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-restserver-leader.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-resourcemanager-leader.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-dispatcher-leader.
2021-08-16 13:15:10,211 INFO
 org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
- Starting DefaultLeaderRetrievalService with
KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
2021-08-16 13:16:26,103 WARN
 org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
- Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
2021-08-16 13:16:30,978 WARN
 org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
- Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
```

```
2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
   [] - Uncaught exception in thread
'kafka-producer-network-thread |
trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
at
org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
~[?:?]
at
org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
~[?:?]
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
~[?:?]
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
~[?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.clients.NetworkClient$1
at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
... 6 more
```

```
connection to [null] failed with java.net.ConnectException: Connection
refused: flink-jobmanager/10.28.65.100:6123
2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system
[akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated
for [50] ms. Reason: [Association failed with
[akka.tcp://flink@flink-jobmanager:6123]] Caused by:
[java.net.ConnectException: Connection refused: flink-jobmanager/
10.28.65.100:6123]
2021-08-16 13:14:59,669 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
not resolve ResourceManager address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
retrying in 1 ms: Could not connect to rpc endpoint under address
akka.tcp:

Re: FW: Error using KafkaProducer EXACTLY_ONCE semantic + TaskManager Failure

2021-08-04 Thread Kevin Lam
Hi Arvid,

I had 5.5.2 bundled into my application jar. I was just now able to get
https://github.com/apache/flink/pull/16693 working by ensuring that
kafka-clients==2.4.1 was used. Thanks!!

On Wed, Aug 4, 2021 at 1:04 PM Arvid Heise  wrote:

> Hi Kevin,
>
> Which Kafka client version are you using? (=What is effectively bundled
> into your application jar?)
>
> On Wed, Aug 4, 2021 at 5:56 PM Kevin Lam  wrote:
>
>> Thanks Matthias.
>>
>> I just tried this backport (https://github.com/apache/flink/pull/16693)
>> and got the following error, with the reproduction I described in
>> https://lists.apache.org/thread.html/r528102e08d19d3ae446e5df75710009128c736735c0aaf310f95abeb%40%3Cuser.flink.apache.org%3E
>> (ie. starting job with exactly_once, waited for some checkpoints to
>> complete, and killed a task manager, job fails to recover from checkpoint)
>>
>> 2021-08-04 11:46:38
>> java.lang.RuntimeException: Incompatible KafkaProducer version
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.createProducerIdAndEpoch(FlinkKafkaInternalProducer.java:299)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:233)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:1029)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:99)
>> at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:414)
>> at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:364)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>> at
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>> at java.base/java.lang.Thread.run(Unknown Source)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.kafka.clients.producer.internals.ProducerIdAndEpoch
>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>> at
>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>> at java.base/java.lang.Class.forName0(Native Method)
>> at java.base/java.lang.Class.forName(Unknown Source)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.createProducerIdAndEpoch(FlinkKafkaInternalProducer.java:291)
>> ... 20 more
>>
>> Any ideas?
>>
>> On Tue, Aug 3, 2021 at 10:34 AM Schwalbe Matthias <
>> matthias.schwa...@viseca.ch> wrote:
>>
>>> I don’t think that this risk is there, I guess Fabian a quick response …
>>> see also the later comments in the ticket.
>>>

Re: FW: Error using KafkaProducer EXACTLY_ONCE semantic + TaskManager Failure

2021-08-04 Thread Kevin Lam
Thanks Matthias.

I just tried this backport (https://github.com/apache/flink/pull/16693) and
got the following error, with the reproduction I described in
https://lists.apache.org/thread.html/r528102e08d19d3ae446e5df75710009128c736735c0aaf310f95abeb%40%3Cuser.flink.apache.org%3E
(ie. starting job with exactly_once, waited for some checkpoints to
complete, and killed a task manager, job fails to recover from checkpoint)

2021-08-04 11:46:38
java.lang.RuntimeException: Incompatible KafkaProducer version
at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.createProducerIdAndEpoch(FlinkKafkaInternalProducer.java:299)
at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:233)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:1029)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:99)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:414)
at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:364)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:436)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.clients.producer.internals.ProducerIdAndEpoch
at java.base/java.net.URLClassLoader.findClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Unknown Source)
at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.createProducerIdAndEpoch(FlinkKafkaInternalProducer.java:291)
... 20 more

Any ideas?

On Tue, Aug 3, 2021 at 10:34 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> I don’t think that this risk is there, I guess Fabian a quick response …
> see also the later comments in the ticket.
>
>
>
> Thias
>
>
>
> *From:* Kevin Lam 
> *Sent:* Dienstag, 3. August 2021 15:56
> *To:* Schwalbe Matthias 
> *Cc:* user ; fabianp...@ververica.com
> *Subject:* Re: FW: Error using KafkaProducer EXACTLY_ONCE semantic +
> TaskManager Failure
>
>
>
> Thank you for the replies!
>
>
>
> Thias, I will look into that issue and the workaround. Fabian Paul had
> replied on https://issues.apache.org/jira/browse/FLINK-23509 mentioning
> that our workaround could result in data loss--is that a risk with the
> workaround?
>
>
>
> Fabian Paul, yes here is some more information:
>
>
>
> Checkpoint config (see [0] for more):
>
> Frequency: 2 minutes
>
> No concurrent checkpoints
>
> Checkpoint duration roughly
>
>
>
> Overall parallelism 13 - 70 depending on the operator
>
>
>
> The full exception available in the job manager logs and exception tab in
> the UI:
>
>
>
> ```
>
> 2021-08-03 09:53:42
> org.apache.kafka.common.KafkaException: Unexpect

Error using KafkaProducer EXACTLY_ONCE semantic + TaskManager Failure

2021-07-29 Thread Kevin Lam
Hi user@,

We're developing a Flink application, and using the
FlinkKafkaProducer.Semantic.EXACTLY_ONCE producer semantic to output records
to a Kafka topic in an exactly-once way. We run our Flink application on
Kubernetes.

I've observed that if a task manager fails (I've simulated this by killing
a task-manager pod), the job will not recover or restore from the most
recent checkpoint, and will instead enter a crash loop with the following
types of errors [0]. If I try the same experiment (run the job, kill a task
manager pod) with the AT_LEAST_ONCE semantic, the job recovers using the
most recent checkpoint as expected.

We've set the transaction.timeout.ms to be 1 hour on both the broker and
producer side.

Any insights into what we could be doing wrong or what's going on are
appreciated.

Thanks in advance!

[0]: ```
2021-07-28 16:55:32
org.apache.kafka.common.KafkaException: Unexpected error in
InitProducerIdResponse; Producer attempted an operation with an old epoch.
Either there is a newer producer with the same transactionalId, or the
producer's transaction has been expired by the broker.
at
org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352)
at
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260)
at
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564)
at
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414)
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at java.base/java.lang.Thread.run(Unknown Source)
```


Monitoring Exceptions using Bugsnag

2021-06-21 Thread Kevin Lam
Hi all,

I'm interested in instrumenting an Apache Flink application so that we can
monitor exceptions. I was wondering what the best practices are here? Is
there a good way to observe all the exceptions inside of a Flink
application, including Flink internals?

We are currently thinking of using Bugsnag, which has some steps to
integrate with java applications:
https://docs.bugsnag.com/platforms/java/other/, which works fine for
uncaught exceptions in the job manager / pipeline driver context, but
doesn't catch anything outside of that.

We're also interested in reporting on exceptions that occur in the job
execution context, eg. in task managers.

Any tips/suggestions? I'd love to learn more about exception tracking and
handling in Flink :)

(reposting because it looks like my other thread got deleted?)


Monitoring Exceptions using Bugsnag

2021-06-18 Thread Kevin Lam
Hi all,

I'm interested in instrumenting an Apache Flink application so that we can
monitor exceptions. I was wondering what the best practices are here? Is
there a good way to observe all the exceptions inside of a Flink
application, including Flink internals?

We are currently thinking of using Bugsnag, which has some steps to
integrate with java applications:
https://docs.bugsnag.com/platforms/java/other/, which works fine for
uncaught exceptions in the job manager / pipeline driver context, but
doesn't catch anything outside of that.

We're also interested in reporting on exceptions that occur in the job
execution context, eg. in task managers.

Any tips/suggestions? I'd love to learn more about exception tracking and
handling in Flink :)


Re: Reducing Task Manager Count Greatly Increases Savepoint Restore

2021-04-12 Thread Kevin Lam
That's really helpful, thanks Till!

On Thu, Apr 8, 2021 at 6:32 AM Till Rohrmann  wrote:

> Hi Kevin,
>
> when decreasing the TaskManager count I assume that you also decrease the
> parallelism of the Flink job. There are three aspects which can then cause
> a slower recovery.
>
> 1) Each Task gets a larger key range assigned. Therefore, each TaskManager
> has to download more data in order to restart the Task. Moreover, there are
> fewer nodes downloading larger portions of the data (less parallelization).
> 2) If you rescaled the parallelism, then it can happen that a Task gets a
> key range assigned which requires downloading of multiple key range parts
> from the previous run/savepoint. The new key range might not need all the
> data from the savepoint parts and hence you download some data which is not
> really used in the end.
> 3) When rescaling the job, then Flink has to rebuild the RocksDB instance
> which is an expensive and slow operation. What happens is that Flink
> creates for every savepoint part which it needs for its key range a RocksDB
> instance and then extracts the part which is only relevant for its key
> range into a new RocksDB instance. This causes a lot of read and write
> amplification.
>
> Cheers,
> Till
>
> On Wed, Apr 7, 2021 at 4:07 PM Kevin Lam  wrote:
>
>> Hi all,
>>
>> We are trying to benchmark savepoint size vs. restore time.
>>
>> One thing we've observed is that when we reduce the number of task
>> managers, the time to restore from a savepoint increases drastically:
>>
>> 1/ Restoring from a 9.7TB savepoint onto 156 task managers takes 28 minutes
>> 2/ Restoring from the same savepoint onto 30 task managers takes over 3
>> hours
>>
>> *Is this expected? How does the restore process work? Is this just a
>> matter of having lower restore parallelism for 30 task managers vs 156 task
>> managers? *
>>
>> Some details
>>
>> - Running on kubernetes
>> - Used Rocksdb with a local ssd for state backend
>> - Savepoint is hosted on GCS
>> - The smaller task manager case is important to us because we expect to
>> deploy our application with a high number of task managers, and downscale
>> once a backfill is completed
>>
>> Differences between 1/ and 2/:
>>
>> 2/ has decreased task manager count 156 -> 30
>> 2/ has decreased operator parallelism by a factor of ~10
>> 2/ uses a striped SSD (3 ssds mounted as a single logical volume) to hold
>> rocksdb files
>>
>> Thanks in advance for your help!
>>
>


Reducing Task Manager Count Greatly Increases Savepoint Restore

2021-04-07 Thread Kevin Lam
Hi all,

We are trying to benchmark savepoint size vs. restore time.

One thing we've observed is that when we reduce the number of task
managers, the time to restore from a savepoint increases drastically:

1/ Restoring from a 9.7TB savepoint onto 156 task managers takes 28 minutes
2/ Restoring from the same savepoint onto 30 task managers takes over 3
hours

*Is this expected? How does the restore process work? Is this just a matter
of having lower restore parallelism for 30 task managers vs 156 task
managers? *

Some details

- Running on kubernetes
- Used Rocksdb with a local ssd for state backend
- Savepoint is hosted on GCS
- The smaller task manager case is important to us because we expect to
deploy our application with a high number of task managers, and downscale
once a backfill is completed

Differences between 1/ and 2/:

2/ has decreased task manager count 156 -> 30
2/ has decreased operator parallelism by a factor of ~10
2/ uses a striped SSD (3 ssds mounted as a single logical volume) to hold
rocksdb files

Thanks in advance for your help!


Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Kevin Lam
Hi all,

We're interested in doing some analysis on how the size of our savepoints
and state affects the time it takes to restore from a savepoint. We're
running Flink 1.12 and using RocksDB as a state backend, on Kubernetes.

What is the best way to measure the size of a Flink Application's state? Is
state.backend.rocksdb.metrics.total-sst-files-size the right thing to look at?

We tried looking at state.backend.rocksdb.metrics.total-sst-files-size for
all our operators after restoring from a savepoint, and we noticed that
the sum of all the SST file sizes is much, much smaller than the total size
of our savepoint (7GB vs 10TB). Where does that discrepancy come from?

Do you have any general advice on correlating savepoint size with restore
times?

Thanks in advance!


Re: OOM issues with Python Objects

2021-03-24 Thread Kevin Lam
Hi Dian,

I have unit tests for which both sets of code (Row subclass vs. custom
Python class) passes. The OOM occurs when reading a large amount of data
from a kafka topic.

At the moment I don't have a simple example to reproduce the issue; I'll
let you know when I have one.

On Tue, Mar 23, 2021 at 2:17 AM Dian Fu  wrote:

> Hi Kevin,
>
> Is it possible to provide a simple example to reproduce this issue?
>
> PS: It will use pickle to perform the serialization/deserialization if you
> don't specify the type info.
>
> Regards,
> Dian
>
>
> On Mon, Mar 22, 2021 at 10:55 PM Arvid Heise  wrote:
>
>> Hi Kevin,
>>
>> yes I understood that, but then your Python class contains a Row field,
>> where no mapping exists. I'm assuming PyFlink tries to do a deep conversion
>> and fails to do so by ending in some infinite loop.
>>
>> On Mon, Mar 22, 2021 at 3:48 PM Kevin Lam  wrote:
>>
>>> Thanks for the response Arvid! Point of clarification, *things do NOT
>>> OOM when I use the Row subclass*. Instead, the code that doesn't use
>>> the Row subclass is the code that OOMs (ie. the simple python class).
>>>
>>>
>>>
>>> On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise  wrote:
>>>
>>>> Hi Kevin,
>>>>
>>>> I suspect that this is because Row is not supported as a Python field
>>>> [1]; it's supposed to be a dict that is mapped to a Row by Flink.
>>>> Maybe it runs in some infinite loop while trying serialize and hence
>>>> the OOM.
>>>>
>>>> Subclassing Row might be an undocumented feature.
>>>>
>>>> I'm also pulling in Dian who knows more about PyFlink.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html
>>>>
>>>> On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I've encountered an interesting issue where I observe an OOM issue in
>>>>> my Flink Application when I use a DataStream of Python Objects, but when I
>>>>> make that Python Object a Subclass of pyflink.common.types.Row and provide
>>>>> TypeInformation, there is no issue.
>>>>>
>>>>> For the OOM scenario, no elements get processed, the application runs
>>>>> without printing output and then eventually crashes with 
>>>>> java.lang.OutOfMemoryError:
>>>>> Java heap space
>>>>>
>>>>> Any insights into why this might be happening? Appreciate any
>>>>> help/suggestions.
>>>>> I've included some code that illustrates the two situations below [0].
>>>>>
>>>>> Thanks in advance!
>>>>>
>>>>> [0]:
>>>>>
>>>>> Code Snippet A: OOM scenario
>>>>>
>>>>> class InputWrapper:
>>>>> """Helper class, used to make streams of the same type"""
>>>>>
>>>>> def __init__(self, key: str, contents: Row = None):
>>>>> self.key = key
>>>>> self.contents = contents
>>>>>
>>>>> x_ds = x_ds.map(
>>>>> lambda d: InputWrapper(key=d['key'], contents=d))
>>>>> y_ds = y_ds.map(
>>>>> lambda o: InputWrapper(key=o['key'], contents=o))
>>>>> union = x_ds.union(y_ds)
>>>>> union.print()
>>>>>
>>>>> Code Snippet B: Working scenario:
>>>>>
>>>>> class InputWrapper(Row):
>>>>> """Helper class, used to make streams of the same type"""
>>>>>
>>>>> def __init__(self, key: str, contents: Row = None):
>>>>> super().__init__(key, contents)
>>>>>
>>>>> x_ds = x_ds.map(
>>>>> lambda d: InputWrapper(key=d['key'], contents=d),
>>>>> output_type=InputWrapperTypeInfo())
>>>>> y_ds = y_ds.map(
>>>>> lambda o: InputWrapper(key=o['key'], contents=o),
>>>>> output_type=InputWrapperTypeInfo())
>>>>> union = x_ds.union(y_ds)
>>>>> union.print()
>>>>>
>>>>>
>>>>>


Re: OOM issues with Python Objects

2021-03-22 Thread Kevin Lam
Thanks for the response Arvid! Point of clarification, *things do NOT OOM
when I use the Row subclass*. Instead, the code that doesn't use the Row
subclass is the code that OOMs (ie. the simple python class).



On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise  wrote:

> Hi Kevin,
>
> I suspect that this is because Row is not supported as a Python field [1];
> it's supposed to be a dict that is mapped to a Row by Flink.
> Maybe it runs in some infinite loop while trying serialize and hence the
> OOM.
>
> Subclassing Row might be an undocumented feature.
>
> I'm also pulling in Dian who knows more about PyFlink.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html
>
> On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam  wrote:
>
>> Hi all,
>>
>> I've encountered an interesting issue where I observe an OOM issue in my
>> Flink Application when I use a DataStream of Python Objects, but when I
>> make that Python Object a Subclass of pyflink.common.types.Row and provide
>> TypeInformation, there is no issue.
>>
>> For the OOM scenario, no elements get processed, the application runs
>> without printing output and then eventually crashes with 
>> java.lang.OutOfMemoryError:
>> Java heap space
>>
>> Any insights into why this might be happening? Appreciate any
>> help/suggestions.
>> I've included some code that illustrates the two situations below [0].
>>
>> Thanks in advance!
>>
>> [0]:
>>
>> Code Snippet A: OOM scenario
>>
>> class InputWrapper:
>> """Helper class, used to make streams of the same type"""
>>
>> def __init__(self, key: str, contents: Row = None):
>> self.key = key
>> self.contents = contents
>>
>> x_ds = x_ds.map(
>> lambda d: InputWrapper(key=d['key'], contents=d))
>> y_ds = y_ds.map(
>> lambda o: InputWrapper(key=o['key'], contents=o))
>> union = x_ds.union(y_ds)
>> union.print()
>>
>> Code Snippet B: Working scenario:
>>
>> class InputWrapper(Row):
>> """Helper class, used to make streams of the same type"""
>>
>> def __init__(self, key: str, contents: Row = None):
>> super().__init__(key, contents)
>>
>> x_ds = x_ds.map(
>> lambda d: InputWrapper(key=d['key'], contents=d),
>> output_type=InputWrapperTypeInfo())
>> y_ds = y_ds.map(
>> lambda o: InputWrapper(key=o['key'], contents=o),
>> output_type=InputWrapperTypeInfo())
>> union = x_ds.union(y_ds)
>> union.print()
>>
>>
>>


OOM issues with Python Objects

2021-03-19 Thread Kevin Lam
Hi all,

I've encountered an interesting issue where I observe an OOM issue in my
Flink Application when I use a DataStream of Python Objects, but when I
make that Python Object a Subclass of pyflink.common.types.Row and provide
TypeInformation, there is no issue.

For the OOM scenario, no elements get processed, the application runs
without printing output and then eventually crashes with
java.lang.OutOfMemoryError:
Java heap space

Any insights into why this might be happening? Appreciate any
help/suggestions.
I've included some code that illustrates the two situations below [0].

Thanks in advance!

[0]:

Code Snippet A: OOM scenario

class InputWrapper:
"""Helper class, used to make streams of the same type"""

def __init__(self, key: str, contents: Row = None):
self.key = key
self.contents = contents

x_ds = x_ds.map(
lambda d: InputWrapper(key=d['key'], contents=d))
y_ds = y_ds.map(
lambda o: InputWrapper(key=o['key'], contents=o))
union = x_ds.union(y_ds)
union.print()

Code Snippet B: Working scenario:

class InputWrapper(Row):
"""Helper class, used to make streams of the same type"""

def __init__(self, key: str, contents: Row = None):
super().__init__(key, contents)

x_ds = x_ds.map(
lambda d: InputWrapper(key=d['key'], contents=d),
output_type=InputWrapperTypeInfo())
y_ds = y_ds.map(
lambda o: InputWrapper(key=o['key'], contents=o),
output_type=InputWrapperTypeInfo())
union = x_ds.union(y_ds)
union.print()


Re: Python API + Unit Testing

2021-03-19 Thread Kevin Lam
Hi Dian Fu,

I meant testing in application development. When I'm developing a PyFlink
pipeline, are there any recommended approaches to testing the Flink
application? For instance, how should we test applications end-to-end?
Individual operators?
I'm interested in the DataStream API.

One approach I could see is using StreamingFileSinks, and validating the
output files for a bounded stream.
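
To make that concrete, here's roughly the shape I have in mind. This is only
an untested sketch: the record layout and the MultiplyValue operator are
made-up placeholders, I'm assuming Encoder.simple_string_encoder() is the
right row-format encoder helper here, and since a bounded job may leave the
StreamingFileSink part files in their in-progress state, the validation step
just reads every file under the temp directory:

```
import os
import tempfile
import unittest

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.common.types import Row
from pyflink.datastream import MapFunction, StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink


class MultiplyValue(MapFunction):
    """Placeholder for an operator under test."""

    def map(self, value):
        return Row(key=value['key'], doubled=value['n'] * 2)


class PipelineTest(unittest.TestCase):

    def test_operator_directly(self):
        # "Individual operator" style: call the function without any cluster.
        result = MultiplyValue().map(Row(key='a', n=2))
        self.assertEqual(result, Row(key='a', doubled=4))

    def test_end_to_end_with_bounded_stream(self):
        # End-to-end style: swap the real source for a bounded collection and
        # the real sink for a StreamingFileSink writing into a temp directory.
        out_dir = tempfile.mkdtemp()
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)

        ds = env.from_collection(
            collection=[Row(key='a', n=2), Row(key='b', n=3)],
            type_info=Types.ROW_NAMED(['key', 'n'],
                                      [Types.STRING(), Types.INT()]))
        ds = ds.map(MultiplyValue(),
                    output_type=Types.ROW_NAMED(['key', 'doubled'],
                                                [Types.STRING(), Types.INT()]))
        ds.map(lambda r: '{},{}'.format(r['key'], r['doubled']),
               output_type=Types.STRING()) \
          .add_sink(StreamingFileSink
                    .for_row_format(out_dir, Encoder.simple_string_encoder())
                    .build())
        env.execute('pipeline-test')

        # Validate whatever part files were written (including hidden
        # in-progress files, since a bounded job may not finalize them).
        lines = []
        for root, _dirs, files in os.walk(out_dir):
            for name in files:
                with open(os.path.join(root, name)) as f:
                    lines.extend(f.read().splitlines())
        self.assertIn('a,4', lines)
```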

On Thu, Mar 18, 2021 at 10:04 PM Dian Fu  wrote:

> Hi,
>
> Do you mean how to run Python unit test? If so, you could refer to [1] for
> more details.
>
> Regards,
> Dian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Setting+up+a+Flink+development+environment
>
> 2021年3月18日 下午10:46,Kevin Lam  写道:
>
> Hi all,
>
> I noticed there isn't much in the way of testing discussed in the Python
> API docs
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/>
> for Flink.
>
> Does the community have any best-practices or recommendations on how
> testing should be done with PyFlink?
>
> Thanks!
>
>
>


Python API + Unit Testing

2021-03-18 Thread Kevin Lam
Hi all,

I noticed there isn't much in the way of testing discussed in the Python
API docs for Flink.

Does the community have any best-practices or recommendations on how
testing should be done with PyFlink?

Thanks!


Re: Working with DataStreams of Java objects in Pyflink?

2021-03-15 Thread Kevin Lam
Hi Shuiqiang Chen,

Thanks for the quick response. Oh I see, that's too bad POJO is not
currently supported.

I'd like to check if I understand your suggestion about RowType. You're
suggesting something like:

1/ Define subclasses of RowType in Java/Scala to hold our java objects we
want to manipulate in Python.
2/ When datastreams/sources emit objects of this type in pyflink, we can
mutate and read from these java defined RowTypes as needed, because Python
doesn't know how to handle arbitrary POJOs, but knows how to handle RowType
objects.

Is that correct? A simple example of extending/using RowType would be
helpful if you have a chance.
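
To check that I'm picturing it right, here's a rough, untested sketch of the
Python side, using the field names from our Record case class and assuming
the method is Types.ROW_NAMED (the from_collection source is just a stand-in
for our real custom source):

```
from pyflink.common.typeinfo import Types
from pyflink.common.types import Row
from pyflink.datastream import StreamExecutionEnvironment

# Row type mirroring the Scala Record(id, created_at, updated_at) case class.
RECORD_TYPE = Types.ROW_NAMED(
    ['id', 'created_at', 'updated_at'],
    [Types.LONG(), Types.LONG(), Types.LONG()])

env = StreamExecutionEnvironment.get_execution_environment()

# Stand-in for the custom source; the real one would be declared to emit
# rows of RECORD_TYPE instead of the Scala case class.
records = env.from_collection(
    collection=[Row(id=1, created_at=1615800000, updated_at=1615800001)],
    type_info=RECORD_TYPE)

# "Mutating" then means building a new Row (or reading fields by name),
# rather than touching a POJO from Python.
records = records.map(
    lambda r: Row(id=123, created_at=r['created_at'], updated_at=r['updated_at']),
    output_type=RECORD_TYPE)

records.print()
env.execute('rowtype-sketch')
```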

Thanks again for all your help, here and in the other threads on this
mailing list, really appreciate it!!

On Mon, Mar 15, 2021 at 11:59 AM Shuiqiang Chen  wrote:

> Hi Kevin,
>
> Currently, POJO type is not supported in Python DataStream API because it
> is hard to deal with the conversion between Python Objects and Java
> Objects. Maybe you can use a RowType to represent the POJO class such as
> Types.ROW_NAME([id, created_at, updated_at], [Types.LONG(), Types.LONG(),
> Types.LONG()]). We will try to support the POJO type in the future.
>
> Best,
> Shuiqiang
>
> Kevin Lam  于2021年3月15日周一 下午10:46写道:
>
>> Hi all,
>>
>> Looking to use Pyflink to work with some scala-defined objects being
>> emitted from a custom source. When trying to manipulate the objects in a
>> pyflink defined MapFunction
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/python/pyflink.datastream.html#pyflink.datastream.MapFunction>,
>> I'm hitting an error like:
>>
>> Caused by: java.lang.UnsupportedOperationException: The type information:
>> Option[<...>$Record(id: Long, created_at: Option[Long], updated_at:
>> Option[Long])] is not supported in PyFlink currently.
>>
>> The scala object is defined something like:
>>
>> ```
>> object <...> {
>>   case class Record(
>> id: Long,
>> created_at: Option[Long],
>> updated_at: Option[Long],
>> ...
>>   )
>> }
>> ```
>>
>> The pyflink code is something like:
>>
>> ```
>> class Mutate(MapFunction):
>>   def map(self,value):
>> print(value.id)
>> value.id = 123
>>
>> ...
>>
>> records = env.add_source(..)
>>> records = records.map(Mutate())
>> ```
>>
>> Can you provide any advice on how to work with these kinds of objects in
>> Pyflink?
>>
>> Thanks in advance!
>>
>


Working with DataStreams of Java objects in Pyflink?

2021-03-15 Thread Kevin Lam
Hi all,

Looking to use PyFlink to work with some Scala-defined objects being
emitted from a custom source. When trying to manipulate the objects in a
PyFlink-defined MapFunction, I'm hitting an error like:

Caused by: java.lang.UnsupportedOperationException: The type information:
Option[<...>$Record(id: Long, created_at: Option[Long], updated_at:
Option[Long])] is not supported in PyFlink currently.

The scala object is defined something like:

```
object <...> {
  case class Record(
id: Long,
created_at: Option[Long],
updated_at: Option[Long],
...
  )
}
```

The pyflink code is something like:

```
class Mutate(MapFunction):
  def map(self,value):
print(value.id)
value.id = 123

...

records = env.add_source(..)
records = records.map(Mutate())
```

Can you provide any advice on how to work with these kinds of objects in
Pyflink?

Thanks in advance!


Re: Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-10 Thread Kevin Lam
A follow-up question--In the example you provided Shuiqiang, there were no
arguments passed to the constructor of the custom sink/source.

What's the best way to pass arguments to the constructor?
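
For context, this is roughly what I'm trying to do. It's an untested sketch:
MyBigTableSink, its constructor arguments, and the com.mycompany class name
are all placeholders, and it builds the Java instance through Py4J instead of
passing the class name string to SinkFunction:

```
from pyflink.datastream.functions import SinkFunction
from pyflink.java_gateway import get_gateway


class MyBigTableSink(SinkFunction):
    """Wraps a Scala/Java SinkFunction whose constructor takes arguments."""

    def __init__(self, table_name: str, batch_size: int):
        gateway = get_gateway()
        # Instantiate the Java class directly via Py4J, passing the
        # constructor arguments, then hand the JavaObject to SinkFunction.
        j_sink = gateway.jvm.com.mycompany.MyBigTableSink(table_name, batch_size)
        super(MyBigTableSink, self).__init__(j_sink)


# Usage, after env.add_jars(...) has put the class on the classpath:
#   ds.add_sink(MyBigTableSink('my_table', 100))
```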

On Fri, Mar 5, 2021 at 4:29 PM Kevin Lam  wrote:

> Thanks Shuiqiang! That's really helpful, we'll give the connectors a try.
>
> On Wed, Mar 3, 2021 at 4:02 AM Shuiqiang Chen  wrote:
>
>> Hi Kevin,
>>
>> Thank you for your questions. Currently, users are not able to define
>> custom sources/sinks in Python. This is a great feature that would unify
>> end-to-end PyFlink application development in Python, but it is a large
>> topic that we have no plan to support at present.
>>
>> As you have noticed that `the Python DataStream API has several
>> connectors [2] that use Py4J+Java gateways to interoperate with Java
>> source/sinks`. These connectors are extensions of the Python abstract
>> classes named `SourceFunction` and `SinkFunction`. These two classes can
>> accept a Java source/sink instance and maintain it to enable the
>> interoperation between Python and Java. They can also accept a string with
>> the full name of a Java/Scala defined Source/SinkFunction class and create
>> the corresponding Java instance. Below are the definitions of these classes:
>>
>> class JavaFunctionWrapper(object):
>> """
>> A wrapper class that maintains a Function implemented in Java.
>> """
>>
>> def __init__(self, j_function: Union[str, JavaObject]):
>> # TODO we should move this part to the get_java_function() to 
>> perform a lazy load.
>> if isinstance(j_function, str):
>> j_func_class = get_gateway().jvm.__getattr__(j_function)
>> j_function = j_func_class()
>> self._j_function = j_function
>>
>> def get_java_function(self):
>> return self._j_function
>>
>>
>>
>> class SourceFunction(JavaFunctionWrapper):
>> """
>> Base class for all stream data source in Flink.
>> """
>>
>> def __init__(self, source_func: Union[str, JavaObject]):
>> """
>> Constructor of SourceFunction.
>>
>> :param source_func: The java SourceFunction object.
>> """
>> super(SourceFunction, self).__init__(source_func)
>>
>>
>> class SinkFunction(JavaFunctionWrapper):
>> """
>> The base class for SinkFunctions.
>> """
>>
>> def __init__(self, sink_func: Union[str, JavaObject]):
>> """
>> Constructor of SinkFunction.
>>
>> :param sink_func: The java SinkFunction object or the full name of the
>> SinkFunction class.
>> """
>> super(SinkFunction, self).__init__(sink_func)
>>
>> Therefore, you are able to define custom sources/sinks in Scala and
>> apply them in Python. Here is the recommended approach for implementation:
>>
>> class MyBigTableSink(SinkFunction):
>> def __init__(self, class_name: str):
>> super(MyBigTableSink, self).__init__(class_name)
>>
>>
>> def example():
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.add_jars('/the/path/of/your/MyBigTableSink.jar')
>> # ...
>> ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
>> env.execute("Application with Custom Sink")
>>
>>
>> if __name__ == '__main__':
>> example()
>>
>> Remember that you must add the jar of the Scala defined SinkFunction by
>> calling `env.add_jars()` before adding the SinkFunction. And your custom
>> source/sink functions must be extensions of `SourceFunction` and
>> `SinkFunction`.
>>
>> Any further questions are welcomed!
>>
>> Best,
>> Shuiqiang
>>
>>
> On Wed, Mar 3, 2021 at 2:50 AM Kevin Lam  wrote:
>>
>>> Hello everyone,
>>>
>>> I have some questions about the Python API that hopefully folks in the
>>> Apache Flink community can help with.
>>>
>>> A little background, I’m interested in using the Python Datastream API
>>> because of stakeholders who don’t have a background in Scala/Java, and
>>> would prefer Python if possible. Our team is open to maintaining Scala
>>> constructs on our end, however we are looking to expose Flink for stateful
>>> streaming via a Python API to end-users.
>>>
>>> Questions:
>>>
>>> 1/ The docs mention that custom 

Re: Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-08 Thread Kevin Lam
> securityContext:
> runAsUser:   # refers to user _flink_ from official flink
> image, change if necessary
>   volumes:
> - name: flink-config-volume
>   configMap:
> name: flink-config
> items:
>   - key: flink-conf.yaml
> path: flink-conf.yaml
>   - key: log4j-console.properties
> path: log4j-console.properties
>
> Task-manager.yaml
>
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
> spec:
>   replicas: 2
>   selector:
> matchLabels:
>   app: flink
>   component: taskmanager
>   template:
> metadata:
>   labels:
> app: flink
> component: taskmanager
> spec:
>   containers:
>   - name: taskmanager
> image: pyflink:v1
> env:
> args: ["taskmanager"]
> ports:
> - containerPort: 6122
>   name: rpc
> - containerPort: 6125
>   name: query-state
> livenessProbe:
>   tcpSocket:
> port: 6122
>   initialDelaySeconds: 30
>   periodSeconds: 60
> volumeMounts:
> - name: flink-config-volume
>   mountPath: /opt/flink/conf/
> securityContext:
>   runAsUser:   # refers to user _flink_ from official flink
> image, change if necessary
>   volumes:
>   - name: flink-config-volume
> configMap:
>   name: flink-config
>   items:
>   - key: flink-conf.yaml
> path: flink-conf.yaml
>   - key: log4j-console.properties
> path: log4j-console.properties
>
> 3. Creating resources:
>
> $ kubectl create -f flink-configuration-configmap.yaml
> $ kubectl create -f jobmanager-service.yaml
> # Create the deployments for the cluster
> $ kubectl create -f job-manager.yaml
> $ kubectl create -f task-manager.yaml
>
> Best,
> Shuiqiang
>
> On Sat, Mar 6, 2021 at 5:10 PM Shuiqiang Chen  wrote:
>
>> Hi Kevin,
>>
>> You are able to run PyFlink applications on a Kubernetes cluster; both
>> native K8s mode and resource definition mode are supported since
>> release-1.12.0. Currently, Python and PyFlink are not enabled in the
>> official Flink docker image, so you might need to build a custom image with
>> Python and PyFlink installed; please refer to Enable Python in docker
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html#enabling-python>
>> .
>>
>> Generally, by setting the value of the args field in
>> `jobmanager-application.yaml` to be args: ["standalone-job", "--python",
>> "my_python_app.py", , ], the job
>> manager will try to submit a PyFlink job with the specified Python file
>> once it is started. You can check the pod status for the jobmanager and
>> taskmanager via `kubectl get pods [-n namespace]`. The jobmanager pod will
>> turn to the completed state once the job is finished, or the error state if
>> there is something wrong, while the taskmanager pod will always be in the
>> running state.
>>
>> Finally, it requires you to tear down the cluster by deleting all created
>> resources (jobmanager/taskmanager jobs, the flink-conf configmap, the
>> jobmanager-service, etc.).
>>
>> Best,
>> Shuiqiang
>>
>>
>>
>> On Sat, Mar 6, 2021 at 5:29 AM Kevin Lam  wrote:
>>
>>> Hello everyone,
>>>
>>> I'm looking to run a Pyflink application in a distributed fashion,
>>> using kubernetes, and am currently facing issues. I've successfully gotten
>>> a Scala Flink Application to run using the manifests provided at [0]
>>>
>>> I attempted to run the application by updating the jobmanager command
>>> args from
>>>
>>>  args: ["standalone-job", "--job-classname", "com.job.ClassName", , ]
>>>
>>> to
>>>
>>> args: ["standalone-job", "--python", "my_python_app.py", , ]
>>>
>>> But this didn't work. It resulted in the following error:
>>>
>>> Caused by: java.lang.LinkageError: loader constraint violation: loader
>>> org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
>>> org.apache.commons.cli.Options. A different class with the same name was
>>> previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
>>> module of loader 'app'
>>>
>>> I was able to get things to 'run' by setting args to:
>>>
>>> args: ["python", "my_python_app.py", , ]
>>>
>>>
>>> But I'm not sure if things were running in a distributed fashion or not.
>>>
>>> 1/ Is there a good way to check if the task pods were being correctly
>>> utilized?
>>>
>>> 2/ Are there any similar examples to [0] for how to run Pyflink jobs on
>>> kubernetes?
>>>
>>> Open to any suggestions you may have. Note: we'd prefer not to run using
>>> the native K8S route outlined at [1] because we need to maintain the
>>> ability to customize certain aspects of the deployment (eg. mounting SSDs
>>> to some of the pods)
>>>
>>> Thanks in advance!
>>>
>>> [0]
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode
>>>
>>>


Re: Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-05 Thread Kevin Lam
Thanks Shuiqiang! That's really helpful, we'll give the connectors a try.

On Wed, Mar 3, 2021 at 4:02 AM Shuiqiang Chen  wrote:

> Hi Kevin,
>
> Thank you for your questions. Currently, users are not able to define
> custom sources/sinks in Python. This is a great feature that would unify
> end-to-end PyFlink application development in Python, but it is a large
> topic that we have no plan to support at present.
>
> As you have noticed that `the Python DataStream API has several connectors
> [2] that use Py4J+Java gateways to interoperate with Java source/sinks`.
> These connectors are extensions of the Python abstract classes named
> `SourceFunction` and `SinkFunction`. These two classes can accept a Java
> source/sink instance and maintain it to enable the interoperation between
> Python and Java. They can also accept a string with the full name of a
> Java/Scala defined Source/SinkFunction class and create the corresponding
> Java instance. Below are the definitions of these classes:
>
> class JavaFunctionWrapper(object):
> """
> A wrapper class that maintains a Function implemented in Java.
> """
>
> def __init__(self, j_function: Union[str, JavaObject]):
> # TODO we should move this part to the get_java_function() to perform 
> a lazy load.
> if isinstance(j_function, str):
> j_func_class = get_gateway().jvm.__getattr__(j_function)
> j_function = j_func_class()
> self._j_function = j_function
>
> def get_java_function(self):
> return self._j_function
>
>
>
> class SourceFunction(JavaFunctionWrapper):
> """
> Base class for all stream data source in Flink.
> """
>
> def __init__(self, source_func: Union[str, JavaObject]):
> """
> Constructor of SourceFunction.
>
> :param source_func: The java SourceFunction object.
> """
> super(SourceFunction, self).__init__(source_func)
>
>
> class SinkFunction(JavaFunctionWrapper):
> """
> The base class for SinkFunctions.
> """
>
> def __init__(self, sink_func: Union[str, JavaObject]):
> """
> Constructor of SinkFunction.
>
> :param sink_func: The java SinkFunction object or the full name of the
> SinkFunction class.
> """
> super(SinkFunction, self).__init__(sink_func)
>
> Therefore, you are able to define custom sources/sinks in Scala and apply
> them in Python. Here is the recommended approach for implementation:
>
> class MyBigTableSink(SinkFunction):
> def __init__(self, class_name: str):
> super(MyBigTableSink, self).__init__(class_name)
>
>
> def example():
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_jars('/the/path/of/your/MyBigTableSink.jar')
> # ...
> ds.add_sink(MyBigTableSink("com.mycompany.MyBigTableSink"))
>     env.execute("Application with Custom Sink")
>
>
> if __name__ == '__main__':
> example()
>
> Remember that you must add the jar of the Scala defined SinkFunction by
> calling `env.add_jars()` before adding the SinkFunction. And your custom
> source/sink functions must be extensions of `SourceFunction` and
> `SinkFunction`.
>
> Any further questions are welcomed!
>
> Best,
> Shuiqiang
>
>
> On Wed, Mar 3, 2021 at 2:50 AM Kevin Lam  wrote:
>
>> Hello everyone,
>>
>> I have some questions about the Python API that hopefully folks in the
>> Apache Flink community can help with.
>>
>> A little background, I’m interested in using the Python Datastream API
>> because of stakeholders who don’t have a background in Scala/Java, and
>> would prefer Python if possible. Our team is open to maintaining Scala
>> constructs on our end, however we are looking to expose Flink for stateful
>> streaming via a Python API to end-users.
>>
>> Questions:
>>
>> 1/ The docs mention that custom Sources and Sinks cannot be defined in
>> Python, but must be written in Java/Scala [1]. What is the recommended
>> approach for interoperating between custom sinks/sources written in Scala,
>> with the Python API? If nothing is currently supported, is it on the road
>> map?
>>
>> 2/ Also, I’ve noted that the Python DataStream API has several connectors
>> [2] that use Py4J+Java gateways to interoperate with Java source/sinks. Is
>> there a way for users to build their own connectors? What would this
>> process entail?
>>
>> Ideally, we’d like to be able to define custom sources/sinks in Scala and
>> use them in our P

Running Pyflink job on K8s Flink Cluster Deployment?

2021-03-05 Thread Kevin Lam
Hello everyone,

I'm looking to run a Pyflink application in a distributed fashion,
using kubernetes, and am currently facing issues. I've successfully gotten
a Scala Flink Application to run using the manifests provided at [0]

I attempted to run the application by updating the jobmanager command args
from

 args: ["standalone-job", "--job-classname", "com.job.ClassName",
, ]

to

args: ["standalone-job", "--python", "my_python_app.py", , ]

But this didn't work. It resulted in the following error:

Caused by: java.lang.LinkageError: loader constraint violation: loader
org.apache.flink.util.ChildFirstClassLoader @2d8f2f3a wants to load class
org.apache.commons.cli.Options. A different class with the same name was
previously loaded by 'app'. (org.apache.commons.cli.Options is in unnamed
module of loader 'app'

I was able to get things to 'run' by setting args to:

args: ["python", "my_python_app.py", , ]


But I'm not sure if things were running in a distributed fashion or not.

1/ Is there a good way to check if the task pods were being correctly
utilized?

2/ Are there any similar examples to [0] for how to run Pyflink jobs on
kubernetes?

Open to any suggestions you may have. Note: we'd prefer not to run using
the native K8S route outlined at [1] because we need to maintain the
ability to customize certain aspects of the deployment (eg. mounting SSDs
to some of the pods)

Thanks in advance!

[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html#application-cluster-resource-definitions

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/native_kubernetes.html#application-mode


Python DataStream API Questions -- Java/Scala Interoperability?

2021-03-02 Thread Kevin Lam
Hello everyone,

I have some questions about the Python API that hopefully folks in the
Apache Flink community can help with.

A little background, I’m interested in using the Python Datastream API
because of stakeholders who don’t have a background in Scala/Java, and
would prefer Python if possible. Our team is open to maintaining Scala
constructs on our end, however we are looking to expose Flink for stateful
streaming via a Python API to end-users.

Questions:

1/ The docs mention that custom Sources and Sinks cannot be defined in
Python, but must be written in Java/Scala [1]. What is the recommended
approach for interoperating between custom sinks/sources written in Scala,
with the Python API? If nothing is currently supported, is it on the road
map?

2/ Also, I’ve noted that the Python DataStream API has several connectors
[2] that use Py4J+Java gateways to interoperate with Java source/sinks. Is
there a way for users to build their own connectors? What would this
process entail?

Ideally, we’d like to be able to define custom sources/sinks in Scala and
use them in our Python API Flink Applications. For example, defining a
BigTable sink in Scala for use in the Python API:


[3]

Where MyBigTableSink is just somehow importing a Scala defined sink.

More generally, we’re interested in learning more about Scala/Python
interoperability in Flink, and how we can expose the power of Flink’s Scala
APIs to Python. Open to any suggestions, strategies, etc.

Looking forward to any thoughts!


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/python_table_api_connectors.html#user-defined-sources--sinks

[2]
https://github.com/apache/flink/blob/b23c31075aeb8cf3dbedd4f1f3571d5ebff99c3d/flink-python/pyflink/datastream/connectors.py

[3] Plaintext paste of code in screenshot, in case of attachment issues:
```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import MyBigTableSink

def example():
env = StreamExecutionEnvironment.get_execution_environment()
...
ds.add_sink(MyBigTableSink, ...)
env.execute("Application with Custom Sink")

if __name__ == '__main__':
example()
```