Re: [DISCUSS] Status of Statefun Project

2023-08-23 Thread Filip Karnicki
Hi Gordon

Any chance we could get this reviewed and released to the central repo?
We're currently forced to use a Flink version that has a nasty bug, causing
an operational nightmare.

Many thanks
Fil

On Sat, 19 Aug 2023 at 01:38, Galen Warren via user 
wrote:

> Gotcha, makes sense as to the original division.
>
> >> Can this be solved by simply passing in the path to the artifacts
>
> This definitely works if we're going to be copying the artifacts on the
> host side -- into the build context -- and then from the context into the
> image. It only gets tricky to have a potentially varying path to the
> artifacts if we're trying to *directly* include the artifacts in the
> Docker context -- then we have a situation where the Docker context must
> contain both the artifacts and playground files, with (potentially)
> different root locations.
>
> Maybe the simplest thing to do here is just to leave the playground as-is
> and then copy the artifacts into the Docker context manually, prior to
> building the playground images. I'm fine with that. It will mean that each
> Statefun release will require two PRs and two sets of build/publish steps
> instead of one, but if everyone else is fine with that I am, too. Unless
> anyone objects, I'll go ahead and queue up a PR for the playground that
> makes these changes.
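
A minimal sketch of that manual copy step (the paths and image name here
are hypothetical, just to illustrate the flow):

  # copy locally built StateFun artifacts into the playground's Docker context
  cp ../flink-statefun/statefun-flink-distribution/target/*.jar ./artifacts/
  # then build the playground images against the copied artifacts
  docker build -t statefun-playground-example .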
>
> Also, I should mention -- in case it's not clear -- that I have already
> built and run the playground examples with the code from the PR and
> everything worked. So that PR is ready to move forward with review, etc.,
> at this point.
>
> Thanks.
>
> On Fri, Aug 18, 2023 at 4:16 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Galen,
>>
>> The original intent of having a separate repo for the playground was that
>> StateFun users can just go there and start running simple
>> examples without any other distractions from the core code. I personally
>> don't have a strong preference here and can understand how it would make
>> the workflow more streamlined, but just FYI on the reasoning why the two are
>> separate in the first place.
>>
>> re: paths for locating StateFun artifacts.
>> Can this be solved by simply passing in the path to the artifacts? As
>> well as the image tag for the locally built base StateFun image. They could
>> probably be environment variables.
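
A sketch of how that could look (all names here are made up for
illustration): the playground Dockerfiles could take both locations as
build arguments, e.g.

  ARG BASE_IMAGE=apache/flink-statefun:local
  FROM ${BASE_IMAGE}
  ARG STATEFUN_ARTIFACTS_DIR=artifacts
  COPY ${STATEFUN_ARTIFACTS_DIR}/ /opt/statefun/

built with something like:

  docker build --build-arg BASE_IMAGE=apache/flink-statefun:local \
    --build-arg STATEFUN_ARTIFACTS_DIR=artifacts .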
>>
>> Cheers,
>> Gordon
>>
>> On Fri, Aug 18, 2023 at 12:13 PM Galen Warren via user <
>> user@flink.apache.org> wrote:
>>
>>> Yes, exactly! And in addition to the base Statefun jars and the jar for
>>> the Java SDK, it does an equivalent copy/register operation for each of the
>>> other SDK libraries (Go, Python, Javascript) so that those libraries are
>>> also available when building the playground examples.
>>>
>>> One more question: In order to copy the various build artifacts into the
>>> Docker containers, those artifacts need to be part of the Docker context.
>>> With the playground being a separate project, that's slightly tricky to do,
>>> as there is no guarantee (other than convention) about the relative paths
>>> of *flink-statefun* and *flink-statefun-playground* in someone's local
>>> filesystem. The way I've set this up locally is to copy the playground into
>>> the *flink-statefun* project -- i.e. to *flink-statefun*/playground --
>>> such that I can set the Docker context to the root folder of
>>> *flink-statefun* and then have access to any local code and/or build
>>> artifacts.
>>>
>>> I'm wondering if there might be any appetite for making that move
>>> permanent, i.e. moving the playground to *flink-statefun*/playground
>>> and deprecating the standalone playground project. In addition to making
>>> the problem of building with unreleased artifacts a bit simpler to solve,
>>> it would also simplify the process of releasing a new Statefun version,
>>> since the entire process could be handled with a single PR and associated
>>> build/deploy tasks. In other words, a single PR could both update and
>>> deploy the Statefun package and the playground code and images.
>>>
>>> As it stands, at least two PRs would be required for each Statefun
>>> version update -- one for *flink-statefun* and one for
>>> *flink-statefun-playground*.
>>>
>>> Anyway, just an idea. Maybe there's an important reason for these
>>> projects to remain separate. If we do want to keep the playground project
>>> where it is, I could solve the copying problem by requiring the two
>>> projects to be siblings in the file system and by pre-copying the local
>>> build artifacts into a location accessible by the existing Docker contexts.
>>> This would still leave us with the need to have two PRs and releases
>>> instead of one, though.
>>>
>>> Thanks for your help!
>>>
>>>
>>> On Fri, Aug 18, 2023 at 2:45 PM Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
 Hi Galen,

 > locally built code is copied into the build containers
 so that it can be accessed during the build.

 That's exactly what we had been doing for release testing, yes. Sorry 

Re: Flink restored from an initially-specified checkpoint

2023-08-04 Thread Filip Karnicki
update: Our current line of thinking is that we need this to be set in
order for the checkpoint to live across job manager failures:

   - ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: Delete the
     checkpoint when the job is cancelled. The checkpoint state will only be
     available if the job fails.
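
For reference, a minimal sketch of setting this from the DataStream API
(method names as in the Flink 1.14/1.15 CheckpointConfig;
RETAIN_ON_CANCELLATION is the variant that also survives cancellation):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // checkpoint every 60s
// DELETE_ON_CANCELLATION (quoted above) keeps the checkpoint only if the
// job fails; RETAIN_ON_CANCELLATION keeps it on cancellation as well
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)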


On Fri, 4 Aug 2023 at 05:03, Filip Karnicki 
wrote:

> Hi, we recently went live with a job on a shared cluster, which is managed
> with Yarn
>
> The job was started using
>
> flink run -s hdfs://PATH_TO_A_CHECKPOINT_FROM_A_PREVIOUS_RUN_HERE
>
>
> Everything worked fine for a few days, but then the job needed to be
> restored for whatever reason
>
> 2023-08-03 16:34:44,525 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit
> code 2.
>
> org.apache.flink.util.FlinkException: Cannot deregister application.
> Resource manager service is not available.
>
>
> It seems that while restoring (yarn 'attempt' 02), Flink used the original
> checkpoint we provided as the value of the -s parameter, and not the most
> recent checkpoint for that job. This caused a few days worth of data to be
> reprocessed.
>
>
> 2023-08-03 16:34:55,259 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
> - Recovering checkpoints from
> ZooKeeperStateHandleStore{namespace='flink/application_/jobs/JOBID-WW/checkpoints'}.
>
> 2023-08-03 16:34:55,262 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
> - Found 0 checkpoints in
> ZooKeeperStateHandleStore{namespace='flink/application_/jobs/JOBID-WW/checkpoints'}.
>
> 2023-08-03 16:34:55,262 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
> - Trying to fetch 0 checkpoints from storage.
>
> 2023-08-03 16:34:55,262 INFO
> org.apache.flink.runtime.util.ZooKeeperUtils [] -
> Initialized DefaultCompletedCheckpointStore in
> 'ZooKeeperStateHandleStore{namespace='flink/application_/jobs/JOBID-WW/checkpoints'}'
> with /checkpoints.
>
> 2023-08-03 16:34:55,293 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - Running
> initialization on master for  (JOBID-WW).
>
> 2023-08-03 16:34:55,293 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] -
> Successfully ran initialization on master in 0 ms.
>
> 2023-08-03 16:34:55,313 INFO
> org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] -
> Built 1 pipelined regions in 0 ms
>
> 2023-08-03 16:34:55,347 INFO
> org.apache.flink.yarn.YarnResourceManagerDriver  [] - Recovered
> 0 containers from previous attempts ([]).
>
> 2023-08-03 16:34:55,347 INFO
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
> Recovered 0 workers from previous attempt.
>
> 2023-08-03 16:34:55,369 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - Using
> job/cluster config to configure application-defined state backend:
> EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8,
> writeBatchSize=2097152}
>
> 2023-08-03 16:34:55,369 INFO
> org.apache.hadoop.conf.Configuration [] -
> resource-types.xml not found
>
> 2023-08-03 16:34:55,370 INFO
> org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to
> find 'resource-types.xml'.
>
> 2023-08-03 16:34:55,371 INFO
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
> Using predefined options: DEFAULT.
>
> 2023-08-03 16:34:55,371 INFO
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
> Using application-defined options factory:
> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=16}}.
>
> 2023-08-03 16:34:55,371 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - Using
> application-defined state backend: EmbeddedRocksDBStateBackend{,
> localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE,
> numberOfTransferThreads=8, writeBatchSize=2097152}
>
> 2023-08-03 16:34:55,371 INFO
> org.apache.flink.runtime.state.StateBackendLoader[] - State
> backend loader loads the state backend as EmbeddedRocksDBStateBackend
>
> 2023-08-03 16:34:55,375 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] -
> Checkpoint storage is set to 'filesystem': (checkpoints
> hdfs:/apps//flink/checkpoints/yy-job)
>
> 2023-08-03 16:34:55,377 INFO
> org.apache.flink.runt

Flink restored from an initially-specified checkpoint

2023-08-03 Thread Filip Karnicki
Hi, we recently went live with a job on a shared cluster, which is managed
with Yarn

The job was started using

flink run -s hdfs://PATH_TO_A_CHECKPOINT_FROM_A_PREVIOUS_RUN_HERE


Everything worked fine for a few days, but then the job needed to be
restored for whatever reason

2023-08-03 16:34:44,525 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Terminating cluster entrypoint process YarnJobClusterEntrypoint with exit
code 2.

org.apache.flink.util.FlinkException: Cannot deregister application.
Resource manager service is not available.


It seems that while restoring (yarn 'attempt' 02), Flink used the original
checkpoint we provided as the value of the -s parameter, and not the most
recent checkpoint for that job. This caused a few days worth of data to be
reprocessed.


2023-08-03 16:34:55,259 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
- Recovering checkpoints from
ZooKeeperStateHandleStore{namespace='flink/application_/jobs/JOBID-WW/checkpoints'}.

2023-08-03 16:34:55,262 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
- Found 0 checkpoints in
ZooKeeperStateHandleStore{namespace='flink/application_/jobs/JOBID-WW/checkpoints'}.

2023-08-03 16:34:55,262 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils []
- Trying to fetch 0 checkpoints from storage.

2023-08-03 16:34:55,262 INFO
org.apache.flink.runtime.util.ZooKeeperUtils [] -
Initialized DefaultCompletedCheckpointStore in
'ZooKeeperStateHandleStore{namespace='flink/application_/jobs/JOBID-WW/checkpoints'}'
with /checkpoints.

2023-08-03 16:34:55,293 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] - Running
initialization on master for  (JOBID-WW).

2023-08-03 16:34:55,293 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] -
Successfully ran initialization on master in 0 ms.

2023-08-03 16:34:55,313 INFO
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] -
Built 1 pipelined regions in 0 ms

2023-08-03 16:34:55,347 INFO
org.apache.flink.yarn.YarnResourceManagerDriver  [] - Recovered
0 containers from previous attempts ([]).

2023-08-03 16:34:55,347 INFO
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
Recovered 0 workers from previous attempt.

2023-08-03 16:34:55,369 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] - Using
job/cluster config to configure application-defined state backend:
EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=8,
writeBatchSize=2097152}

2023-08-03 16:34:55,369 INFO
org.apache.hadoop.conf.Configuration [] -
resource-types.xml not found

2023-08-03 16:34:55,370 INFO
org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to
find 'resource-types.xml'.

2023-08-03 16:34:55,371 INFO
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
Using predefined options: DEFAULT.

2023-08-03 16:34:55,371 INFO
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
Using application-defined options factory:
DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=16}}.

2023-08-03 16:34:55,371 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] - Using
application-defined state backend: EmbeddedRocksDBStateBackend{,
localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE,
numberOfTransferThreads=8, writeBatchSize=2097152}

2023-08-03 16:34:55,371 INFO
org.apache.flink.runtime.state.StateBackendLoader[] - State
backend loader loads the state backend as EmbeddedRocksDBStateBackend

2023-08-03 16:34:55,375 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] -
Checkpoint storage is set to 'filesystem': (checkpoints
hdfs:/apps//flink/checkpoints/yy-job)

2023-08-03 16:34:55,377 INFO
org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
Enabled external resources: []

2023-08-03 16:34:55,390 INFO
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Upper
bound of the thread pool size is 500

2023-08-03 16:34:55,407 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No
checkpoint found during restore.

2023-08-03 16:34:55,408 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Starting
job JOBID-WW from savepoint [PATH TO THE 3-DAY-OLD-CHECKPOINT WE
USED TO LAUNCH WITH]



I see some binary-looking HA files in HDFS that seem to have references to
the correct, latest checkpoint rather than the initial one.

Does anyone have an idea as to what could be causing the recovery to use
the initial checkpoint?

Many thanks
Fil


Re: [DISCUSS] Status of Statefun Project

2023-07-21 Thread Filip Karnicki
This is great news, as we're using statefun as well.

Please don't hesitate to let me know if you need me to do some additional
testing on a real life prod-like setup.

On Sat, 24 Jun 2023 at 18:41, Galen Warren via user 
wrote:

> Great -- thanks!
>
> I'm going to be out of town for about a week but I'll take a look at this
> when I'm back.
>
> On Tue, Jun 20, 2023 at 8:46 AM Martijn Visser 
> wrote:
>
>> Hi Galen,
>>
>> Yes, I'll be more than happy to help with Statefun releases.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Tue, Jun 20, 2023 at 2:21 PM Galen Warren 
>> wrote:
>>
>>> Thanks.
>>>
>>> Martijn, to answer your question, I'd need to do a small amount of work
>>> to get a PR ready, but not much. Happy to do it if we're deciding to
>>> restart Statefun releases -- are we?
>>>
>>> -- Galen
>>>
>>> On Sat, Jun 17, 2023 at 9:47 AM Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
 > Perhaps he could weigh in on whether the combination of automated
 tests plus those smoke tests should be sufficient for testing with new
 Flink versions

 What we usually did at the bare minimum for new StateFun releases was
 the following:

   1. Build tests (including the smoke tests in the e2e module, which
   cover important tests like exactly-once verification)
2. Updating the flink-statefun-playground repo and manually running
all language examples there.

 If upgrading Flink versions was the only change in the release, I'd
 probably say that this is sufficient.

 Best,
 Gordon

 On Thu, Jun 15, 2023 at 5:25 AM Martijn Visser <
 martijnvis...@apache.org> wrote:

> Let me know if you have a PR for a Flink update :)
>
> On Thu, Jun 8, 2023 at 5:52 PM Galen Warren via user <
> user@flink.apache.org> wrote:
>
>> Thanks Martijn.
>>
>> Personally, I'm already using a local fork of Statefun that is
>> compatible with Flink 1.16.x, so I wouldn't have any need for a released
>> version compatible with 1.15.x. I'd be happy to do the PRs to modify
>> Statefun to work with new versions of Flink as they come along.
>>
>> As for testing, Statefun does have unit tests and Gordon also sent me
>> instructions a while back for how to do some additional smoke tests which
>> are pretty straightforward. Perhaps he could weigh in on whether the
>> combination of automated tests plus those smoke tests should be 
>> sufficient
>> for testing with new Flink versions (I believe the answer is yes).
>>
>> -- Galen
>>
>>
>>
>> On Thu, Jun 8, 2023 at 8:01 AM Martijn Visser <
>> martijnvis...@apache.org> wrote:
>>
>>> Hi all,
>>>
>>> Apologies for the late reply.
>>>
>>> I'm willing to help out with merging requests in Statefun to keep
>>> them
>>> compatible with new Flink releases and create new releases. I do
>>> think that
>>> validation of the functionality of these releases depends a lot on
>>> those
>>> who do these compatibility updates, with PMC members helping out
>>> with the
>>> formal process.
>>>
>>> > Why can't the Apache Software Foundation allow community members
>>> to bring
>>> it up to date?
>>>
>>> There's nothing preventing anyone from reviewing any of the current
>>> PRs or
>>> opening new ones. However, none of them are approved [1], so there's
>>> also
>>> nothing to merge.
>>>
>>> > I believe that there are people and companies on this mailing list
>>> interested in supporting Apache Flink Stateful Functions.
>>>
>>> If so, then now is the time to show.
>>>
>>> Would there be a preference to create a release with Galen's merged
>>> compatibility update to Flink 1.15.2, or do we want to skip that and
>>> go
>>> straight to a newer version?
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1]
>>>
>>> https://github.com/apache/flink-statefun/pulls?q=is%3Apr+is%3Aopen+review%3Aapproved
>>>
>>> On Tue, Jun 6, 2023 at 3:55 PM Marco Villalobos <
>>> mvillalo...@kineteque.com>
>>> wrote:
>>>
>>> > Why can't the Apache Software Foundation allow community members
>>> to bring
>>> > it up to date?
>>> >
>>> > What's the process for that?
>>> >
>>> > I believe that there are people and companies on this mailing list
>>> > interested in supporting Apache Flink Stateful Functions.
>>> >
>>> > You already had two people on this thread express interest.
>>> >
>>> > At the very least, we could keep the library versions up to date.
>>> >
>>> > There is only a small list of new features that might be
>>> > worthwhile:
>>> >
>>> > 1. event time processing
>>> > 2. state rest api
>>> >
>>> >
>>> > On Jun 6, 2023, at 3:06 AM, Chesnay Schepler 
>>> wrote:
>>> >
>>> > If you were to fork it 

Re: question about Async IO

2022-11-02 Thread Filip Karnicki
Hi Galen

I was thinking about the same thing recently and reached a point where I
see that async io does not have access to the keyed state because:

"* State related apis in
[[org.apache.flink.api.common.functions.RuntimeContext]] are not supported
 * yet because the key may get changed while accessing states in the
working thread."

I don't think that the key can change at any random time here, because of

"A common confusion that we want to explicitly point out here is that the
AsyncFunction is not called in a multi-threaded fashion. There exists only
one instance of the AsyncFunction and it is called sequentially for each
record in the respective partition of the stream"
From:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/

So if the RichAsyncFunctionRuntimeContext had access to a
KeyedStateBackend (it's basically a facade on top of RuntimeContext), we
could (maybe) change the method signature of something like getState to
include the key, and run keyedStateBackend.setCurrentKey(key) before
continuing with anything else.
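
Purely as a hypothetical sketch of that idea (no such API exists today;
setCurrentKey/getPartitionedState are the internal backend calls, and this
would live inside the RichAsyncFunction's runtime context, where a
keyedStateBackend reference would be available):

// hypothetical: a key-aware variant of getState, where the caller passes
// the key explicitly instead of relying on an implicit current key
def getState[K, T](key: K, descriptor: ValueStateDescriptor[T]): ValueState[T] = {
  keyedStateBackend.setCurrentKey(key) // pin the key for this access
  keyedStateBackend.getPartitionedState(
    VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor)
}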


Anyone - please stop me if I'm talking nonsense


On Fri, 14 Oct 2022 at 21:36, Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi Galen,
> i will tell from my experience as a Flink user and developer of Flink jobs.
>
>
>
> *"if the input to an AsyncFunction is a keyed stream, can I assume that
> all input elements with the same key will be handled by the same instance
> of the async operator"*
> From what I know (and someone can correct me if I'm wrong) this is
> possible. However, you have to make sure that there is no rebalance or
> re-shuffle between those operators. For example, operators after the first
> .keyBy(..) call must have the same parallelism level.
>
> Regarding:
> " I have a situation where I would like to enforce that async operations
> associated with a particular key happen sequentially,"
>
> This is also possible as far as I know. In fact I was implementing a
> streaming pipeline with similar requirements, like
> *"maintaining order of events within a keyBy group across multiple
> operators, including Async operators"*.
> We achieved that with the same thing -> making sure that all operators in
> the entire pipeline except Source and Sink had the exact same parallelism
> level.
> An additional thing to remember here is that if you call .keyBy(...) again
> but with a different key extractor, then the original order might not be
> preserved, since keyBy will execute a re-shuffle/rebalance.
>
> We were also using the reinterpretAsKeyedStream feature [1] after async
> operators to avoid calling ".keyBy" multiple times in the pipeline. Calling
> .keyBy always has a negative impact on performance.
> With reinterpretAsKeyedStream we were able to use keyed operators with
> access to keyed state after Async operators.
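
A rough sketch of that (the Event type and the asyncOutput stream are
made-up placeholders; the wrapper is Flink's experimental scala
DataStreamUtils):

import org.apache.flink.streaming.api.scala._

case class Event(key: String, payload: String)

// asyncOutput: the DataStream[Event] coming out of the async operator,
// with the same parallelism as the upstream keyBy and no re-shuffle in
// between, so the partitioning by key still holds
val keyedAgain: KeyedStream[Event, String] =
  new DataStreamUtils(asyncOutput).reinterpretAsKeyedStream(_.key)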
>
> Hope that helped.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/experimental/
>
> Regards,
> Krzysztof Chmielewski
>
>
>
>
>
>
>
> On Fri, 14 Oct 2022 at 19:11, Galen Warren 
> wrote:
>
>> I have a question about Flink's Async IO support: Async I/O | Apache
>> Flink.
>>
>> I understand that access to state is not supported in an AsyncFunction.
>> However, if the input to an AsyncFunction is a keyed stream, can I assume
>> that all input elements with the same key will be handled by the same
>> instance of the async operator, as would normally be the case with keyed
>> streams/operators?
>>
>> I'm asking because I have a situation where I would like to enforce that
>> async operations associated with a particular key happen sequentially, i.e.
>> if two elements come through with the same key, I need  the async operation
>> for the second to happen after the async operation for the first one
>> completes. I think I can achieve this using a local map of "in flight"
>> async operations in the operator itself, but only if I can rely on all
>> input elements with the same key being processed by the same async operator.
>>
>> If anyone can confirm how this works, I'd appreciate it. Thanks.
>>
>


Re: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

2022-10-27 Thread Filip Karnicki
Hi Gordon

I would like to carry over *all* of the internal states for the
FunctionGroupOperator, only changing my own state. I was under the
impression that the only way to do that is to call

mySavepoint.removeOperator("operator-uid").withOperator("operator-uid",
transformation)

(where transformation is):

val transformation = OperatorTransformation
  .bootstrapWith(myKeyedState)
  .keyBy((ks: KeyedState) => ks.key)
  .transform(new KeyedStateBootstrapFunction[String, MyKeyedStateClass](
    ... // here use the values of MyKeyedStateClass to re-create the
        // internal statefun state
  ))

meaning that I'd lose all the internal state in "operator-uid" unless I
read it in first, and populated it back in the `transformation`. Am I
thinking about this all wrong?

Many thanks
Fil

(Thias, I'm assuming your answer around HybridSource is to another thread,
so I'm removing it from this reply)

On Wed, 26 Oct 2022 at 19:01, Tzu-Li (Gordon) Tai 
wrote:

> Hi Filip,
>
> I think what you are seeing is expected. The State Processor API was
> intended to allow access only to commonly used user-facing state
> structures, while Stateful Functions uses quite a bit of Flink internal
> features, including for its state maintenance.
> The list state in question in StateFun's FunctionGroupOperator is an
> internal kind of state normally used in the context of Flink window states
> that are namespaced. Normal user-facing list states are not namespaced.
>
> Just curious, which specific state in FunctionGroupOperator are you trying
> to transform? I assume all other internal state in FunctionGroupOperator
> you want to remain untouched, and only wish to carry them over to be
> included in the transformed savepoint?
>
> Thanks,
> Gordon
>
>
> On Wed, Oct 26, 2022 at 3:50 AM Filip Karnicki 
> wrote:
>
>> Hi Thias
>>
>> Thank you for your reply. I can re-create a simplified use case at home
>> and stick it on github if you think it will help.
>>
>> What I'm trying to access is pretty internal to Flink Stateful Functions.
>> It seems that a custom operator (
>> https://github.com/apache/flink-statefun/blob/09a5cba521e9f994896c746ec9f8cc6479403612/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java#L188)
>> is accessing a KeyedStateBackend and creating an InternalListState, which
>> I'm not sure I'll be able to get my hands on using the State Processor API.
>>
>> The only reason why I need to get my hands on all the states from this
>> Stateful Functions operator is because later I (think I) have to use
>> .removeOperator(uid) on a savepoint and replace it .withOperator(uid,
>> myTransformation) in order to transform my own, non-stateful-functions
>> keyed state which also belongs to this operator.
>>
>> Kind regards
>> Fil
>>
>> On Tue, 25 Oct 2022 at 16:24, Schwalbe Matthias <
>> matthias.schwa...@viseca.ch> wrote:
>>
>>> Hi Filip,
>>>
>>>
>>>
>>> It looks like your state primitive is used in the context of Windows:
>>>
>>> Keyed state works like this:
>>>
>>>- It uses a cascade of key types to store and retrieve values:
>>>   - The key (set by .keyBy)
>>>   - A namespace (usually a VoidNamespace), unless it is used in
>>>   context of a specific window
>>>   - An optional key of the state primitive (if it is a MapState)
>>>
>>>
>>>
>>> In your case the state primitive is (probably) declared in the context
>>> of a window and hence when loading the state by means of StateProcessorAPI
>>> you also need to specify the correct Namespace TypeInformation.
>>>
>>> If I am in doubt how a state primitive is set up, I let the debugger
>>> stop in a process function and walk up the call stack to find the proper
>>> components implementing it.
>>>
>>>
>>>
>>> If you share a little more of your code it is much easier to provide
>>> specific guidance 
>>>
>>> (e.g. ‘savepoint’ is never used again in your code snippet …)
>>>
>>>
>>>
>>> Sincere greeting
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *From:* Filip Karnicki 
>>> *Sent:* Tuesday, October 25, 2022 10:08 AM
>>> *To:* user 
>>> *Subject:* State Processor API - VoidNamespaceSerializer must be
>>> compatible with the old namespace serializer LongSerializer
>>>
>>>
>>>
>>> Hi, I'm trying

Re: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

2022-10-26 Thread Filip Karnicki
Hi Thias

Thank you for your reply. I can re-create a simplified use case at home and
stick it on github if you think it will help.

What I'm trying to access is pretty internal to Flink Stateful Functions.
It seems that a custom operator (
https://github.com/apache/flink-statefun/blob/09a5cba521e9f994896c746ec9f8cc6479403612/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java#L188)
is accessing a KeyedStateBackend and creating an InternalListState, which
I'm not sure I'll be able to get my hands on using the State Processor API.

The only reason why I need to get my hands on all the states from this
Stateful Functions operator is because later I (think I) have to use
.removeOperator(uid) on a savepoint and replace it .withOperator(uid,
myTransformation) in order to transform my own, non-stateful-functions
keyed state which also belongs to this operator.

Kind regards
Fil

On Tue, 25 Oct 2022 at 16:24, Schwalbe Matthias 
wrote:

> Hi Filip,
>
>
>
> It looks like your state primitive is used in the context of Windows:
>
> Keyed state works like this:
>
>- It uses a cascade of key types to store and retrieve values:
>   - The key (set by .keyBy)
>   - A namespace (usually a VoidNamespace), unless it is used in
>   context of a specific window
>   - An optional key of the state primitive (if it is a MapState)
>
>
>
> In your case the state primitive is (probably) declared in the context of
> a window and hence when loading the state by means of StateProcessorAPI you
> also need to specify the correct Namespace TypeInformation.
>
> If I am in doubt how a state primitive is set up, I let the debugger stop
> in a process function and walk up the call stack to find the proper
> components implementing it.
>
>
>
> If you share a little more of your code it is much easier to provide
> specific guidance 
>
> (e.g. ‘savepoint’ is never used again in your code snippet …)
>
>
>
> Sincere greeting
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Filip Karnicki 
> *Sent:* Tuesday, October 25, 2022 10:08 AM
> *To:* user 
> *Subject:* State Processor API - VoidNamespaceSerializer must be
> compatible with the old namespace serializer LongSerializer
>
>
>
> Hi, I'm trying to load a list state using the State Processor API (Flink
> 1.14.3)
>
>
>
> Cluster settings:
>
>
>
> state.backend: rocksdb
>
> state.backend.incremental: true
>
> (...)
>
>
>
> Code:
>
>
>
> val env = ExecutionEnvironment.getExecutionEnvironment
>
> val savepoint = Savepoint.load(env, pathToSavepoint, new
> EmbeddedRocksDBStateBackend(true))
>
>
> val tpe = new
> MessageTypeInformation(MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS,
> null)) // using Flink Stateful Functions
> val envelopeSerializer: TypeSerializer[Message] =
> tpe.createSerializer(env.getConfig)
>
> val listDescriptor = new
> ListStateDescriptor[Message]("delayed-message-buffer",
> envelopeSerializer.duplicate)
>
> (...)
> override def open(parameters: Configuration): Unit = {
>
> getRuntimeContext.getListState(listDescriptor) // fails with error [1]
>
> }
>
>
>
>
>
> Error [1]:
>
>
>
> Caused by: java.io.IOException: Failed to restore timer state
>
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>
> at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
>
> at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
>
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>
> at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> Caused by: java.lang.RuntimeException: Error while getting state
>
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>
> at
> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)
>
> at
> x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)
>
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>
> at
> org.apache.

State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

2022-10-25 Thread Filip Karnicki
Hi, I'm trying to load a list state using the State Processor API (Flink
1.14.3)

Cluster settings:

state.backend: rocksdb
state.backend.incremental: true
(...)


Code:

val env = ExecutionEnvironment.getExecutionEnvironment
val savepoint = Savepoint.load(env, pathToSavepoint, new
EmbeddedRocksDBStateBackend(true))

val tpe = new
MessageTypeInformation(MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS,
null)) // using Flink Stateful Functions
val envelopeSerializer: TypeSerializer[Message] =
tpe.createSerializer(env.getConfig)
val listDescriptor = new
ListStateDescriptor[Message]("delayed-message-buffer",
envelopeSerializer.duplicate)

(...)
override def open(parameters: Configuration): Unit = {
getRuntimeContext.getListState(listDescriptor) // fails with error [1]
}


Error [1]:

Caused by: java.io.IOException: Failed to restore timer state

at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)

at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)

at
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)

at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)

at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)

at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)

at java.base/java.lang.Thread.run(Thread.java:829)

Caused by: java.lang.RuntimeException: Error while getting state

at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)

at
org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)

at
x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)

at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)

at
org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)

at
org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)

at
org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:174)

... 7 more

Caused by: org.apache.flink.util.StateMigrationException: The new namespace
serializer (org.apache.flink.runtime.state.VoidNamespaceSerializer@2806d6da)
must be compatible with the old namespace serializer (
org.apache.flink.api.common.typeutils.base.LongSerializer@52b06bef).

at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:685)

at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)

at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)

at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)

at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)

at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)

at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)

at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)

at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)

... 13 more




It seems that *DefaultKeyedStateStore* always wants to use
VoidNamespaceSerializer.INSTANCE despite my state being created with a
LongSerializer namespace serializer.
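
Illustrating the mismatch with the internal call visible in the stack
trace above (a paraphrase of what DefaultKeyedStateStore.getListState
effectively does, not code to copy):

// every user-facing list state is fetched under the void namespace:
backend.getPartitionedState(
  VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listDescriptor)
// ...whereas "delayed-message-buffer" was written under a Long-typed
// (timer/window-style) namespace, so the serializer compatibility check
// fails on restore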


Is there anything anyone can immediately see me doing wrong?


Thank you

Fil


Re: influxdb metrics reporter - 4k series per job restart

2022-07-22 Thread Filip Karnicki
Hi All,

Thank you for your replies. What ended up working for me was setting

metrics.reporter.influxdb.scope.variables.excludes:
job_id;task_attempt_num;tm_id;task_id;operator_id;task_attempt_id


On Fri, 1 Jul 2022 at 18:36, Mason Chen  wrote:

> Hi all,
>
> If you can wait for Flink 1.16, there is a new feature to filter metrics
> (includes/excludes filter). Additionally, you can already take advantage of
> dropping unnecessary labels with `scope.variables.excludes` in the current
> release. Link to 1.16 metric features:
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/metric_reporters/#reporter
>
> Best,
> Mason
>
> On Fri, Jul 1, 2022 at 3:55 AM Martijn Visser 
> wrote:
>
>> Have you considered setting the value for some of the series to a fixed
>> value? For example, if you're not interested in the value for ,
>> you could consider setting that to a fixed value "task_id" [1] ?
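
A sketch of that idea in flink-conf.yaml (the key and default format are
assumed from the system-scope docs linked below; replacing a <variable>
with a literal stops it from multiplying the series):

  # e.g. drop the <task_name> dimension from the task scope
  metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.task.<subtask_index>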
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#system-scope
>>
>> Op do 30 jun. 2022 om 15:52 schreef Weihua Hu :
>>
>>> Hi, Filip
>>>
>>> You can modify the InfluxdbReporter code to rewrite the
>>> notifyOfAddedMetric method and filter the required metrics for reporting.
>>>
>>> Best,
>>> Weihua
>>>
>>>
>>> On Thu, Jun 30, 2022 at 8:46 PM Filip Karnicki 
>>> wrote:
>>>
>>>> Hi All
>>>>
>>>> We're using the influx reporter (flink 1.14.3), which seems to create a
>>>> series per:
>>>> -[task|job]manager
>>>> - host
>>>> - job_id
>>>> - job_name
>>>> - subtask_index
>>>> - task_attempt_id
>>>> - task_attempt_num
>>>> - task_id
>>>> - tm_id
>>>>
>>>> which amounts to about 4k series each time our job restarts itself
>>>>
>>>> We are currently experiencing problems with checkpoint duration
>>>> timeouts (> 60s) (unrelated) and every 60 secs our job restarts and creates
>>>> further 4k series in influxdb.
>>>>
>>>> Needless to say, the team managing influxdb is not too happy with the
>>>> amount of series we create.
>>>>
>>>> Is there anything I can do to either reduce the number of series, or
>>>> reduce the number of types of metrics in order to produce fewer series? (we
>>>> don't view all the available metrics in grafana, so we don't necessarily
>>>> have to send all of them)
>>>>
>>>> The db caps at 1M series, and with our current problems with
>>>> checkpointing we go through that many in a matter of hours
>>>>
>>>> Many thanks
>>>> Fil
>>>>
>>>>


influxdb metrics reporter - 4k series per job restart

2022-06-30 Thread Filip Karnicki
Hi All

We're using the influx reporter (flink 1.14.3), which seems to create a
series per:
-[task|job]manager
- host
- job_id
- job_name
- subtask_index
- task_attempt_id
- task_attempt_num
- task_id
- tm_id

which amounts to about 4k series each time our job restarts itself

We are currently experiencing problems with checkpoint duration timeouts (>
60s) (unrelated) and every 60 secs our job restarts and creates further 4k
series in influxdb.

Needless to say, the team managing influxdb is not too happy with the
amount of series we create.

Is there anything I can do to either reduce the number of series, or reduce
the number of types of metrics in order to produce fewer series? (we don't
view all the available metrics in grafana, so we don't necessarily have to
send all of them)

The db caps at 1M series, and with our current problems with checkpointing
we go through that many in a matter of hours

Many thanks
Fil


Re: Exactly-once sink sync checkpoint stacking time effect

2022-03-29 Thread Filip Karnicki
Thank you very much for your answer.

I was able to reduce the number of sinks as you described. That helped a
lot, thank you.

I think you must be right with regards to (2) - opening a new transaction
being the culprit. It's unlikely to be (1) since this behaviour occurs even
when there are 0 messages going through a brand new, locally running kafka
cluster.

Kind regards,
Fil

On Tue, 29 Mar 2022 at 09:34, Arvid Heise  wrote:

> Hi Filip,
>
> two things will impact sync time for Kafka:
> 1. Flushing all old data [1], in particular flushing all in-flight
> partitions [2]. However, that shouldn't cause a stacking effect except when
> the brokers are overloaded on checkpoint.
> 2. Opening a new transaction [3]. Since all transactions are linearized on
> the Kafka brokers, this is the most likely root cause. Note that aborted
> checkpoints may require multiple transactions to be opened. So you could
> check if you have them quite often aborted.
>
> If you want to know more, I suggest you attach a profiler and find the
> specific culprit and report back [4]. There is a low probability that the
> sink framework has a bug that causes this behavior. In that case, we can
> fix it more easily than if it's a fundamental issue with Kafka. In general,
> exactly-once and low latency are somewhat contradicting requirements, so
> there is only so much you can do.
>
> Not knowing your topology but maybe you can reduce the number of sinks?
> With the KafkaRecordSerializationSchema you can set different topics for
> different ProducerRecords of the same DataStream.
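
A rough sketch of that (Event and the topic-routing rule are made-up
placeholders):

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

case class Event(kind: String, payload: String)

// one sink, topic chosen per record, instead of one sink per topic
val routingSchema = new KafkaRecordSerializationSchema[Event] {
  override def serialize(
      element: Event,
      context: KafkaRecordSerializationSchema.KafkaSinkContext,
      timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](
      "topic-" + element.kind, element.payload.getBytes("UTF-8"))
}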
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L190-L190
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L177-L183
> [3]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L302-L321
> [4]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/debugging/application_profiling/
>
>
> On Sat, Mar 26, 2022 at 2:11 PM Filip Karnicki 
> wrote:
>
>> Hi, I noticed that with each added (kafka) sink with exactly-once
>> guarantees, there looks to be a penalty of ~100ms in terms of sync
>> checkpointing time.
>>
>> Would anyone be able to explain and/or point me in the right direction in
>> the source code so that I could understand why that is? Specifically, why
>> there appears to be a 100ms added for _each_ sink, and not a flat 100ms for
>> all sinks, potentially pointing to a sequential set of IO calls (wld
>> guess)
>>
>> I would be keen to understand if there's anything I could do (incl.
>> contributing code) that would parallelise this penalty in terms of sync
>> checkpointing time.
>>
>> Alternatively, is there any setting that would help me bring the sync
>> checkpointing time down (and still get exactly-once guarantees)?
>>
>> Many thanks,
>> Fil
>>
>


Exactly-once sink sync checkpoint stacking time effect

2022-03-26 Thread Filip Karnicki
Hi, I noticed that with each added (kafka) sink with exactly-once
guarantees, there looks to be a penalty of ~100ms in terms of sync
checkpointing time.

Would anyone be able to explain and/or point me in the right direction in
the source code so that I could understand why that is? Specifically, why
there appears to be a 100ms added for _each_ sink, and not a flat 100ms for
all sinks, potentially pointing to a sequential set of IO calls (wild
guess)

I would be keen to understand if there's anything I could do (incl.
contributing code) that would parallelise this penalty in terms of sync
checkpointing time.

Alternatively, is there any setting that would help me bring the sync
checkpointing time down (and still get exactly-once guarantees)?

Many thanks,
Fil


Re: [statefun] hadoop dependencies and StatefulFunctionsConfigValidator

2022-03-08 Thread Filip Karnicki
Hi Roman, Igal (@ below)

Thank you for your answer. I don't think I'll have access to flink's lib
folder given it's a shared Cloudera cluster. The only thing I could think
of is to not include com.google.protobuf in the
classloader.parent-first-patterns.additional setting, and to include
protobuf-java 3.7.1 in the uber jar.
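
In flink-conf.yaml terms, the pattern list would then read (with
protobuf-java 3.7.1 shaded into the job jar instead):

  classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka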

I created a jira for this just now + a discuss thread on the dev group
https://issues.apache.org/jira/browse/FLINK-26537

Hi @Igal Shilman  , is the plugin solution outlined
by Roman something that fits in better with Statefun than having the
creators of uber .jars be responsible for using a statefun-compatible
protobuf-java?

Kind regards
Fil

On Tue, 8 Mar 2022 at 14:02, Roman Khachatryan  wrote:

> Hi Filip,
>
> Have you tried putting protobuf-java 3.7.1 into Flink's lib/ folder?
> Or maybe re-writing the dependencies you mentioned to be loaded as
> plugins? [1]
>
> I don't see any other ways to solve this problem.
> Probably Chesnay or Seth will suggest a better solution.
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>
>
> Regards,
> Roman
>
> On Fri, Mar 4, 2022 at 9:54 AM Filip Karnicki 
> wrote:
> >
> > Hi All!
> >
> > We're running a statefun uber jar on a shared cloudera flink cluster,
> the latter of which launches with some ancient protobuf dependencies
> because of reasons[1].
> >
> > Setting the following flink-config settings on the entire cluster
> >
> > classloader.parent-first-patterns.additional:
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
> >
> > causes these old protobuf dependencies to get loaded over statefun's
> protobuf-java 3.7.1, and NoSuchMethod exceptions occur.
> >
> > We hacked together a version of statefun that doesn't perform the check
> whether the classloader settings contain the three patterns from above, and
> as long as our job uses protobuf-java 3.7.1 and the com.google.protobuf
> pattern is not present in the classloader.parent-first-patterns.additional
> setting, then all is well.
> >
> > Aside from removing old hadoop from the classpath, which may not be
> possible given that it's a shared cluster, is there anything we can do
> other than adding a configurable override not to perform the config check
> in StatefulFunctionsConfigValidator to an upcoming statefun core release?
> >
> > Many thanks
> > Fil
> >
> >
> > [1] We're still trying to find out if it's absolutely necessary to have
> these on the classpath.
>


Re: Shaded zookeeper - curator mismatch?

2022-03-07 Thread Filip Karnicki
Hi Zhanghao

it's 3.5.5

Thank you
Fil

On Sat, 5 Mar 2022 at 08:12, Zhanghao Chen 
wrote:

> Hi Filip,
>
> Could you share the version of the ZK server you are connecting to?
>
>
> Best,
> Zhanghao Chen
> ------
> *From:* Filip Karnicki 
> *Sent:* Friday, March 4, 2022 23:12
> *To:* user 
> *Subject:* Shaded zookeeper - curator mismatch?
>
> Hi, I believe there's a mismatch in shaded zookeeper/curator dependencies.
> I see that curator 4.2.0 needs zookeeper 3.5.4-beta, but it is still
> bundled with zookeeper 3.4 in flink-shaded-zookeeper-34, which as far as I
> can tell is used by flink
> runtime 1.14.3
>
> https://mvnrepository.com/artifact/org.apache.flink/flink-runtime/1.14.3
>
>
> https://github.com/apache/flink-shaded/blob/release-14.0/flink-shaded-zookeeper-parent/flink-shaded-zookeeper-34/pom.xml
>
> I believe this might be causing an issue for us while running a statefun
> uber jar in a cloudera cluster (indeed QuorumMaj only has a constructor
> that takes an int in zk 3.4.x, not a map)
>
> Am I understanding this correctly? Is this something that we need to fix
> with a patch?
>
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:216)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:617)
> [fatty.fil.jar:?]
>
> at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
> [flink-dist_2.12-1.14.0-csa1.6.1.0.jar:1.14.0-csa1.6.1.0]
>
> Caused by: java.lang.NoSuchMethodError:
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V
>
> at
> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.EnsembleTracker.<init>(EnsembleTracker.java:57)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:159)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:165)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:248)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:233)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:361)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:318)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:243)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:193)
> ~[fatty.fil.jar:?]
>
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_212]
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
> ~[?:1.8.0_212]
>
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
> ~[hadoop-common-3.1.1.7.1.7.74-6.jar:?]
>
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> ~[fatty.fil.jar:?]
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
> ~[fatty.fil.jar:?]
>
> ... 2 more
>
>
>
> Many thanks
>
> Fil
>


Shaded zookeeper - curator mismatch?

2022-03-04 Thread Filip Karnicki
Hi, I believe there's a mismatch in shaded zookeeper/curator dependencies.
I see that curator 4.2.0 needs zookeeper 3.5.4-beta, but it is still
bundled with zookeeper 3.4 in flink-shaded-zookeeper-34, which as far as I
can tell is used by flink
runtime 1.14.3

https://mvnrepository.com/artifact/org.apache.flink/flink-runtime/1.14.3

https://github.com/apache/flink-shaded/blob/release-14.0/flink-shaded-zookeeper-parent/flink-shaded-zookeeper-34/pom.xml

I believe this might be causing an issue for us while running a statefun
uber jar in a cloudera cluster (indeed QuorumMaj only has a constructor
that takes an int in zk 3.4.x, not a map)

Am I understanding this correctly? Is this something that we need to fix
with a patch?


org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
initialize the cluster entrypoint YarnJobClusterEntrypoint.

at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:216)
~[fatty.fil.jar:?]

at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:617)
[fatty.fil.jar:?]

at
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
[flink-dist_2.12-1.14.0-csa1.6.1.0.jar:1.14.0-csa1.6.1.0]

Caused by: java.lang.NoSuchMethodError:
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.server.quorum.flexible.QuorumMaj.<init>(Ljava/util/Map;)V

at
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.EnsembleTracker.<init>(EnsembleTracker.java:57)
~[fatty.fil.jar:?]

at
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:159)
~[fatty.fil.jar:?]

at
org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:165)
~[fatty.fil.jar:?]

at
org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:248)
~[fatty.fil.jar:?]

at
org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:233)
~[fatty.fil.jar:?]

at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:124)
~[fatty.fil.jar:?]

at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:361)
~[fatty.fil.jar:?]

at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:318)
~[fatty.fil.jar:?]

at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:243)
~[fatty.fil.jar:?]

at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:193)
~[fatty.fil.jar:?]

at java.security.AccessController.doPrivileged(Native Method)
~[?:1.8.0_212]

at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_212]

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
~[hadoop-common-3.1.1.7.1.7.74-6.jar:?]

at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
~[fatty.fil.jar:?]

at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
~[fatty.fil.jar:?]

... 2 more



Many thanks

Fil


[statefun] hadoop dependencies and StatefulFunctionsConfigValidator

2022-03-04 Thread Filip Karnicki
Hi All!

We're running a statefun uber jar on a shared cloudera flink cluster,
the latter of which launches with some ancient protobuf dependencies
because of reasons[1].

Setting the following flink-config settings on the entire cluster

classloader.parent-first-patterns.additional:
org.apache.flink.statefun;org.apache.kafka;com.google.protobuf

causes these old protobuf dependencies to get loaded over statefun's
protobuf-java 3.7.1, and NoSuchMethod exceptions occur.

We hacked together a version of statefun that doesn't perform the check
whether the classloader settings contain the three patterns from above, and
as long as our job uses protobuf-java 3.7.1 and the
com.google.protobuf pattern
is not present in the classloader.parent-first-patterns.additional setting,
then all is well.

Aside from removing old hadoop from the classpath, which may not be
possible given that it's a shared cluster, is there anything we can do
other than adding a configurable override not to perform the config check
in StatefulFunctionsConfigValidator to an upcoming statefun core release?

Many thanks
Fil


[1] We're still trying to find out if it's absolutely necessary to have
these on the classpath.


Re: [statefun] client cert auth in remote function

2022-01-24 Thread Filip Karnicki
Cool, thanks! I'll speak to the shared cluster support team to see if they
can install our CA cert on every box. So we've got that side of
authentication sorted - flink can trust that the service is who it says it
is.

How about the other way around? Any thoughts on how I could provide a
*key*store
for the stateful functions job to use while calling remote function
services with TLS? The remote function server (undertow in our case) needs
to authenticate and authorise statefun based on the latter's cert.
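
In case it helps frame the question: with Netty's SslContextBuilder (the
same builder NettySharedResources uses for forClient()), client-cert
support would presumably look something like this hypothetical sketch
(statefun does not expose such a hook today; file names are made up):

import io.netty.handler.ssl.SslContextBuilder
import java.io.File

// hypothetical: supply a client certificate chain and private key so the
// remote function server can authenticate the caller
val sslContext = SslContextBuilder.forClient()
  .keyManager(new File("client-cert.pem"), new File("client-key.pem"))
  .build()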

Many thanks
Fil

On Mon, 24 Jan 2022 at 21:25, Igal Shilman  wrote:

> Hello Filip,
>
> As far as I know SslContextBuilder.forClient() should use the default
> trust store, so if you will install your self signed certificate in the
> community supported container image this should be picked up[1].
> The following user has reported something similar, and it seems that
> they've gone down a similar path [2].
>
> Cheers,
> Igal.
>
> [1] https://stackoverflow.com/a/35304873/4405470
> [2] https://lists.apache.org/thread/nxf7yk5ctcvndyygnvx7l34gldn0xgj3
>
>
> On Mon, Jan 24, 2022 at 7:08 PM Filip Karnicki 
> wrote:
>
>> Hi All!
>>
>> I was wondering if there's a way to secure a remote function by requiring
>> the client (flink) to use a client cert. Preferably a base64 encoded string
>> from the env properties, but that might be asking for a lot :)
>>
>> I had a look at the code, and NettySharedResources seems to use
>> SslContextBuilder.forClient(), and doesn't really seem to deal with setting
>> any kind of a keystore
>>
>> Also, I don't think that setting
>> -Djavax.net.ssl.trustStore=path/to/truststore.jks does anything, since I
>> keep getting 'unable to find valid certification path to requested target',
>> while an exported .pem from my tuststore works fine as a CA in postman
>>
>> I'm happy to contribute some code if need be, just point me in the right
>> direction
>>
>> Kind regards,
>> Fil
>>
>


[statefun] client cert auth in remote function

2022-01-24 Thread Filip Karnicki
Hi All!

I was wondering if there's a way to secure a remote function by requiring
the client (Flink) to use a client cert. Preferably a base64-encoded string
from the env properties, but that might be asking for a lot :)

I had a look at the code, and NettySharedResources seems to use
SslContextBuilder.forClient(), and doesn't really seem to deal with setting
any kind of keystore.
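
For illustration, a minimal sketch of what a client-cert hook might look
like on the Netty side if StateFun exposed one (this is not current StateFun
behaviour, and the file names are illustrative):

import java.io.File
import io.netty.handler.ssl.{SslContext, SslContextBuilder}

// sketch only: keyManager() would present our client cert chain + private
// key to the remote function server; trustManager() pins the CA that
// signed the server's cert
val sslContext: SslContext = SslContextBuilder
  .forClient()
  .keyManager(new File("client-cert-chain.pem"), new File("client-key.pem"))
  .trustManager(new File("ca.pem"))
  .build()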

Also, I don't think that setting
-Djavax.net.ssl.trustStore=path/to/truststore.jks does anything, since I
keep getting 'unable to find valid certification path to requested target',
while an exported .pem from my truststore works fine as a CA in Postman.

I'm happy to contribute some code if need be, just point me in the right
direction

Kind regards,
Fil


[statefun] upgrade path - shared cluster use

2022-01-17 Thread Filip Karnicki
Hi, we're currently using StateFun 3.1.1 on a shared Cloudera cluster,
which is going to be updated to Flink 1.14.x.

We think this update might break our jobs, since StateFun 3.1.1 is not
explicitly compatible with Flink 1.14.x (
https://flink.apache.org/downloads.html#apache-flink-stateful-functions-311)

Is there any appetite for StateFun to always be made compatible with the
latest base Flink version, or do we need to stop using the shared cluster
and procure our own? Or is updating StateFun to something like 3.2.0 (based
on 1.14.x) just a matter of having the resources to do it?

Thanks
Fil


Statefun remote functions - accessing kafka headers from a remote function

2021-11-02 Thread Filip Karnicki
Hi, is there a neat way to access Kafka headers from within a remote
function without using the DataStream API to insert the headers as part of
a RoutableMessage payload?
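
For reference, the workaround I'd like to avoid looks roughly like this (a
sketch - Envelope and all other names are illustrative): read the headers
in a KafkaDeserializationSchema and carry them in the payload that later
becomes the RoutableMessage body.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.collection.JavaConverters._

// illustrative carrier for the record body plus its Kafka headers
case class Envelope(headers: Map[String, String], body: Array[Byte])

class EnvelopeSchema extends KafkaDeserializationSchema[Envelope] {
  override def isEndOfStream(next: Envelope): Boolean = false

  override def deserialize(
      record: ConsumerRecord[Array[Byte], Array[Byte]]): Envelope = {
    // copy each Kafka header into the payload so the function can see it
    val headers = record.headers().asScala
      .map(h => h.key() -> new String(h.value(), "UTF-8")).toMap
    Envelope(headers, record.value())
  }

  override def getProducedType: TypeInformation[Envelope] =
    TypeInformation.of(classOf[Envelope])
}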

Many thanks
Fil


Re: Statefun embedded functions - parallel per partition, sequential per key

2021-11-02 Thread Filip Karnicki
Hi All

Just an update for future reference: it turned out that the machine we were
using for this test didn't have enough memory for what we were asking it to
do. It was that simple. The upside is that not even with the world's most
unstable cluster did we manage to lose a single message.

Just as an aside, we got the best results by switching back to Undertow,
but we ended up using it slightly differently from what the current example
in the docs suggests. We needed to pass the work onto a worker thread
because we had a blocking call in our function:

import io.undertow.server.{HttpHandler, HttpServerExchange}
import org.apache.flink.statefun.sdk.java.slice.{Slice, Slices}

class Handler extends HttpHandler {
  // (...) flinkHandler is the StateFun Java SDK RequestReplyHandler,
  // elided here as in the original example

  def handleRequest(exchange: HttpServerExchange): Unit = {
    // Undertow calls handlers on an IO thread; re-dispatch to a worker
    // thread so the blocking call below doesn't stall the event loop
    if (exchange.isInIoThread) {
      exchange.dispatch(this)
      return
    }
    exchange.getRequestReceiver.receiveFullBytes((exchange, bytes) => {
      flinkHandler
        .handle(Slices.wrap(bytes))
        .whenComplete((response: Slice, exception: Throwable) =>
          onComplete(exchange, response, exception))
    })
  }

  def onComplete(exchange: HttpServerExchange, slice: Slice,
      throwable: Throwable) = (... as per the example)
}

Many thanks again for your help, Igal

On Wed, 27 Oct 2021 at 13:59, Filip Karnicki 
wrote:

> Thanks for your reply Igal
>
> The reason why I'm using data stream integration is that the messages on
> kafka are in .json, and I need to convert them to protobufs for embedded
> functions. If I was using remote functions I wouldn't need to do that.
>
> With regards to performance, in order to exclude the possibility that it's
> the remote service that's causing a slowdown, I replaced the undertow
> example from the docs with 5 instances of webflux services that hand off
> the work from an event loop to a worker which then sleeps for 1 second. I
> then launched an nginx instance to forward the request in a round robin
> fashion to the 5 webflux instances.
>
> When I push 10_000 messages onto the ingress kafka topic, it takes upwards
> of 100 seconds to process all messages. The flink cluster first works
> pretty hard for about 30 seconds (at ~100% of cpu utilisation) then
> everything slows down and eventually I get tens of messages trickling down
> until, after the flink-side statefun job (not the remote job) crashes and
> gets restarted, which is when the last few stragglers get sent to the
> egress after 120+ seconds from the launch of the test.
>
> I can try to replicate this outside of my work environment if you'd like
> to run it yourself, but in the meantime, is there a way to achieve this
> 'sequential-per-key' behaviour with the use of embedded functions? Those
> seem to be rock-solid. Maybe there are some internal classes that would at
> least provide a template on how to do it? I have a naive implementation
> ready (the one I described in the previous email) but I'm sure there are
> some edge cases I haven't thought of.
>
> Thanks again,
> Fil
>
>
>
> On Wed, 27 Oct 2021 at 09:43, Igal Shilman  wrote:
>
>> Hello Fil,
>>
>> Indeed what you are describing is exactly what a remote function does.
>>
>> I am curious to learn more about the current performance limitations that
>> you encounter with the remote functions.
>>
>> One thing to try, in combination with the async transport, is to increase
>> the total number of in-flight async operations by setting the following
>> property in flink-conf.yaml:
>>
>> statefun.async.max-per-task
>>
>> To a much higher value than 1024, try experimenting with 16k,32k,64k and
>> even higher.
>>
>> Let me know if that improves the situation, and we can continue from
>> there.
>>
>> p.s,
>>
>> You've mentioned that you are using the data stream integration - was
>> there any particular reason you chose that? It has some limitations at the
>> moment with respect to remote functions.
>>
>>
>> Cheers,
>> Igal
>>
>> On Wed 27. Oct 2021 at 08:49, Filip Karnicki 
>> wrote:
>>
>>> Hi
>>>
>>> I have a kafka topic with json messages that I map to protobufs within a
>>> data stream, and then send those to embedded stateful functions using the
>>> datastream integration api (DataStream[RoutableMessage]). From there I need
>>> to make an idempotent long-running blocking IO call.
>>>
>>> I noticed that I was processing messages sequentially per kafka
>>> partition. Is there a way that I could process them sequentially by key
>>> only (but in parallel per partition)?
>>>
>>> I created some code that uses the embedded functions'
>>> registerAsyncOperation capabilities to make my long-running IO calls
>>> effectively asynchronous, but I had to add all this custom code to enqueue
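
For reference, the statefun.async.max-per-task suggestion above maps to a
single flink-conf.yaml line, e.g. with one of the illustrative sizes
mentioned:

statefun.async.max-per-task: 32768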

Re: Statefun embedded functions - parallel per partition, sequential per key

2021-10-27 Thread Filip Karnicki
Thanks for your reply Igal

The reason why I'm using the data stream integration is that the messages
on Kafka are in JSON, and I need to convert them to protobufs for embedded
functions. If I were using remote functions, I wouldn't need to do that.

With regards to performance, in order to exclude the possibility that it's
the remote service that's causing a slowdown, I replaced the undertow
example from the docs with 5 instances of webflux services that hand off
the work from an event loop to a worker which then sleeps for 1 second. I
then launched an nginx instance to forward the request in a round robin
fashion to the 5 webflux instances.

When I push 10_000 messages onto the ingress Kafka topic, it takes upwards
of 100 seconds to process all messages. The Flink cluster first works
pretty hard for about 30 seconds (at ~100% CPU utilisation), then
everything slows down and I get only tens of messages trickling through,
until the Flink-side statefun job (not the remote job) crashes and gets
restarted, which is when the last few stragglers reach the egress, 120+
seconds after the launch of the test.

I can try to replicate this outside of my work environment if you'd like to
run it yourself, but in the meantime, is there a way to achieve this
'sequential-per-key' behaviour with the use of embedded functions? Those
seem to be rock-solid. Maybe there are some internal classes that would at
least provide a template on how to do it? I have a naive implementation
ready (the one I described in the previous email) but I'm sure there are
some edge cases I haven't thought of.

Thanks again,
Fil



On Wed, 27 Oct 2021 at 09:43, Igal Shilman  wrote:

> Hello Fil,
>
> Indeed what you are describing is exactly what a remote function does.
>
> I am curious to learn more about the current performance limitations that
> you encounter with the remote functions.
>
> One thing to try, in combination with the async transport, is to increase
> the total number of in-flight async operations by setting the following
> property in flink-conf.yaml:
>
> statefun.async.max-per-task
>
> To a much higher value than 1024, try experimenting with 16k,32k,64k and
> even higher.
>
> Let me know if that improves the situation, and we can continue from there.
>
> p.s,
>
> You've mentioned that you are using the data stream integration - was
> there any particular reason you chose that? It has some limitations at the
> moment with respect to remote functions.
>
>
> Cheers,
> Igal
>
> On Wed 27. Oct 2021 at 08:49, Filip Karnicki 
> wrote:
>
>> Hi
>>
>> I have a kafka topic with json messages that I map to protobufs within a
>> data stream, and then send those to embedded stateful functions using the
>> datastream integration api (DataStream[RoutableMessage]). From there I need
>> to make an idempotent long-running blocking IO call.
>>
>> I noticed that I was processing messages sequentially per kafka
>> partition. Is there a way that I could process them sequentially by key
>> only (but in parallel per partition)?
>>
>> I created some code that uses the embedded functions'
>> registerAsyncOperation capabilities to make my long-running IO calls
>> effectively asynchronous, but I had to add all this custom code to enqueue
>> and persist any messages for a key that came in while there was an
>> in-flight IO call happening for that key. I'm fairly confident that I can
>> figure out all the fault tolerance cases _eventually_ (including re-sending
>> the in-flight message upon getting the UNKNOWN status back from the async
>> operation).
>>
>> That said, am I missing a trick that would allow Flink/statefun to take
>> care of this "parallel per partition, sequential-per-key" behaviour? Remote
>> functions don't seem to have the performance we need, even with async http
>> transport.
>>
>> Many thanks!
>> Fil
>>
> --
>
> ---
>
> about.me/igalshilman
>
>


Statefun embedded functions - parallel per partition, sequential per key

2021-10-27 Thread Filip Karnicki
Hi

I have a Kafka topic with JSON messages that I map to protobufs within a
data stream, and then send those to embedded stateful functions using the
DataStream integration API (DataStream[RoutableMessage]). From there I need
to make an idempotent, long-running, blocking IO call.
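
For context, the mapping I mean looks roughly like this (a sketch - GREETER,
jsonStream, parseAndConvert and the key extraction are all illustrative; the
builder itself is the statefun-flink-datastream API):

import org.apache.flink.statefun.flink.datastream.{RoutableMessage, RoutableMessageBuilder}
import org.apache.flink.statefun.sdk.FunctionType
import org.apache.flink.streaming.api.scala._

// illustrative target function type
val GREETER = new FunctionType("example", "greeter")

val routable: DataStream[RoutableMessage] = jsonStream.map { json =>
  val proto = parseAndConvert(json) // illustrative JSON -> protobuf step
  RoutableMessageBuilder.builder()
    .withTargetFunctionType(GREETER)
    .withTargetFunctionId(proto.getKey) // illustrative key extraction
    .withMessageBody(proto)
    .build()
}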

I noticed that I was processing messages sequentially per Kafka partition.
Is there a way that I could process them sequentially by key only (but in
parallel per partition)?

I created some code that uses the embedded functions'
registerAsyncOperation capabilities to make my long-running IO calls
effectively asynchronous, but I had to add all this custom code to enqueue
and persist any messages for a key that came in while there was an
in-flight IO call happening for that key. I'm fairly confident that I can
figure out all the fault tolerance cases _eventually_ (including re-sending
the in-flight message upon getting the UNKNOWN status back from the async
operation).

That said, am I missing a trick that would allow Flink/statefun to take
care of this "parallel per partition, sequential-per-key" behaviour? Remote
functions don't seem to have the performance we need, even with async http
transport.

Many thanks!
Fil


statefun: Unable to find a source translation for ingress

2021-02-10 Thread Filip Karnicki
Hi, I modified the Stateful Functions 2.2.0 async example to include a real
binding to Kafka. I included statefun-flink-distribution and
statefun-kafka-io in the pom, and I created a fat jar using the
maven-assembly-plugin,

and my Flink cluster complains about:

java.lang.IllegalStateException: Unable to find a source translation for
ingress of type IngressType(statefun.kafka.io, universal-ingress), which is
bound for key IngressIdentifier(org.apache.flink.statefun.examples.async,
tasks, class
org.apache.flink.statefun.examples.async.events.TaskStartedEvent)
org.apache.flink.statefun.flink.core.translation.IngressToSourceFunctionTranslator.sourceFromSpec(IngressToSourceFunctionTranslator.java:45)
org.apache.flink.statefun.flink.core.common.Maps.transformValues(Maps.java:54)
org.apache.flink.statefun.flink.core.translation.IngressToSourceFunctionTranslator.translate(IngressToSourceFunctionTranslator.java:37)
org.apache.flink.statefun.flink.core.translation.Sources.ingressToSourceFunction(Sources.java:117)
org.apache.flink.statefun.flink.core.translation.Sources.create(Sources.java:52)
org.apache.flink.statefun.flink.core.translation.FlinkUniverse.configure(FlinkUniverse.java:44)
org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:74)
org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 12 more


Does anyone have any idea why this wouldn't work on a cluster, yet works
completely fine when I'm using the test harness with a real Kafka?
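
One thing I suspect but haven't confirmed: maven-assembly-plugin does not
merge duplicate META-INF/services files, so the ServiceLoader registration
that lets statefun-flink-distribution discover the Kafka ingress translation
could get dropped from the fat jar, while the test harness runs from the
plain classpath and would be unaffected. A maven-shade-plugin sketch that
merges the service files instead:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- concatenates META-INF/services entries instead of overwriting -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>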

Many thanks
Fil