IllegalStateException: invalid BLOB

2024-05-21 Thread Lars Skjærven
Hello,

We're facing the bug reported in
https://issues.apache.org/jira/browse/FLINK-32212

More specifically, when Kubernetes decides to drain a node, the job manager
restarts (but the task managers do not), and the job fails with:

java.lang.IllegalStateException: The library registration references a
different set of library BLOBs than previous registrations for this job:
old:[p-46b02bb8d9740e39d3fe3b3782b0bd335b35ad9f-eefd1e86ec70ea3aaf3e3dce568fe172]
new:[p-46b02bb8d9740e39d3fe3b3782b0bd335b35ad9f-a91193b75fdb799d7b4b8de3f4984597]

Deleting the pod(s) running the TM(s) resolves the problem, but this is not
ideal for us.

Any status on FLINK-32212 to solve this problem?

Kind regards
Lars


Checkpointing while loading causing issues

2024-05-14 Thread Lars Skjærven
Hello,

When restarting jobs (e.g. after an upgrade) with "large" state, a task can
take some time to "initialize" (depending on the state size). During this
time I noticed that Flink attempts to checkpoint. In many cases
checkpointing will fail repeatedly, causing the job to hit the
tolerable-failed-checkpoints limit and restart. The only way to overcome
the issue seems to be to increase the checkpoint interval, but this is
suboptimal.
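
For reference, the two knobs in play can also be set programmatically; a
sketch with example values (this works around the problem rather than
fixing it):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Checkpoint less often, so fewer checkpoints fall into the
// initialization window.
env.enableCheckpointing(60 * 1000L) // interval in ms
// Tolerate more failed checkpoints before the job restarts.
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(5)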

Could Flink wait to trigger checkpointing while one or more tasks are
initializing?

Lars


There is no savepoint operation with triggerId

2024-03-25 Thread Lars Skjærven
Hello,
My job manager is constantly complaining with the following error:

"Exception occurred in REST handler: There is no savepoint operation with
triggerId= for job bc0cb60b710e64e23195aa1610ad790a".

"logger_name":"org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler"

Checkpointing and savepointing seem to work as expected, but my logs are
flooded with these errors. Any tips?

L


Stream enrichment with ingest mode

2024-02-13 Thread Lars Skjærven
Dear all,

A recurring challenge we have with stream enrichment in Flink is finding a
robust mechanism to determine that all messages of the source(s) have been
consumed/processed before output is collected.

A simple example is two sources of catalogue metadata:
- source A delivers products,
- source B delivers product categories,

For a process function to enrich the categories with the number of products
in each category, we would use a KeyedCoProcessFunction (or a
RichCoFlatMap), keyed by category ID, and put both the category and the
products in state. Then we count all products for each key and collect
the result.

Typically, however, we don't want to start counting before all products are
included in state (to avoid emitting incomplete aggregations downstream).
Therefore we use the event lag time (i.e. processing time minus the current
watermark) to indicate "ingest mode" in the processor (e.g. lag time > 30
seconds). When in "ingest mode" we trigger a timer and return without
collecting. The timer finally fires when the watermark has advanced
sufficiently.
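
A minimal sketch of this check (hypothetical Category/Product/CategoryCount
types, a 30-second threshold, and state handling reduced to the essentials):

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

case class Category(id: String, name: String)
case class Product(id: String, categoryId: String)
case class CategoryCount(categoryId: String, count: Long)

class CategoryCounter
    extends KeyedCoProcessFunction[String, Category, Product, CategoryCount] {
  type Ctx = KeyedCoProcessFunction[String, Category, Product, CategoryCount]#Context

  private val ingestGapMs = 30 * 1000L // assumed "ingest mode" threshold

  private lazy val products = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, Product](
      "products", classOf[String], classOf[Product]))

  override def processElement1(c: Category, ctx: Ctx,
      out: Collector[CategoryCount]): Unit = maybeEmit(ctx, out)

  override def processElement2(p: Product, ctx: Ctx,
      out: Collector[CategoryCount]): Unit = {
    products.put(p.id, p)
    maybeEmit(ctx, out)
  }

  override def onTimer(timestamp: Long,
      ctx: KeyedCoProcessFunction[String, Category, Product, CategoryCount]#OnTimerContext,
      out: Collector[CategoryCount]): Unit =
    emit(ctx.getCurrentKey, out) // the watermark has advanced far enough

  private def maybeEmit(ctx: Ctx, out: Collector[CategoryCount]): Unit = {
    val ts  = ctx.timerService()
    val lag = ts.currentProcessingTime() - ts.currentWatermark()
    if (lag > ingestGapMs) {
      // Still in "ingest mode": defer output until the watermark catches up.
      ts.registerEventTimeTimer(ts.currentWatermark() + ingestGapMs)
    } else {
      emit(ctx.getCurrentKey, out)
    }
  }

  private def emit(key: String, out: Collector[CategoryCount]): Unit = {
    var n = 0L
    val it = products.keys().iterator()
    while (it.hasNext) { it.next(); n += 1 }
    out.collect(CategoryCount(key, n))
  }
}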

This strategy of "ingest mode" (and timers) becomes more complicated when
you have multiple process functions (with the same need for ingest mode)
downstream of the first one. The reason seems to be that the first process
function forwards watermarks even though no elements are collected.
Therefore, when elements finally arrive at the second process function, the
current watermark has already advanced, so the same watermark-based
strategy is less robust.

I'm curious how others in the community handle this "challenge" of initial
ingest. Any ideas are greatly appreciated.

Note: we use a custom watermark generator that emits watermarks derived
from event time, and advances the watermarks when the source is idle for a
longer period (e.g. 30 seconds).

Thanks!

L


Re: Metrics with labels

2023-10-30 Thread Lars Skjærven
Registering the counter is fine, e.g. in `open()`:

lazy val responseCounter: Counter = getRuntimeContext
  .getMetricGroup
  .addGroup("response_code")
  .counter("myResponseCounter")

then, in def asyncInvoke(), I can still only do responseCounter.inc(), but
what I want is responseCounter.withLabelValue(200).inc()

I don't find the docs very helpful here. This is an example of how I would
expect labels to work:
https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#hdr-A_Basic_Example
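
For what it's worth, the key/value form of addGroup that Chesnay points to
below seems to map onto exactly this: one counter per label value. A sketch
(untested; names and types are mine):

import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
import scala.collection.mutable

class MyAsyncFn extends RichAsyncFunction[String, String] {

  private val countersByCode = mutable.Map.empty[Int, Counter]

  // Lazily register one counter per response code; the key/value group
  // becomes a user variable, i.e. a label in the Prometheus reporter.
  private def responseCounter(code: Int): Counter =
    countersByCode.getOrElseUpdate(
      code,
      getRuntimeContext.getMetricGroup
        .addGroup("response_code", code.toString)
        .counter("myResponseCounter"))

  override def asyncInvoke(input: String,
      resultFuture: ResultFuture[String]): Unit = {
    // ... perform the async call, then e.g.:
    responseCounter(200).inc()
    resultFuture.complete(Iterable(input))
  }
}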




On Tue, Oct 17, 2023 at 8:32 PM Chesnay Schepler  wrote:

> > I think this is a general issue with the Flink metrics.
>
> Not quite. There are a few instances in Flink where code wasn't updated to
> encode metadata as additional labels, and the RocksDB metrics may be one of
> them.
> Also for RocksDB, you could try setting
> "state.backend.rocksdb.metrics.column-family-as-variable: true" to resolve
> this particular problem.
>
> > If I define a custom metric, there is no support for using labels
>
> You can do so via MetricGroup#addGroup(String key, String value).
> See
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#user-variables
>
> On 17/10/2023 14:31, Lars Skjærven wrote:
>
> Hello,
>
> We're experiencing difficulties in using Flink metrics in a generic way
> since various properties are included in the name of the metric itself.
> This makes it difficult to generate sensible (and general) dashboards (with
> aggregations).
>
> One example is the metric for the RocksDB estimated live data size (
> state.backend.rocksdb.metrics.estimate-live-data-size). The name appears
> as:
> flink_taskmanager_job_task_operator__state_rocksdb_estimate_live_data_size.
>
> If, on the other hand, the state name was included as a label, this would
> facilitate aggregation across states, i.e.:
>
> flink_taskmanager_job_task_operator_state_rocksdb_estimate_live_data_size{state_descriptor="my_state_descriptor"}
>
> I think this is a general issue with the Flink metrics. If I define a
> custom metric, there is no support for using labels (
> https://prometheus.io/docs/practices/naming/#labels) dynamically.
>
> Thanks!
>
> Lars
>
>
>


Metrics with labels

2023-10-17 Thread Lars Skjærven
Hello,

We're experiencing difficulties in using Flink metrics in a generic way
since various properties are included in the name of the metric itself.
This makes it difficult to generate sensible (and general) dashboards (with
aggregations).

One example is the metric for the RocksDB estimated live data size (
state.backend.rocksdb.metrics.estimate-live-data-size). The name appears as:
flink_taskmanager_job_task_operator__state_rocksdb_estimate_live_data_size.

If, on the other hand, the state name was included as a label, this would
facilitate aggregation across states, i.e.:
flink_taskmanager_job_task_operator_state_rocksdb_estimate_live_data_size{state_descriptor="my_state_descriptor"}

I think this is a general issue with the Flink metrics. If I define a
custom metric, there is no support for using labels (
https://prometheus.io/docs/practices/naming/#labels) dynamically.

Thanks!

Lars


Kafka coordinator not available

2023-07-20 Thread Lars Skjærven
Hello,
I experienced
CoordinatorNotAvailableException in my Flink jobs after our Kafka supplier
(Aiven) did a maintenance update of the cluster. This update is performed
by starting up new Kafka nodes, copying over data, and switching over
internally. The Flink jobs run as expected, with the only issue that they
are unsuccessful in committing group offsets.

Restarting the job from a checkpoint/savepoint resolves the issue, but I
would rather not restart all jobs after every Kafka maintenance update.

Any ideas?

Kind regards,
Lars Skjærven


Kubernetes Operator resource limits and requests

2023-02-08 Thread Lars Skjærven
Hello,

How can we define *limit* and *request* for the kubernetes pods as
described here:
https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits

It looks like we can only set one value for CPU and memory:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#taskmanagerspec

Limits and requests seem to be supported in the Spotify version of the
operator:
https://github.com/spotify/flink-on-k8s-operator/blob/master/docs/crd.md#taskmanagerspec

Kind regards,
Lars


Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-15 Thread Lars Skjærven
Same error again today. Any tips? I'm considering downgrading to Flink
1.14.

On Wed, Dec 14, 2022 at 11:51 AM Lars Skjærven  wrote:

> As far as I understand we are not specifying anything on restore mode. so
> I guess default (NO_CLAIM) is what we're using.
>
> We're using ververica platform to handle deploys, and things are a bit
> obscure on what happens underneath.
>
> It happened again this morning:
>
> Caused by: java.io.FileNotFoundException: Item not found: 
> 'gs://bucketname/namespace/flink-jobs/namespaces/default/jobs/fbdde9e7-cf5a-44b4-a3d4-d3ed517432a0/checkpoints/fbdde9e7cf5a44b4a3d4d3ed517432a0/shared/ae551eda-a588-45be-ba08-32bfbc50e965'.
>  Note, it is possible that the live version is still available but the 
> requested generation is deleted.
>
>
> On Tue, Dec 13, 2022 at 11:37 PM Martijn Visser 
> wrote:
>
>> Hi Lars,
>>
>> Have you used any of the new restore modes that were introduced with
>> 1.15? https://flink.apache.org/2022/05/06/restore-modes.html
>>
>> Best regards,
>>
>> Martijn
>>
>> On Fri, Dec 9, 2022 at 2:52 PM Lars Skjærven  wrote:
>>
>>> Lifecycle rules: None
>>>
>>> On Fri, Dec 9, 2022 at 3:17 AM Hangxiang Yu  wrote:
>>>
>>>> Hi, Lars.
>>>> Could you check whether you have configured lifecycle rules for Google
>>>> Cloud Storage [1]? That is not recommended for Flink checkpoint storage.
>>>>
>>>> [1] https://cloud.google.com/storage/docs/lifecycle
>>>>
>>>> On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven  wrote:
>>>>
>>>>> Hello,
>>>>> We had an incident today with a job that could not restore after crash
>>>>> (for unknown reason). Specifically, it fails due to a missing checkpoint
>>>>> file. We've experienced this a total of three times with Flink 1.15.2, but
>>>>> never with 1.14.x. Last time was during a node upgrade, but that was not
>>>>> the case this time.
>>>>>
>>>>> I've not been able to reproduce this issue. I've checked that I can
>>>>> kill the taskmanager and jobmanager (using kubectl delete pod), and the 
>>>>> job
>>>>> restores as expected.
>>>>>
>>>>> The job is running with kubernetes high availability, rocksdb and
>>>>> incremental checkpointing.
>>>>>
>>>>> Any tips are highly appreciated.
>>>>>
>>>>> Thanks,
>>>>> Lars
>>>>>
>>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>>>>> keyed state backend for
>>>>> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of 
>>>>> the
>>>>> 1 provided restore options.
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>>>>> ... 11 more
>>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>>>>> Caught unexpected exception.
>>>>> at
>>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
>>>>> at
>>>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
>>>>> at
>>>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>>>> ... 13 more
>>>>> Caused by: java.io.FileNotFoundException: Item not found:
>>>>> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
>>>>> Note, it is possible that the live version is still available but the
>>>>> requested generation is deleted.
>>>>> at
>>>>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)
>>>>>
>>>>>
>>>>
>>>> --
>>>> Best,
>>>> Hangxiang.
>>>>
>>>


Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-14 Thread Lars Skjærven
As far as I understand we are not specifying anything on restore mode. so I
guess default (NO_CLAIM) is what we're using.

We're using ververica platform to handle deploys, and things are a bit
obscure on what happens underneath.

It happened again this morning:

Caused by: java.io.FileNotFoundException: Item not found:
'gs://bucketname/namespace/flink-jobs/namespaces/default/jobs/fbdde9e7-cf5a-44b4-a3d4-d3ed517432a0/checkpoints/fbdde9e7cf5a44b4a3d4d3ed517432a0/shared/ae551eda-a588-45be-ba08-32bfbc50e965'.
Note, it is possible that the live version is still available but the
requested generation is deleted.


On Tue, Dec 13, 2022 at 11:37 PM Martijn Visser 
wrote:

> Hi Lars,
>
> Have you used any of the new restore modes that were introduced with 1.15?
> https://flink.apache.org/2022/05/06/restore-modes.html
>
> Best regards,
>
> Martijn
>
> On Fri, Dec 9, 2022 at 2:52 PM Lars Skjærven  wrote:
>
>> Lifecycle rules: None
>>
>> On Fri, Dec 9, 2022 at 3:17 AM Hangxiang Yu  wrote:
>>
>>> Hi, Lars.
>>> Could you check whether you have configured lifecycle rules for Google
>>> Cloud Storage [1]? That is not recommended for Flink checkpoint storage.
>>>
>>> [1] https://cloud.google.com/storage/docs/lifecycle
>>>
>>> On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven  wrote:
>>>
>>>> Hello,
>>>> We had an incident today with a job that could not restore after crash
>>>> (for unknown reason). Specifically, it fails due to a missing checkpoint
>>>> file. We've experienced this a total of three times with Flink 1.15.2, but
>>>> never with 1.14.x. Last time was during a node upgrade, but that was not
>>>> the case this time.
>>>>
>>>> I've not been able to reproduce this issue. I've checked that I can
>>>> kill the taskmanager and jobmanager (using kubectl delete pod), and the job
>>>> restores as expected.
>>>>
>>>> The job is running with kubernetes high availability, rocksdb and
>>>> incremental checkpointing.
>>>>
>>>> Any tips are highly appreciated.
>>>>
>>>> Thanks,
>>>> Lars
>>>>
>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>>>> keyed state backend for
>>>> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the
>>>> 1 provided restore options.
>>>> at
>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>>>> ... 11 more
>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>>>> Caught unexpected exception.
>>>> at
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
>>>> at
>>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
>>>> at
>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>>>> at
>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>>> ... 13 more
>>>> Caused by: java.io.FileNotFoundException: Item not found:
>>>> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
>>>> Note, it is possible that the live version is still available but the
>>>> requested generation is deleted.
>>>> at
>>>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)
>>>>
>>>>
>>>
>>> --
>>> Best,
>>> Hangxiang.
>>>
>>


Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-09 Thread Lars Skjærven
Lifecycle rules: None

On Fri, Dec 9, 2022 at 3:17 AM Hangxiang Yu  wrote:

> Hi, Lars.
> Could you check whether you have configured lifecycle rules for Google
> Cloud Storage [1]? That is not recommended for Flink checkpoint storage.
>
> [1] https://cloud.google.com/storage/docs/lifecycle
>
> On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven  wrote:
>
>> Hello,
>> We had an incident today with a job that could not restore after crash
>> (for unknown reason). Specifically, it fails due to a missing checkpoint
>> file. We've experienced this a total of three times with Flink 1.15.2, but
>> never with 1.14.x. Last time was during a node upgrade, but that was not
>> the case this time.
>>
>> I've not been able to reproduce this issue. I've checked that I can kill
>> the taskmanager and jobmanager (using kubectl delete pod), and the job
>> restores as expected.
>>
>> The job is running with kubernetes high availability, rocksdb and
>> incremental checkpointing.
>>
>> Any tips are highly appreciated.
>>
>> Thanks,
>> Lars
>>
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
>> state backend for
>> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the
>> 1 provided restore options.
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
>> ... 11 more
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>> Caught unexpected exception.
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
>> at
>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
>> at
>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> ... 13 more
>> Caused by: java.io.FileNotFoundException: Item not found:
>> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
>> Note, it is possible that the live version is still available but the
>> requested generation is deleted.
>> at
>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)
>>
>>
>
> --
> Best,
> Hangxiang.
>


Could not restore keyed state backend for KeyedProcessOperator

2022-12-08 Thread Lars Skjærven
Hello,
We had an incident today with a job that could not restore after crash (for
unknown reason). Specifically, it fails due to a missing checkpoint file.
We've experienced this a total of three times with Flink 1.15.2, but never
with 1.14.x. Last time was during a node upgrade, but that was not the case
this time.

I've not been able to reproduce this issue. I've checked that I can kill
the taskmanager and jobmanager (using kubectl delete pod), and the job
restores as expected.

The job is running with kubernetes high availability, rocksdb and
incremental checkpointing.

Any tips are highly appreciated.

Thanks,
Lars

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for
KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the
1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
unexpected exception.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: java.io.FileNotFoundException: Item not found:
'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
Note, it is possible that the live version is still available but the
requested generation is deleted.
at
com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)


Switching kafka brokers

2022-10-06 Thread Lars Skjærven
Hello,

What is the recommended approach for migrating Flink jobs to a new Kafka
server? I was naively hoping to use Kafka MirrorMaker to sync the old
server with the new server, and simply continue from a savepoint with
updated URLs. Unfortunately, the Kafka offsets are not identical for
log-compacted topics when using MirrorMaker. Any tips?

L


Re: Cassandra sink with Flink 1.15

2022-09-08 Thread Lars Skjærven
Thanks!

For reference, solved with mapping to Flink tuples.
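
A sketch of what that mapping can look like (the Cassandra tuple sink works
with Flink's Java tuples; MyCaseClass and its fields are assumed from the
quoted thread below):

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}

case class MyCaseClass(key: String, someLongValue: Long) // assumed shape

class ToTupleMapper extends MapFunction[MyCaseClass, JTuple2[String, java.lang.Long]] {
  // Explicit Java tuples sidestep the type-extraction problems above.
  override def map(v: MyCaseClass): JTuple2[String, java.lang.Long] =
    new JTuple2(v.key, Long.box(v.someLongValue))
}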



On Wed, Sep 7, 2022 at 2:27 PM Chesnay Schepler  wrote:

> Are you running into this in the IDE, or when submitting the job to a
> Flink cluster?
>
> If it is the first, then you're probably affected by the Scala-free Flink
> efforts. Either add an explicit dependency on flink-streaming-scala or
> migrate to Flink tuples.
>
> On 07/09/2022 14:17, Lars Skjærven wrote:
>
> Hello,
>
> When upgrading from 1.14 to 1.15 we bumped into a type issue when
> attempting to sink to Cassandra (scala 2.12.13). This was working nicely in
> 1.14. Any tip is highly appreciated.
>
> Using a MapFunction() to generate the stream of tuples:
>
> CassandraSink
>  .addSink(
> mystream.map(new ToTupleMapper)
>   )...
>
> Exception: No support for the type of the given DataStream:
> GenericType
>
> Or with a lambda function:
>
> CassandraSink
>  .addSink(
> mystream.map((v: MyCaseClass) => (v.key, v.someLongValue))
>   )...
>
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
> The generic type parameters of 'Tuple2' are missing. In many cases lambda
> methods don't provide enough information for automatic type extraction when
> Java generics are involved. An easy workaround is to use an (anonymous)
> class instead that implements the
> 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise
> the type has to be specified explicitly using type information.
>
>
>
>
>
>


Cassandra sink with Flink 1.15

2022-09-07 Thread Lars Skjærven
Hello,

When upgrading from 1.14 to 1.15 we bumped into a type issue when
attempting to sink to Cassandra (scala 2.12.13). This was working nicely in
1.14. Any tip is highly appreciated.

Using a MapFunction() to generate the stream of tuples:

CassandraSink
 .addSink(
mystream.map(new ToTupleMapper)
  )...

Exception: No support for the type of the given DataStream:
GenericType

Or with a lambda function:

CassandraSink
 .addSink(
mystream.map((v: MyCaseClass) => (v.key, v.someLongValue))
  )...

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The
generic type parameters of 'Tuple2' are missing. In many cases lambda
methods don't provide enough information for automatic type extraction when
Java generics are involved. An easy workaround is to use an (anonymous)
class instead that implements the
'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise
the type has to be specified explicitly using type information.


Window function - flush on job stop

2022-01-21 Thread Lars Skjærven
We're doing a stream.keyBy().window().aggregate() to aggregate customer
feedback into sessions. Every now and then we have to update the job, e.g.
change the key, so that we can't easily continue from the previous state.

Cancelling the job (without restarting from the last savepoint) will result
in losing ongoing sessions. So we typically go back a few hours when we
restart to minimize the loss.

Is there any way of making the job flush its contents (sessions) on job
cancellation? That would result in splitting ongoing sessions in two, which
is perfectly fine for our purpose.

Any thoughts?

Lars


WindowOperator TestHarness

2021-12-10 Thread Lars Skjærven
Hello,

We're trying to write a test for an implementation of *AggregateFunction*
following an *EventTimeSessionWindows.withGap*. We gave it a try using
*WindowOperator*(), which we hoped could be used as an argument to
*KeyedOneInputStreamOperatorTestHarness*. We're a bit stuck, and we're
hoping someone has a tip or two. Specifically, we can't find the right
*InternalWindowFunction* to pass to WindowOperator(). Below, *MyAggregator*
is our implementation of the *AggregateFunction*.

Does anyone have a template, or guide, to test a windowed aggregate
function?

Kind regards,
Lars


val myWindowOperator = new WindowOperator(
  EventTimeSessionWindows.withGap(Time.seconds(10)),
  new TimeWindow.Serializer(),
  new KeySelector[MyInputType, (String, String)] {
override def getKey(value: MyInputType): (String, String) = {
  (value.a, value.b)
}
  },
  createTuple2TypeInformation(Types.STRING, Types.STRING).createSerializer(
    new ExecutionConfig
  ),
  new AggregatingStateDescriptor[MyInputType, MyAggregateState,
MyOutputType](
"test", new MyAggregator, classOf[MyAggregateState],
  ),
  ???,
  EventTimeTrigger.create(),
  0,
  null
)

testHarness = new KeyedOneInputStreamOperatorTestHarness[(String,
String), MyInputType, MyOutputType](
  myWindowOperator,
  new KeySelector[MyInputType, (String, String)] {
override def getKey(value: MyInputType): (String, String) = {
  (value.a, value.b)
}
  },
  createTuple2TypeInformation(Types.STRING, Types.STRING)
)
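
One candidate for the missing argument (the ??? above), based on how
Flink's own WindowOperatorTest pairs an AggregatingStateDescriptor with a
pass-through window function (untested here; class locations assumed from
the Flink sources):

import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction

// In place of the ???: forwards the aggregate's result unchanged.
new InternalSingleValueWindowFunction(
  new PassThroughWindowFunction[(String, String), TimeWindow, MyOutputType]()
)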


Re: Scala Case Class Serialization

2021-12-07 Thread Lars Skjærven
Thanks for the quick response. Please find attached a minimal example
illustrating the issue. I've added an implicit TypeInformation, and checked
that I'm importing the Scala variant only.

Matthias: Just my superficial impression from [1]. Will look into
TypeInfoFactory.

Thanks again!

package com.mystuff
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeInformation}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

case class TestCaseClass(id: String, pos: Int)

class MyRichFlatMap extends RichFlatMapFunction[TestCaseClass, String] {
  implicit val ttestclass: TypeInformation[TestCaseClass] =
createTypeInformation[TestCaseClass]

  lazy val myState: MapState[String, TestCaseClass] =
getRuntimeContext.getMapState(
new MapStateDescriptor[String, TestCaseClass]("test-state",
classOf[String], ttestclass.getTypeClass)
  )

  override def flatMap(value: TestCaseClass, out: Collector[String]): Unit = {
myState.put(value.id, value)
myState.get(value.id)
out.collect(value.id)
  }
}

object TestJob {

  def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.createLocalEnvironment()
env.getConfig.disableGenericTypes()

val s = Seq[TestCaseClass](
  TestCaseClass(id = "1", pos = 1),
  TestCaseClass(id = "2", pos = 2),
  TestCaseClass(id = "3", pos = 3),
)

env
  .fromCollection[TestCaseClass](s)
  .keyBy(s => s.id)
  .flatMap(new MyRichFlatMap)
  .print()

env.execute("Test Job")
  }
}

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
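
For what it's worth, the generic fallback in this reproducer likely comes
from building the MapStateDescriptor from Class arguments, which bypasses
the implicit Scala TypeInformation. A sketch of the TypeInformation-based
alternative (untested):

// Inside MyRichFlatMap, replacing the Class-based descriptor:
lazy val myState: MapState[String, TestCaseClass] =
  getRuntimeContext.getMapState(
    new MapStateDescriptor[String, TestCaseClass](
      "test-state",
      createTypeInformation[String],
      ttestclass
    )
  )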

On Tue, Dec 7, 2021 at 2:25 PM Roman Grebennikov  wrote:

> Hi Lars,
>
> can you please show a small reproducer of the way you construct the
> DataStream, and which imports do you use?
>
> We also often experience similar performance issues with Scala, but
> usually they are related to accidental usage of the Flink Java API. A
> couple of hints from my experience:
> 1. Make sure that you always use the Scala DataStream, and not the Java
> one.
> 2. All operations on the Scala DataStream require an implicit
> TypeInformation[T] parameter, which is usually generated automatically
> for you by the createTypeInformation[T] macro if you do an "import
> org.apache.flink.api.scala._". So make sure you have this import present.
> 3. You can do "env.getConfig.disableGenericTypes" and Flink will throw
> an exception each time it has to fall back to generic Kryo serialization.
> The backtrace will point you to the exact place in your code where it
> has to fall back to Kryo.
>
> Also, Flink will always revert to Kryo if you use sum types (or ADTs,
> or "sealed traits"). Shameless plug: we made a library to support that:
> https://github.com/findify/flink-adt
>
> Roman Grebennikov | g...@dfdx.me
>
>
> On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote:
>
> Hi Lars,
> not sure about the out-of-the-box support for case classes with primitive
> member types (could you refer to the section which made you conclude
> this?). I haven't used Scala with Flink, yet. So maybe, others can give
> more context.
> But have you looked into using the TypeInfoFactory to define the schema
> [1]?
>
> Best,
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
>
> On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven  wrote:
>
> Hello,
> We're running Flink 1.14 with Scala, and we suspect that performance
> is suffering due to serialization of some Scala case classes. Specifically
> we're seeing that our case class "cannot be used as a POJO type because not
> all fields are valid POJO fields, and must be processed as GenericType",
> and that the case class "does not contain a setter for field X". I'm
> interpreting these log messages as performance warnings.
>
> A simple case class example we're writing to state that triggers the
> mentioned 'warnings':
>
> case class Progress(position: Int, eventTime: Int, alive: Boolean)
>
> My understanding of the docs is that case classes with primitive types
> should be supported "out of the box".
>
> Any tips on how to proceed?
>
> Kind regards,
> Lars
>
>
>
>


Scala Case Class Serialization

2021-12-07 Thread Lars Skjærven
Hello,
We're running Flink 1.14 with Scala, and we suspect that performance is
suffering due to serialization of some Scala case classes. Specifically
we're seeing that our case class "cannot be used as a POJO type because not
all fields are valid POJO fields, and must be processed as GenericType",
and that the case class "does not contain a setter for field X". I'm
interpreting these log messages as performance warnings.

A simple case class example we're writing to state that triggers the
mentioned 'warnings':
case class Progress(position: Int, eventTime: Int, alive: Boolean)

My understanding of the docs is that case classes with primitive types
should be supported "out of the box".

Any tips on how to proceed?

Kind regards,
Lars


KafkaSink.builder setDeliveryGuarantee is not a member

2021-12-02 Thread Lars Skjærven
Hello,
Upgrading to 1.14, I bumped into an issue with the Kafka sink builder when
defining the delivery guarantee:

value setDeliveryGuarantee is not a member of
org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[]


It seems to work with the default value (i.e. without mentioning
setDeliveryGuarantee), but fails to compile when I include it.

Is it better to leave it at the default, and let the application cluster
config define this?

I believe I built the KafkaSink according to the docs:

import org.apache.flink.connector.base.DeliveryGuarantee
import
org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema,
KafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff]()
  .setBootstrapServers("...")
  .setRecordSerializer(
KafkaRecordSerializationSchema
  .builder[SomePBStuff]()
  .setTopic("mytopic")
  .setKeySerializationSchema((v: SomePBStuff) =>
v.key.getBytes(StandardCharsets.UTF_8))
  .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray)
  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  .build()
  )
  .build()
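
For reference, in Flink 1.14 the delivery guarantee appears to be
configured on the KafkaSinkBuilder itself rather than on the
KafkaRecordSerializationSchemaBuilder, and the 1.14 method appears to be
spelled setDeliverGuarantee (renamed to setDeliveryGuarantee in later
releases). A sketch of the adjusted builder:

val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff]()
  .setBootstrapServers("...")
  .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // on the sink builder
  .setRecordSerializer(
    KafkaRecordSerializationSchema
      .builder[SomePBStuff]()
      .setTopic("mytopic")
      .setKeySerializationSchema((v: SomePBStuff) =>
        v.key.getBytes(StandardCharsets.UTF_8))
      .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray)
      .build()
  )
  .build()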


in build.sbt I have:

ThisBuild / scalaVersion := "2.12.13"
val flinkVersion = "1.14.0"

val flinkDependencies = Seq(
  "org.apache.flink" % "flink-runtime" % flinkVersion % Test,

  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",

  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  "org.apache.flink" %% "flink-gelly-scala" % flinkVersion,
  "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
)


Building a flink connector

2021-09-17 Thread Lars Skjærven
We're in need of a Google Bigtable Flink connector. Do you have any tips on
how this could be done, e.g. general guidelines on how to write a
connector?

Thanks,
Lars


Re: KafkaSource builder and checkpointing with parallelism > kafka partitions

2021-09-16 Thread Lars Skjærven
Thanks for the feedback.

> May I ask why you have less partitions than the parallelism? I would be
happy to learn more about your use-case to better understand the
> motivation.

The use case is that topic A contains just a few messages with product
metadata that rarely get updated, while topic B contains user interactions
with the products (and many more messages). For topic A we thought that one
partition would be sufficient to hold the metadata, while we have 32
partitions for topic B. Due to the load on topic B, we use a parallelism
of 2-8.

Thanks,
Lars



On Thu, Sep 16, 2021 at 9:09 AM Fabian Paul 
wrote:

> Hi all,
>
> The problem you are seeing, Lars, is somewhat intended behaviour,
> unfortunately. With the batch/stream unification every Kafka partition
> is treated as a kind of workload assignment. If one subtask receives a
> signal that there is no workload anymore, it goes into the FINISHED
> state. As already pointed out, this restriction will be lifted in the
> near future.
>
> I went through the code and I think in your case you can set the
> following configuration [1], which should give behaviour equivalent to
> the old source. This will prevent the enumerator from sending a final
> signal to the subtasks, so they will not go into the FINISHED state.
>
> May I ask why you have less partitions than the parallelism? I would be
> happy to learn more about your use-case to better understand the
> motivation.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#dynamic-partition-discovery
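
For reference, the configuration Fabian links to can be set directly on the
builder; a sketch (the interval value is only an example):

val metadataSource = KafkaSource.builder[Metadata]()
  .setBootstrapServers("kafka-server")
  .setGroupId("my-group")
  .setTopics("my-topic")
  .setDeserializer(new MetadataDeserializationSchema)
  .setStartingOffsets(OffsetsInitializer.earliest())
  // Periodic partition discovery keeps the enumerator from sending the
  // final "no more splits" signal, so subtasks stay RUNNING.
  .setProperty("partition.discovery.interval.ms", "60000")
  .build()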


Re: KafkaSource builder and checkpointing with parallelism > kafka partitions

2021-09-15 Thread Lars Skjærven
Got it. So the workaround for now (1.13.2) is to fall back to
FlinkKafkaConsumer if I read you correctly.
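
For reference, on 1.14 the option Matthias mentions in the quoted reply
below could presumably be enabled when creating the environment; a sketch
(untested):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val conf = new Configuration()
conf.setString(
  "execution.checkpointing.checkpoints-after-tasks-finish.enabled", "true")
val env = StreamExecutionEnvironment.getExecutionEnvironment(conf)
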
Thanks
L

On Wed, Sep 15, 2021 at 2:58 PM Matthias Pohl 
wrote:

> Hi Lars,
> I guess you are looking
> for execution.checkpointing.checkpoints-after-tasks-finish.enabled [1].
> This configuration parameter is going to be introduced in the upcoming
> Flink 1.14 release.
>
> Best,
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled
>
> On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven  wrote:
>
>> Using KafkaSource builder with a job parallelism larger than the number
>> of kafka partitions, the job is unable to checkpoint.
>>
>> With a job parallelism of 4, 3 of the tasks are marked as FINISHED for
>> the kafka topic with one partition. For this reason checkpointing seems to
>> be disabled.
>>
>> When using FlinkKafkaConsumer (instead of KafkaSource builder) we don't
>> see this behavior, and all 4 tasks have status RUNNING.
>>
>> Is there any way of using the KafkaSource builder and getting the same
>> behavior as FlinkKafkaConsumer for the number of tasks being used?
>>
>> Code with KafkaSource.builder:
>>
>> val metadataSource = KafkaSource.builder[Metadata]()
>>   .setBootstrapServers("kafka-server")
>>   .setGroupId("my-group")
>>   .setTopics("my-topic")
>>   .setDeserializer(new MetadataDeserializationSchema)
>>   .setStartingOffsets(OffsetsInitializer.earliest())
>>   .build()
>>
>> Code with FlinkKafkaConsumer:
>> val metadataSource = new FlinkKafkaConsumer[Metadata](
>>   "my-topic",
>>   new MetadataDeserializationSchema,
>>   "my-server)
>>   .setStartFromEarliest()
>>
>> Thanks in advance,
>> Lars
>>
>


KafkaSource builder and checkpointing with parallelism > kafka partitions

2021-09-15 Thread Lars Skjærven
Using KafkaSource builder with a job parallelism larger than the number of
kafka partitions, the job is unable to checkpoint.

With a job parallelism of 4, 3 of the tasks are marked as FINISHED for the
kafka topic with one partition. For this reason checkpointing seems to be
disabled.

When using FlinkKafkaConsumer (instead of KafkaSource builder) we don't see
this behavior, and all 4 tasks have status RUNNING.

Is there any way of using the KafkaSource builder and getting the same
behavior as FlinkKafkaConsumer for the number of tasks being used?

Code with KafkaSource.builder:

val metadataSource = KafkaSource.builder[Metadata]()
  .setBootstrapServers("kafka-server")
  .setGroupId("my-group")
  .setTopics("my-topic")
  .setDeserializer(new MetadataDeserializationSchema)
  .setStartingOffsets(OffsetsInitializer.earliest())
  .build()

Code with FlinkKafkaConsumer:
val metadataSource = new FlinkKafkaConsumer[Metadata](
  "my-topic",
  new MetadataDeserializationSchema,
  "my-server)
  .setStartFromEarliest()

Thanks in advance,
Lars


Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-29 Thread Lars Skjærven
Unfortunately, I only have the truncated stack trace available (from the flink 
UI).
L



2021-04-27 16:32:02
java.lang.Exception: Could not perform checkpoint 10 for operator Source: 
progress source (4/6)#9.
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:924)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:885)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
complete snapshot 10 for operator Source: progress source (4/6)#9. Failure 
reason: Checkpoint was declined.
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
at 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
... 10 more
Caused by: java.lang.IllegalArgumentException: Invalid negative offset
at 
org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
at 
org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:69)
at 
org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.snapshotState(KafkaSourceReader.java:96)
at 
org.apache.flink.streaming.api.operators.SourceOperator.snapshotState(SourceOperator.java:288)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
... 20 more



From: Till Rohrmann 
Sent: Thursday, April 29, 2021 18:44
To: Lars Skjærven 
Cc: Becket Qin ; user@flink.apache.org 

Subject: Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

Thanks for the additional information Lars. Could you maybe also share the full 
stack traces of the errors you see when the checkpoint fails?

@Becket Qin is it a known issue with the new Kafka
sources trying to checkpoint negative offsets?

Cheers,
Till

On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven
<lars.skjer...@tv2.no> wrote:
Thanks Till.

Here is how we created the KafkaSource:

val sensorSource = KafkaSource.builder[SensorInput]()
  .setBootstrapServers(myConfig.kafkaBrokers)
  .setGroupId(myConfig.kafkaGroupId)
  .setTopics(myConfig.kafkaTopicIn)
  .setDeserializer(new SensorInputPBDeserializationSchema)
  .setStartingOffsets(OffsetsInitializer.earliest())
  .build()

The stream was built with

env.fromSource(sensorSource , WatermarkStrategy.forMonotonousTimestamps(), 
"sensor events")

The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer that 
does SensorInputPB.parseFrom(record.value()) and finally collector.collect(v)

From here on we're doing a keyed windowed aggregation with
.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new
SensorEventAggregator)

Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-29 Thread Lars Skjærven
Thanks Till.

Here is how we created the KafkaSource:

val sensorSource = KafkaSource.builder[SensorInput]()
  .setBootstrapServers(myConfig.kafkaBrokers)
  .setGroupId(myConfig.kafkaGroupId)
  .setTopics(myConfig.kafkaTopicIn)
  .setDeserializer(new SensorInputPBDeserializationSchema)
  .setStartingOffsets(OffsetsInitializer.earliest())
  .build()

The stream was built with

env.fromSource(sensorSource , WatermarkStrategy.forMonotonousTimestamps(), 
"sensor events")

The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer that 
does SensorInputPB.parseFrom(record.value()) and finally collector.collect(v)

From here on we're doing a keyed windowed aggregation with
.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new
SensorEventAggregator)

L



From: Till Rohrmann 
Sent: Thursday, April 29, 2021 09:16
To: Lars Skjærven ; Becket Qin 
Cc: user@flink.apache.org 
Subject: Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

Hi Lars,

The KafkaSourceBuilder constructs the new KafkaSource, which has not been fully
hardened in 1.12.2. In fact, it should not be documented yet. I think you are
running into an instability/bug of it. The new Kafka source should be hardened a
lot more in the 1.13.0 release.

Could you tell us exactly how you created the KafkaSource so that we can verify 
that this problem has been properly fixed in the 1.13.0 release? I am also 
pulling in Becket who is the original author of this connector. He might be 
able to tell you more.

Cheers,
Till

On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven
<lars.skjer...@tv2.no> wrote:
Hello,
I ran into some issues when using the new KafkaSourceBuilder (running Flink 
1.12.2, scala 2.12.13, on ververica platform in a container with java 8). 
Initially it generated warnings on kafka configuration, but the job was able to 
consume and produce messages.


The configuration 'client.id.prefix' was supplied but isn't a known config.
The configuration 'partition.discovery.interval.ms' was supplied but isn't
a known config.


Finally the job crashed with a checkpointing error:


java.lang.Exception: Could not perform checkpoint 10 for operator Source: 
progress source (4/6)#9.

Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
complete snapshot 10 for operator Source: progress source (4/6)#9. Failure 
reason: Checkpoint was declined.
...
Caused by: java.lang.IllegalArgumentException: Invalid negative offset



Switching back to using FlinkKafkaConsumer, the warnings on the kafka config 
disappeared, and the job was able to checkpoint successfully.

I'm wondering if the warnings and the errors are connected, and if there is a 
compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5?

Thanks,
L



KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-28 Thread Lars Skjærven
Hello,
I ran into an issue when using the new KafkaSourceBuilder (running Flink
1.12.2, scala 2.12.13, on ververica platform in a container with java 8).
Initially it generated warnings on kafka configuration, but the job was
able to consume and produce messages.

The configuration 'client.id.prefix' was supplied but isn't a known
config.
The configuration 'partition.discovery.interval.ms' was supplied but
isn't a known config.


Finally the job crashed with a checkpointing error:

java.lang.Exception: Could not perform checkpoint 10 for operator
Source: progress source (4/6)#9.

Caused by: org.apache.flink.runtime.checkpoint.CheckpointException:
Could not complete snapshot 10 for operator Source: progress source
(4/6)#9. Failure reason: Checkpoint was declined.
...
Caused by: java.lang.IllegalArgumentException: Invalid negative offset



Switching back to using FlinkKafkaConsumer, the warnings on the kafka
config disappeared, and the job was able to checkpoint successfully.

I'm wondering if the warnings and the errors are related, and if there is a
compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5?

Thanks,
L


KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-28 Thread Lars Skjærven
Hello,
I ran into some issues when using the new KafkaSourceBuilder (running Flink 
1.12.2, scala 2.12.13, on ververica platform in a container with java 8). 
Initially it generated warnings on kafka configuration, but the job was able to 
consume and produce messages.


The configuration 'client.id.prefix' was supplied but isn't a known config.
The configuration 'partition.discovery.interval.ms' was supplied but isn't a 
known config.


Finally the job crashed with a checkpointing error:


java.lang.Exception: Could not perform checkpoint 10 for operator Source: 
progress source (4/6)#9.

Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not 
complete snapshot 10 for operator Source: progress source (4/6)#9. Failure 
reason: Checkpoint was declined.
...
Caused by: java.lang.IllegalArgumentException: Invalid negative offset



Switching back to using FlinkKafkaConsumer, the warnings on the kafka config 
disappeared, and the job was able to checkpoint successfully.

I'm wondering if the warnings and the errors are connected, and if there is a 
compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5?

Thanks,
L