IllegalStateException: invalid BLOB
Hello, We're facing the bug reported in https://issues.apache.org/jira/browse/FLINK-32212 More specifically, when Kubernetes decides to drain a node and the job manager restarts (but not the task managers), the job fails with: java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job: old:[p-46b02bb8d9740e39d3fe3b3782b0bd335b35ad9f-eefd1e86ec70ea3aaf3e3dce568fe172] new:[p-46b02bb8d9740e39d3fe3b3782b0bd335b35ad9f-a91193b75fdb799d7b4b8de3f4984597] Deleting the pod(s) running the TM(s) resolves the problem, but is not ideal for us. Is there any status on FLINK-32212 that would solve this problem? Kind regards Lars
Checkpointing while loading causing issues
Hello, When restarting jobs (e.g. after an upgrade) with "large" state, a task can take some time to "initialize" (depending on the state size). During this time I noticed that Flink still attempts to checkpoint. In many cases checkpointing will fail repeatedly and cause the job to hit the tolerable-failed-checkpoints limit and restart. The only way to overcome the issue seems to be to increase the checkpoint interval, but this is suboptimal. Could Flink wait to trigger checkpointing while one or more tasks are initializing? Lars
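A minimal sketch of the interval/tolerance workaround mentioned above (the values are placeholders, not recommendations):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointTuning {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  // Stretch the interval and tolerate more declined checkpoints so that checkpoints
  // attempted while tasks are still initializing don't restart the job.
  env.enableCheckpointing(5 * 60 * 1000L)                           // checkpoint every 5 minutes
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60 * 1000L) // breathing room between attempts
  env.getCheckpointConfig.setCheckpointTimeout(30 * 60 * 1000L)     // allow slow checkpoints during recovery
  env.getCheckpointConfig.setTolerableCheckpointFailureNumber(10)   // don't fail the job on the first declines
}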
There is no savepoint operation with triggerId
Hello, My job manager is constantly complaining with the following error: "Exception occurred in REST handler: There is no savepoint operation with triggerId= for job bc0cb60b710e64e23195aa1610ad790a". "logger_name":"org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler" Checkpointing and savepointing seem to work as expected, but my logs are flooded with these errors. Any tips ? L
Stream enrichment with ingest mode
Dear all, A recurring challenge we have with stream enrichment in Flink is a robust mechanism to estimate that all messages of the source(s) have been consumed/processed before output is collected. A simple example is two sources of catalogue metadata: source A delivers products, and source B delivers product categories. For a process function to enrich the categories with the number of products in each category, we would do a KeyedCoProcessFunction (or a RichCoFlatMap), keyed by category ID, and put both the category and products in state. Then count all products for each keyed state and collect the result. Typically, however, we don't want to start counting before all products are included in state (to avoid emitting incomplete aggregations downstream). Therefore we use the event lag time (i.e. processing time - current watermark) to indicate "ingest mode" of the processor (e.g. lag time > 30 seconds). When in "ingest mode" we will register a timer, and return without collecting. Finally, the timer fires when the watermark has advanced sufficiently. This strategy of "ingest mode" (and timers) seems to be more complicated when you have multiple process functions (with the same need for ingest mode) downstream of the first processor. The reason seems to be that watermarks are forwarded by the first process function even though no elements are collected. Therefore, when elements finally arrive at the second process function, the current watermark has already advanced, so the same watermark-based strategy is less robust. I'm curious how others in the community handle this "challenge" of initial ingest. Any ideas are greatly appreciated. Note: we use a custom watermark generator that emits watermarks derived from event time, and advances the watermarks when the source is idle for a longer period (e.g. 30 seconds). Thanks ! L
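A minimal sketch of the "ingest mode" pattern described above, for a single process function. The types Category, Product and CategoryCount, the 30-second lag threshold, and keying by category ID are assumptions for illustration only:

import org.apache.flink.api.common.state.{MapStateDescriptor, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

case class Category(id: String, name: String)
case class Product(id: String, categoryId: String)
case class CategoryCount(categoryId: String, productCount: Long)

class CategoryEnricher(ingestLagMs: Long = 30000L)
    extends KeyedCoProcessFunction[String, Category, Product, CategoryCount] {

  // Keyed state: the category itself plus all products seen for this category.
  private lazy val categoryState = getRuntimeContext.getState(
    new ValueStateDescriptor[Category]("category", classOf[Category]))
  private lazy val productState = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, Product]("products", classOf[String], classOf[Product]))

  // "Ingest mode": the current watermark lags processing time by more than ingestLagMs.
  private def ingesting(ts: org.apache.flink.streaming.api.TimerService): Boolean =
    ts.currentProcessingTime() - ts.currentWatermark() > ingestLagMs

  override def processElement1(
      cat: Category,
      ctx: KeyedCoProcessFunction[String, Category, Product, CategoryCount]#Context,
      out: Collector[CategoryCount]): Unit = {
    categoryState.update(cat)
    emitOrDefer(ctx, out)
  }

  override def processElement2(
      p: Product,
      ctx: KeyedCoProcessFunction[String, Category, Product, CategoryCount]#Context,
      out: Collector[CategoryCount]): Unit = {
    productState.put(p.id, p)
    emitOrDefer(ctx, out)
  }

  private def emitOrDefer(
      ctx: KeyedCoProcessFunction[String, Category, Product, CategoryCount]#Context,
      out: Collector[CategoryCount]): Unit =
    if (ingesting(ctx.timerService())) {
      // Still ingesting: register a timer instead of emitting; it fires once the
      // watermark has caught up with (roughly) the current processing time.
      ctx.timerService().registerEventTimeTimer(ctx.timerService().currentProcessingTime())
    } else {
      emit(out)
    }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedCoProcessFunction[String, Category, Product, CategoryCount]#OnTimerContext,
      out: Collector[CategoryCount]): Unit =
    emit(out)

  private def emit(out: Collector[CategoryCount]): Unit = {
    // The key (category ID) is recovered from state to keep the sketch simple.
    val key = Option(categoryState.value()).map(_.id)
      .orElse(productState.values().asScala.headOption.map(_.categoryId))
    key.foreach(k => out.collect(CategoryCount(k, productState.keys().asScala.size.toLong)))
  }
}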
Re: Metrics with labels
Registering the counter is fine, e.g. in `open()`: lazy val responseCounter: Counter = getRuntimeContext .getMetricGroup .addGroup("response_code") .counter("myResponseCounter") then, in def asyncInvoke(), I can still only do responseCounter.inc(), but what I want is responseCounter.withLabelValue(200).inc() I don't find the docs very useful. Here is a useful example of how I would expect labels to work: https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#hdr-A_Basic_Example On Tue, Oct 17, 2023 at 8:32 PM Chesnay Schepler wrote: > > I think this is a general issue with the Flink metrics. > > Not quite. There are a few instance in Flink were code wasn't updated to > encode metadata as additional labels, and the RocksDB metrics may be one of > them. > Also for RocksDB, you could try setting > "state.backend.rocksdb.metrics.column-family-as-variable: true" to resolve > this particular problem. > > > If I define a custom metric, it is not supported to use labels > > You can do so via MetricGroup#addGroup(String key, String value). > See > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#user-variables > > On 17/10/2023 14:31, Lars Skjærven wrote: > > Hello, > > We're experiencing difficulties in using Flink metrics in a generic way > since various properties are included in the name of the metric itself. > This makes it difficult to generate sensible (and general) dashboards (with > aggregations). > > One example is the metric for rocksdb estimated live data size ( > state.backend.rocksdb.metrics.estimate-live-data-size). the name appears > as : > flink_taskmanager_job_task_operator__state_rocksdb_estimate_live_data_size > . > > If, on the other hand, the state name was included as label, this would > facilitate aggregation across states, i.e.: > > flink_taskmanager_job_task_operator_state_rocksdb_estimate_live_data_size{state_descriptor="my_state_descriptor"} > > I think this is a general issue with the Flink metrics. If I define a > custom metric, it is not supported to use labels ( > https://prometheus.io/docs/practices/naming/#labels) in a dynamic way. > > Thanks ! > > Lars > > >
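For reference, a sketch of the addGroup(key, value) approach Chesnay describes: register one counter per observed response code, so the reporter can expose the code as a label rather than baking it into the metric name. The HttpResult type and the RichFlatMapFunction are stand-ins to keep the example small; the same pattern applies inside a RichAsyncFunction's asyncInvoke():

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.metrics.Counter
import org.apache.flink.util.Collector

import scala.collection.mutable

case class HttpResult(code: Int, body: String)

class CountByResponseCode extends RichFlatMapFunction[HttpResult, String] {

  // One counter per observed response code, registered lazily. addGroup(key, value)
  // creates a user variable, so e.g. the Prometheus reporter can expose it as
  // ...myResponseCounter{response_code="200"}.
  private val counters = mutable.Map.empty[Int, Counter]

  private def counterFor(code: Int): Counter =
    counters.getOrElseUpdate(
      code,
      getRuntimeContext.getMetricGroup
        .addGroup("response_code", code.toString)
        .counter("myResponseCounter"))

  override def flatMap(value: HttpResult, out: Collector[String]): Unit = {
    counterFor(value.code).inc()
    out.collect(value.body)
  }
}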
Metrics with labels
Hello, We're experiencing difficulties in using Flink metrics in a generic way since various properties are included in the name of the metric itself. This makes it difficult to generate sensible (and general) dashboards (with aggregations). One example is the metric for rocksdb estimated live data size ( state.backend.rocksdb.metrics.estimate-live-data-size). the name appears as : flink_taskmanager_job_task_operator__state_rocksdb_estimate_live_data_size . If, on the other hand, the state name was included as label, this would facilitate aggregation across states, i.e.: flink_taskmanager_job_task_operator_state_rocksdb_estimate_live_data_size{state_descriptor="my_state_descriptor"} I think this is a general issue with the Flink metrics. If I define a custom metric, it is not supported to use labels ( https://prometheus.io/docs/practices/naming/#labels) in a dynamic way. Thanks ! Lars
Kafka coordinator not available
Hello, I experienced CoordinatorNotAvailableException in my Flink jobs after our Kafka supplier (Aiven) did a maintenance update of the cluster. This update is performed by starting up new Kafka nodes, copying over data, and switching over internally. The Flink jobs run as expected; the only issue is that they are unsuccessful in committing group offsets. Restarting the job from a checkpoint/savepoint resolves the issue, but I would rather not restart all jobs after every Kafka maintenance update. Any ideas ? Kind regards, Lars Skjærven
Kubernetes Operator resource limits and requests
Hello, How can we define *limits* and *requests* for the Kubernetes pods as described here: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits It looks like we can only set one value for CPU and memory: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#taskmanagerspec Limits and requests seem to be supported in the Spotify version of the operator: https://github.com/spotify/flink-on-k8s-operator/blob/master/docs/crd.md#taskmanagerspec Kind regards, Lars
Re: Could not restore keyed state backend for KeyedProcessOperator
Same error again today. Any tips ? I'm considering downgrading to Flink 1.14 ? On Wed, Dec 14, 2022 at 11:51 AM Lars Skjærven wrote: > As far as I understand we are not specifying anything on restore mode. so > I guess default (NO_CLAIM) is what we're using. > > We're using ververica platform to handle deploys, and things are a bit > obscure on what happens underneath. > > It happened again this morning: > > Caused by: java.io.FileNotFoundException: Item not found: > 'gs://bucketname/namespace/flink-jobs/namespaces/default/jobs/fbdde9e7-cf5a-44b4-a3d4-d3ed517432a0/checkpoints/fbdde9e7cf5a44b4a3d4d3ed517432a0/shared/ae551eda-a588-45be-ba08-32bfbc50e965'. > Note, it is possible that the live version is still available but the > requested generation is deleted. > > > On Tue, Dec 13, 2022 at 11:37 PM Martijn Visser > wrote: > >> Hi Lars, >> >> Have you used any of the new restore modes that were introduced with >> 1.15? https://flink.apache.org/2022/05/06/restore-modes.html >> >> Best regards, >> >> Martijn >> >> On Fri, Dec 9, 2022 at 2:52 PM Lars Skjærven wrote: >> >>> Lifecycle rulesNone >>> >>> On Fri, Dec 9, 2022 at 3:17 AM Hangxiang Yu wrote: >>> >>>> Hi, Lars. >>>> Could you check whether you have configured the lifecycle of google >>>> cloud storage[1] which is not recommended in the flink checkpoint usage? >>>> >>>> [1] https://cloud.google.com/storage/docs/lifecycle >>>> >>>> On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven wrote: >>>> >>>>> Hello, >>>>> We had an incident today with a job that could not restore after crash >>>>> (for unknown reason). Specifically, it fails due to a missing checkpoint >>>>> file. We've experienced this a total of three times with Flink 1.15.2, but >>>>> never with 1.14.x. Last time was during a node upgrade, but that was not >>>>> the case this time. >>>>> >>>>> I've not been able to reproduce this issue. I've checked that I can >>>>> kill the taskmanager and jobmanager (using kubectl delete pod), and the >>>>> job >>>>> restores as expected. >>>>> >>>>> The job is running with kubernetes high availability, rocksdb and >>>>> incremental checkpointing. >>>>> >>>>> Any tips are highly appreciated. >>>>> >>>>> Thanks, >>>>> Lars >>>>> >>>>> Caused by: org.apache.flink.util.FlinkException: Could not restore >>>>> keyed state backend for >>>>> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of >>>>> the >>>>> 1 provided restore options. >>>>> at >>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) >>>>> at >>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) >>>>> at >>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) >>>>> ... 11 more >>>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: >>>>> Caught unexpected exception. 
>>>>> at >>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395) >>>>> at >>>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483) >>>>> at >>>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97) >>>>> at >>>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) >>>>> at >>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) >>>>> at >>>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) >>>>> ... 13 more >>>>> Caused by: java.io.FileNotFoundException: Item not found: >>>>> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'. >>>>> Note, it is possible that the live version is still available but the >>>>> requested generation is deleted. >>>>> at >>>>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46) >>>>> >>>>> >>>> >>>> -- >>>> Best, >>>> Hangxiang. >>>> >>>
Re: Could not restore keyed state backend for KeyedProcessOperator
As far as I understand we are not specifying anything on restore mode. so I guess default (NO_CLAIM) is what we're using. We're using ververica platform to handle deploys, and things are a bit obscure on what happens underneath. It happened again this morning: Caused by: java.io.FileNotFoundException: Item not found: 'gs://bucketname/namespace/flink-jobs/namespaces/default/jobs/fbdde9e7-cf5a-44b4-a3d4-d3ed517432a0/checkpoints/fbdde9e7cf5a44b4a3d4d3ed517432a0/shared/ae551eda-a588-45be-ba08-32bfbc50e965'. Note, it is possible that the live version is still available but the requested generation is deleted. On Tue, Dec 13, 2022 at 11:37 PM Martijn Visser wrote: > Hi Lars, > > Have you used any of the new restore modes that were introduced with 1.15? > https://flink.apache.org/2022/05/06/restore-modes.html > > Best regards, > > Martijn > > On Fri, Dec 9, 2022 at 2:52 PM Lars Skjærven wrote: > >> Lifecycle rulesNone >> >> On Fri, Dec 9, 2022 at 3:17 AM Hangxiang Yu wrote: >> >>> Hi, Lars. >>> Could you check whether you have configured the lifecycle of google >>> cloud storage[1] which is not recommended in the flink checkpoint usage? >>> >>> [1] https://cloud.google.com/storage/docs/lifecycle >>> >>> On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven wrote: >>> >>>> Hello, >>>> We had an incident today with a job that could not restore after crash >>>> (for unknown reason). Specifically, it fails due to a missing checkpoint >>>> file. We've experienced this a total of three times with Flink 1.15.2, but >>>> never with 1.14.x. Last time was during a node upgrade, but that was not >>>> the case this time. >>>> >>>> I've not been able to reproduce this issue. I've checked that I can >>>> kill the taskmanager and jobmanager (using kubectl delete pod), and the job >>>> restores as expected. >>>> >>>> The job is running with kubernetes high availability, rocksdb and >>>> incremental checkpointing. >>>> >>>> Any tips are highly appreciated. >>>> >>>> Thanks, >>>> Lars >>>> >>>> Caused by: org.apache.flink.util.FlinkException: Could not restore >>>> keyed state backend for >>>> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the >>>> 1 provided restore options. >>>> at >>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) >>>> at >>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) >>>> at >>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) >>>> ... 11 more >>>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: >>>> Caught unexpected exception. 
>>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395) >>>> at >>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483) >>>> at >>>> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97) >>>> at >>>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) >>>> at >>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) >>>> at >>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) >>>> ... 13 more >>>> Caused by: java.io.FileNotFoundException: Item not found: >>>> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'. >>>> Note, it is possible that the live version is still available but the >>>> requested generation is deleted. >>>> at >>>> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46) >>>> >>>> >>> >>> -- >>> Best, >>> Hangxiang. >>> >>
Re: Could not restore keyed state backend for KeyedProcessOperator
Lifecycle rulesNone On Fri, Dec 9, 2022 at 3:17 AM Hangxiang Yu wrote: > Hi, Lars. > Could you check whether you have configured the lifecycle of google cloud > storage[1] which is not recommended in the flink checkpoint usage? > > [1] https://cloud.google.com/storage/docs/lifecycle > > On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven wrote: > >> Hello, >> We had an incident today with a job that could not restore after crash >> (for unknown reason). Specifically, it fails due to a missing checkpoint >> file. We've experienced this a total of three times with Flink 1.15.2, but >> never with 1.14.x. Last time was during a node upgrade, but that was not >> the case this time. >> >> I've not been able to reproduce this issue. I've checked that I can kill >> the taskmanager and jobmanager (using kubectl delete pod), and the job >> restores as expected. >> >> The job is running with kubernetes high availability, rocksdb and >> incremental checkpointing. >> >> Any tips are highly appreciated. >> >> Thanks, >> Lars >> >> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed >> state backend for >> KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the >> 1 provided restore options. >> at >> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) >> at >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) >> at >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) >> ... 11 more >> Caused by: org.apache.flink.runtime.state.BackendBuildingException: >> Caught unexpected exception. >> at >> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395) >> at >> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483) >> at >> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97) >> at >> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) >> at >> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) >> at >> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) >> ... 13 more >> Caused by: java.io.FileNotFoundException: Item not found: >> 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'. >> Note, it is possible that the live version is still available but the >> requested generation is deleted. >> at >> com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46) >> >> > > -- > Best, > Hangxiang. >
Could not restore keyed state backend for KeyedProcessOperator
Hello, We had an incident today with a job that could not restore after crash (for unknown reason). Specifically, it fails due to a missing checkpoint file. We've experienced this a total of three times with Flink 1.15.2, but never with 1.14.x. Last time was during a node upgrade, but that was not the case this time. I've not been able to reproduce this issue. I've checked that I can kill the taskmanager and jobmanager (using kubectl delete pod), and the job restores as expected. The job is running with kubernetes high availability, rocksdb and incremental checkpointing. Any tips are highly appreciated. Thanks, Lars Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164) ... 11 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception. at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395) at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483) at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ... 13 more Caused by: java.io.FileNotFoundException: Item not found: 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'. Note, it is possible that the live version is still available but the requested generation is deleted. at com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)
Switching kafka brokers
Hello, What is the recommended approach for migrating Flink jobs to a new Kafka cluster? I was naively hoping to use Kafka MirrorMaker to sync the old cluster with the new one, and simply continue from a savepoint with updated URLs. Unfortunately, the Kafka offsets are not identical for log-compacted topics when using MirrorMaker. Any tips ? L
Re: Cassandra sink with Flink 1.15
Thanks ! For reference, solved with mapping to Flink tuples. On Wed, Sep 7, 2022 at 2:27 PM Chesnay Schepler wrote: > Are you running into this in the IDE, or when submitting the job to a > Flink cluster? > > If it is the first, then you're probably affected by the Scala-free Flink > efforts. Either add an explicit dependency on flink-streaming-scala or > migrate to Flink tuples. > > On 07/09/2022 14:17, Lars Skjærven wrote: > > Hello, > > When upgrading from 1.14 to 1.15 we bumped into a type issue when > attempting to sink to Cassandra (scala 2.12.13). This was working nicely in > 1.14. Any tip is highly appreciated. > > Using a MapFunction() to generate the stream of tuples: > > CassandraSink > .addSink( > mystream.map(new ToTupleMapper) > )... > > Exception: No support for the type of the given DataStream: > GenericType > > Or with a lambda function: > > CassandraSink > .addSink( > mystream.map((v: MyCaseClass => (v.key v.someLongValue)) > )... > > Caused by: org.apache.flink.api.common.functions.InvalidTypesException: > The generic type parameters of 'Tuple2' are missing. In many cases lambda > methods don't provide enough information for automatic type extraction when > Java generics are involved. An easy workaround is to use an (anonymous) > class instead that implements the > 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise > the type has to be specified explicitly using type information. > > > > > >
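A sketch of what "mapping to Flink tuples" might look like here, with the type information passed explicitly; MyCaseClass, the host and the CQL query are assumptions, not necessarily the actual fix used:

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

case class MyCaseClass(key: String, someLongValue: Long)

object CassandraTupleSink {

  // Explicit type information for the Flink (Java) tuple, so the stream handed to
  // the sink is not a GenericType.
  val tupleInfo: TypeInformation[FTuple2[String, java.lang.Long]] =
    Types.TUPLE[FTuple2[String, java.lang.Long]](Types.STRING, Types.LONG)

  def addCassandraSink(mystream: DataStream[MyCaseClass]): Unit = {
    val tuples: DataStream[FTuple2[String, java.lang.Long]] =
      mystream.map((v: MyCaseClass) => new FTuple2(v.key, Long.box(v.someLongValue)))(tupleInfo)

    CassandraSink
      .addSink(tuples)
      .setHost("cassandra-host")                                              // assumption
      .setQuery("INSERT INTO mykeyspace.mytable (key, value) VALUES (?, ?);") // assumption
      .build()
  }
}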
Cassandra sink with Flink 1.15
Hello, When upgrading from 1.14 to 1.15 we bumped into a type issue when attempting to sink to Cassandra (Scala 2.12.13). This was working nicely in 1.14. Any tip is highly appreciated. Using a MapFunction() to generate the stream of tuples: CassandraSink .addSink( mystream.map(new ToTupleMapper) )... Exception: No support for the type of the given DataStream: GenericType Or with a lambda function: CassandraSink .addSink( mystream.map((v: MyCaseClass) => (v.key, v.someLongValue)) )... Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
Window function - flush on job stop
We're doing a stream.keyBy().window().aggregate() to aggregate customer feedback into sessions. Every now and then we have to update the job, e.g. change the key, so that we can't easily continue from the previous state. Cancelling the job (without restarting from the last savepoint) will result in losing ongoing sessions. So we typically go back a few hours when we restart to minimize the loss. Is there any way of making the job flush its content (sessions) on job cancellation? That will result in splitting ongoing sessions in two, which is perfectly fine for our purpose. Any thoughts ? Lars
WindowOperator TestHarness
Hello, We're trying to write a test for an implementation of *AggregateFunction* following a *EventTimeSessionWindows.withGap*. We gave it a try using *WindowOperator*() which we hoped could be used as an argument to *KeyedOneInputStreamOperatorTestHarness*. We're a bit stuck, and we're hoping someone has a tip or two. Specifically, we can't find the right *InternalWindowFunction* to pass to WindowOperator(). Below, *MyAggregator* is our implementation of the *AggregateFunction. * Does anyone have a template, or guide, to test a windowed aggregate function? Kind regards, Lars val myWindowOperator = new WindowOperator( EventTimeSessionWindows.withGap(Time.seconds(10)), new TimeWindow.Serializer(), new KeySelector[MyInputType, (String, String)] { override def getKey(value: MyInputType): (String, String) = { (value.a, value.b) } }, Types.TUPLE(Types.STRING).createSerializer( new ExecutionConfig ), new AggregatingStateDescriptor[MyInputType, MyAggregateState, MyOutputType]( "test", new MyAggregator, classOf[MyAggregateState], ), ???, EventTimeTrigger.create(), 0, null ) testHarness = new KeyedOneInputStreamOperatorTestHarness[(String, String), MyInputType, MyOutputType]( myWindowOperator, new KeySelector[MyInputType, (String, String)] { override def getKey(value: MyInputType): (String, String) = { (value.a, value.b) } }, createTuple2TypeInformation(Types.STRING, Types.STRING) )
Re: Scala Case Class Serialization
Thanks for quick response. Please find attached a minimal example illustrating the issue. I've added implicit TypeInformation, and checked that I'm importing the scala variant only. Matthias: Just my superficial impression from [1]. Will look into TypeInfoFactory. Thanks again! package com.mystuff import org.apache.flink.api.common.functions.RichFlatMapFunction import org.apache.flink.api.common.state.{MapState, MapStateDescriptor} import org.apache.flink.api.common.typeinfo.{TypeInformation} import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.util.Collector case class TestCaseClass(id: String, pos: Int) class MyRichFlatMap extends RichFlatMapFunction[TestCaseClass, String] { implicit val ttestclass: TypeInformation[TestCaseClass] = createTypeInformation[TestCaseClass] lazy val myState: MapState[String, TestCaseClass] = getRuntimeContext.getMapState( new MapStateDescriptor[String, TestCaseClass]("test-state", classOf[String], ttestclass.getTypeClass) ) override def flatMap(value: TestCaseClass, out: Collector[String]): Unit = { myState.put(value.id, value) myState.get(value.id) out.collect(value.id) } } object TestJob { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.createLocalEnvironment() env.getConfig.disableGenericTypes() val s = Seq[TestCaseClass]( TestCaseClass(id = "1", pos = 1), TestCaseClass(id = "2", pos = 2), TestCaseClass(id = "3", pos = 3), ) env .fromCollection[TestCaseClass](s) .keyBy(s => s.id) .flatMap(new MyRichFlatMap) .print() env.execute("Test Job") } } [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory On Tue, Dec 7, 2021 at 2:25 PM Roman Grebennikov wrote: > Hi Lars, > > can you please show a small reproducer of the way you construct the > DataStream, and which imports do you use? > > We also often experience similar performance issues with scala, but > usually they are related to accidental usage of Flink Java API. A couple of > hints from my experience: > 1. Make sure that you always use the scala DataStream, and not the java > one. > 2. All operations on scala datastream require an implicit > TypeInformation[T] parameter, which is usually generated automatically for > you if you do an "import org.apache.flink.api.scala._" by the > createTypeInformation[T] macro. So make sure you have this import present. > 3. You can do a "env.getConfig.disableGenericTypes" and flink will throw > an exception each time it have to fall back to generic kryo serialization. > Backtrace will highlight you an exact place in your code where it have to > do a kryo fallback. > > Also Flink will always revert to Kryo in case if you use sum types (or > ADTs, or "sealed traits"). Shameless plug: we made a library to support > that: https://github.com/findify/flink-adt > > Roman Grebennikov | g...@dfdx.me > > > On Tue, Dec 7, 2021, at 11:20, Matthias Pohl wrote: > > Hi Lars, > not sure about the out-of-the-box support for case classes with primitive > member types (could you refer to the section which made you conclude > this?). I haven't used Scala with Flink, yet. So maybe, others can give > more context. > But have you looked into using the TypeInfoFactory to define the schema > [1]? 
> > Best, > Matthias > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory > > On Tue, Dec 7, 2021 at 10:03 AM Lars Skjærven wrote: > > Hello, > We're running Flink 1.14 with scala, and we're suspecting that performance > is suffering due to serialization of some scala case classes. Specifically > we're seeing that our Case Class "cannot be used as a POJO type because not > all fields are valid POJO fields, and must be processed as GenericType", > and that the case class "does not contain a setter for field X". I'm > interpreting these log messages as performance warnings. > > A simple case class example we're writing to state that triggers the > mentioned 'warnings': > > case class Progress(position: Int, eventTime: Int, alive: Boolean) > > I'm understanding the docs that case classes with primitive types should > be supported "out of the box". > > Any tips on how to proceed ? > > Kind regards, > Lars > > > >
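One variation worth trying on the example above (an untested suggestion, not a confirmed fix): hand the state descriptor the Scala-derived TypeInformation directly instead of Class objects, so the case class in state is not analysed by the Java TypeExtractor as a POJO:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

case class TestCaseClass(id: String, pos: Int)

class MyRichFlatMap extends RichFlatMapFunction[TestCaseClass, String] {

  // Scala-derived type information for both key and value, handed directly to the
  // descriptor (instead of Class objects, which go through the Java TypeExtractor).
  private val keyInfo: TypeInformation[String] = createTypeInformation[String]
  private val valueInfo: TypeInformation[TestCaseClass] = createTypeInformation[TestCaseClass]

  lazy val myState: MapState[String, TestCaseClass] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, TestCaseClass]("test-state", keyInfo, valueInfo)
  )

  override def flatMap(value: TestCaseClass, out: Collector[String]): Unit = {
    myState.put(value.id, value)
    out.collect(value.id)
  }
}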
Scala Case Class Serialization
Hello, We're running Flink 1.14 with Scala, and we suspect that performance is suffering due to serialization of some Scala case classes. Specifically we're seeing that our case class "cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType", and that the case class "does not contain a setter for field X". I'm interpreting these log messages as performance warnings. A simple case class example we're writing to state that triggers the mentioned 'warnings': case class Progress(position: Int, eventTime: Int, alive: Boolean) My understanding of the docs is that case classes with primitive types should be supported "out of the box". Any tips on how to proceed ? Kind regards, Lars
KafkaSink.builder setDeliveryGuarantee is not a member
Hello, upgrading to 1.14 I bumped into an issue with the kafka sink builder when defining delivery guarantee: value setDeliveryGuarantee is not a member of org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[] Seems to be working with the default value (i.e. without mentioning setDeliveryGuarantee), but compile error when including it. Is it better to leave it with the default, and let the application cluster config define this ? I believe I build the KafkaSink according to the docs: import org.apache.flink.connector.base.DeliveryGuarantee import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink} import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff]() .setBootstrapServers("...") .setRecordSerializer( KafkaRecordSerializationSchema .builder[SomePBStuff]() .setTopic("mytopic") .setKeySerializationSchema((v: SomePBStuff) => v.key.getBytes(StandardCharsets.UTF_8)) .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build() ) .build() in build.sbt I have: ThisBuild / scalaVersion := "2.12.13" val flinkVersion = "1.14.0" val flinkDependencies = Seq( "org.apache.flink" % "flink-runtime" % flinkVersion % Test, "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided", "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, "org.apache.flink" %% "flink-gelly-scala" % flinkVersion, "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test, )
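For comparison, a sketch with the delivery guarantee moved to the KafkaSink builder itself rather than the record-serializer builder; note that in the 1.14 API the method appears to be spelled setDeliverGuarantee (worth verifying), with setDeliveryGuarantee arriving in later releases. SomePBStuff is reduced to a stand-in for the protobuf-generated type:

import java.nio.charset.StandardCharsets

import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}

object KafkaSinkExample {

  // Stand-in for the protobuf-generated type in the original snippet.
  case class SomePBStuff(key: String, payload: Array[Byte]) {
    def toByteArray: Array[Byte] = payload
  }

  val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink
    .builder[SomePBStuff]()
    .setBootstrapServers("...")
    .setRecordSerializer(
      KafkaRecordSerializationSchema
        .builder[SomePBStuff]()
        .setTopic("mytopic")
        .setKeySerializationSchema((v: SomePBStuff) => v.key.getBytes(StandardCharsets.UTF_8))
        .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray)
        .build()
    )
    // The delivery guarantee is configured on the sink builder, not the serializer builder.
    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build()
}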
Building a flink connector
We're in need of a Google Bigtable Flink connector. Do you have any tips on how this could be done, e.g. general guidelines on how to write a connector ? Thanks, Lars
Re: KafkaSource builder and checkpointing with parallelism > kafka partitions
Thanks for the feedback. > May I ask why you have less partitions than the parallelism? I would be happy to learn more about your use-case to better understand the > motivation. The use case is that topic A contains just a few messages with product metadata that rarely gets updated, while topic B contains user interactions with the products (and many more messages). For topic A we thought that one partition would be sufficient to keep the metadata, while we have 32 partitions for topic B. Due to the load on topic B, we use a parallelism of 2-8. Thanks, Lars On Thu, Sep 16, 2021 at 9:09 AM Fabian Paul wrote: > Hi all, > > The problem you are seeing Lars is somewhat intended behaviour, > unfortunately. With the batch/stream unification every Kafka partition is > treated > as kind of workload assignment. If one subtask receives a signal that > there is no workload anymore it goes into the FINISHED state. > As already pointed this restriction will lift in the near future. > > I went through the code and I think in your case you can configure the > following configuration [1] which should show an equal behaviour than the > old source. This will prevent the enumerator from sending a final signal > to the subtasks and they will not go into finished state. > > May I ask why you have less partitions than the parallelism? I would be > happy to learn more about your use-case to better understand the > motivation. > > Best, > Fabian > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#dynamic-partition-discovery
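A sketch of the configuration Fabian points to: enabling periodic partition discovery via the partition.discovery.interval.ms property on the KafkaSource builder. The interval value and the SimpleStringSchema deserializer are placeholders; check the linked 1.13 docs for the exact behaviour:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

object PartitionDiscoveryExample {

  // A positive partition.discovery.interval.ms keeps the enumerator periodically
  // re-scanning the topics, so readers without a split are not sent the final
  // "no more splits" signal and do not switch to FINISHED.
  val metadataSource: KafkaSource[String] = KafkaSource
    .builder[String]()
    .setBootstrapServers("kafka-server")
    .setGroupId("my-group")
    .setTopics("my-topic")
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setProperty("partition.discovery.interval.ms", "60000") // re-scan every 60s
    .build()
}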
Re: KafkaSource builder and checkpointing with parallelism > kafka partitions
Got it. So the workaround for now (1.13.2) is to fall back to FlinkKafkaConsumer if I read you correctly. Thanks L On Wed, Sep 15, 2021 at 2:58 PM Matthias Pohl wrote: > Hi Lars, > I guess you are looking > for execution.checkpointing.checkpoints-after-tasks-finish.enabled [1]. > This configuration parameter is going to be introduced in the upcoming > Flink 1.14 release. > > Best, > Matthias > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#execution-checkpointing-checkpoints-after-tasks-finish-enabled > > On Wed, Sep 15, 2021 at 11:26 AM Lars Skjærven wrote: > >> Using KafkaSource builder with a job parallelism larger than the number >> of kafka partitions, the job is unable to checkpoint. >> >> With a job parallelism of 4, 3 of the tasks are marked as FINISHED for >> the kafka topic with one partition. For this reason checkpointing seems to >> be disabled. >> >> When using FlinkKafkaConsumer (instead of KafkaSource builder) we don't >> see this behavior, and all 4 tasks have status RUNNING. >> >> Is there any way of using KafkaSource builder ang get the same behavior >> as FlinkKafkaConsumer for the number of tasks being used ? >> >> Code with KafkaSource.builder: >> >> val metadataSource = KafkaSource.builder[Metadata]() >> .setBootstrapServers("kafka-server") >> .setGroupId("my-group") >> .setTopics("my-topic") >> .setDeserializer(new MetadataDeserializationSchema) >> .setStartingOffsets(OffsetsInitializer.earliest()) >> .build() >> >> Code with FlinkKafkaConsumer: >> val metadataSource = new FlinkKafkaConsumer[Metadata]( >> "my-topic", >> new MetadataDeserializationSchema, >> "my-server) >> .setStartFromEarliest() >> >> Thanks in advance, >> Lars >> >
KafkaSource builder and checkpointing with parallelism > kafka partitions
Using KafkaSource builder with a job parallelism larger than the number of Kafka partitions, the job is unable to checkpoint. With a job parallelism of 4, 3 of the tasks are marked as FINISHED for the Kafka topic with one partition. For this reason checkpointing seems to be disabled. When using FlinkKafkaConsumer (instead of KafkaSource builder) we don't see this behavior, and all 4 tasks have status RUNNING. Is there any way of using KafkaSource builder and get the same behavior as FlinkKafkaConsumer for the number of tasks being used ? Code with KafkaSource.builder: val metadataSource = KafkaSource.builder[Metadata]() .setBootstrapServers("kafka-server") .setGroupId("my-group") .setTopics("my-topic") .setDeserializer(new MetadataDeserializationSchema) .setStartingOffsets(OffsetsInitializer.earliest()) .build() Code with FlinkKafkaConsumer: val metadataSource = new FlinkKafkaConsumer[Metadata]( "my-topic", new MetadataDeserializationSchema, "my-server") .setStartFromEarliest() Thanks in advance, Lars
Re: KafkaSourceBuilder causing invalid negative offset on checkpointing
Unfortunately, I only have the truncated stack trace available (from the flink UI). L 2021-04-27 16:32:02 java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9. at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:924) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:885) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined. at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572) at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912) ... 10 more Caused by: java.lang.IllegalArgumentException: Invalid negative offset at org.apache.kafka.clients.consumer.OffsetAndMetadata.(OffsetAndMetadata.java:50) at org.apache.kafka.clients.consumer.OffsetAndMetadata.(OffsetAndMetadata.java:69) at org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.snapshotState(KafkaSourceReader.java:96) at org.apache.flink.streaming.api.operators.SourceOperator.snapshotState(SourceOperator.java:288) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205) ... 20 more From: Till Rohrmann Sent: Thursday, April 29, 2021 18:44 To: Lars Skjærven Cc: Becket Qin ; user@flink.apache.org Subject: Re: KafkaSourceBuilder causing invalid negative offset on checkpointing Thanks for the additional information Lars. 
Could you maybe also share the full stack traces of the errors you see when the checkpoint fails? @Becket Qin<mailto:becket@gmail.com> is it a known issue with the new Kafka sources trying to checkpoint negative offsets? Cheers, Till On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven mailto:lars.skjer...@tv2.no>> wrote: Thanks Till. Here is how we created the KafkaSource: val sensorSource = KafkaSource.builder[SensorInput]() .setBootstrapServers(myConfig.kafkaBrokers) .setGroupId(myConfig.kafkaGroupId) .setTopics(myConfig.kafkaTopicIn) .setDeserializer(new SensorInputPBDeserializationSchema) .setStartingOffsets(OffsetsInitializer.earliest()) .build() The stream was built with env.fromSource(sensorSource , WatermarkStrategy.forMonotonousTimestamps(), "sensor events") The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer that does SensorInputPB.parseFrom(record.value()) and finally collector.collect(v) From here on we're doing a keyed windowed aggregation with .keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new SensorEventAggregator)
Re: KafkaSourceBuilder causing invalid negative offset on checkpointing
Thanks Till. Here is how we created the KafkaSource: val sensorSource = KafkaSource.builder[SensorInput]() .setBootstrapServers(myConfig.kafkaBrokers) .setGroupId(myConfig.kafkaGroupId) .setTopics(myConfig.kafkaTopicIn) .setDeserializer(new SensorInputPBDeserializationSchema) .setStartingOffsets(OffsetsInitializer.earliest()) .build() The stream was built with env.fromSource(sensorSource , WatermarkStrategy.forMonotonousTimestamps(), "sensor events") The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer that does SensorInputPB.parseFrom(record.value()) and finally collector.collect(v) >From here on we're doing a keyed windowed aggregation with >.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new > SensorEventAggregator) L From: Till Rohrmann Sent: Thursday, April 29, 2021 09:16 To: Lars Skjærven ; Becket Qin Cc: user@flink.apache.org Subject: Re: KafkaSourceBuilder causing invalid negative offset on checkpointing Hi Lars, The KafkaSourceBuilder constructs the new KafkaSource which has not been fully hardened in 1.12.2. In fact, it should not be documented yet. I think you are running into an instability/bug of. The new Kafka source should be hardened a lot more in the 1.13.0 release. Could you tell us exactly how you created the KafkaSource so that we can verify that this problem has been properly fixed in the 1.13.0 release? I am also pulling in Becket who is the original author of this connector. He might be able to tell you more. Cheers, Till On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven mailto:lars.skjer...@tv2.no>> wrote: Hello, I ran into some issues when using the new KafkaSourceBuilder (running Flink 1.12.2, scala 2.12.13, on ververica platform in a container with java 8). Initially it generated warnings on kafka configuration, but the job was able to consume and produce messages. The configuration 'client.id.prefix' was supplied but isn't a known config. The configuration 'partition.discovery.interval.ms<http://partition.discovery.interval.ms>' was supplied but isn't a known config. Finally the job crashed with a checkpointing error: java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9. Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined. ... Caused by: java.lang.IllegalArgumentException: Invalid negative offset Switching back to using FlinkKafkaConsumer, the warnings on the kafka config disapeared, and the job was able to checkpoint successfully. I'm wondering if the warnings and the errors are connected, and if there is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5 ? Thanks, L
KafkaSourceBuilder causing invalid negative offset on checkpointing
Hello, I ran into an issue when using the new KafkaSourceBuilder (running Flink 1.12.2, scala 2.12.13, on ververica platform in a container with java 8). Initially it generated warnings on kafka configuration, but the job was able to consume and produce messages. The configuration 'client.id.prefix' was supplied but isn't a known config. The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config. Finally the job crashed with a checkpointing error: java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9. Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined. ... Caused by: java.lang.IllegalArgumentException: Invalid negative offset Switching back to using FlinkKafkaConsumer, the warnings on the kafka config disappeared, and the job was able to checkpoint successfully. I'm wondering if the warnings and the errors are related, and if there is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5 ? Thanks, L
KafkaSourceBuilder causing invalid negative offset on checkpointing
Hello, I ran into some issues when using the new KafkaSourceBuilder (running Flink 1.12.2, scala 2.12.13, on ververica platform in a container with java 8). Initially it generated warnings on kafka configuration, but the job was able to consume and produce messages. The configuration 'client.id.prefix' was supplied but isn't a known config. The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config. Finally the job crashed with a checkpointing error: java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9. Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined. ... Caused by: java.lang.IllegalArgumentException: Invalid negative offset Switching back to using FlinkKafkaConsumer, the warnings on the kafka config disappeared, and the job was able to checkpoint successfully. I'm wondering if the warnings and the errors are connected, and if there is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5 ? Thanks, L