Hi Piotr,

I'm not sure about this part:

"Note that if you want your state (your HashMap) to be actually checkpointed, it must either already be defined as Flink managed state (like `ListState` in the example [1]), or you must copy the content of your `HashMap` to Flink managed state during the `snapshotState` call."

From [1] we can read: "Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State." Oskar was asking about ActiveMQ and not Kafka, but I guess the same rule applies here. The ActiveMQ connector he is using is this one [2].

His question actually boils down to one thing regarding this class [3]: is it thread safe to use a HashMap rather than a ConcurrentHashMap for unacknowledgedMessages in [3]?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html
[2] https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/
[3] https://github.com/apache/bahir-flink/blob/master/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java

Piotr Nowojski-3 wrote
> Hi,
>
> Regarding your last question, sorry, I don't know about ActiveMQ connectors.
>
> I'm not sure exactly how you are implementing your SourceFunction. Generally speaking, the `run()` method is executed in one thread, and other operations like checkpointing and timers (if any) are executed from another thread. In order to synchronise between those, the user is expected to acquire the checkpoint lock in the `run()` method, as documented in [1].
>
> Note that if you want your state (your HashMap) to be actually checkpointed, it must either already be defined as Flink managed state (like `ListState` in the example [1]), or you must copy the content of your `HashMap` to Flink managed state during the `snapshotState` call.
>
> Note 2: also keep in mind that we are in the process of reimplementing the source interfaces [2], and Flink 1.11 will probably offer a new and better API for that (SourceReader instead of SourceFunction).
>
> Piotrek
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>
> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
>> On 29 Jan 2020, at 13:08, OskarM <pentiak@> wrote:
>>
>> Hi all,
>>
>> I am using Flink with Bahir's Apache ActiveMQ connector. However, it's quite dated and poses many limitations; most notably, the source supports only ByteMessages, does not support parallelism, and has a bug that is only fixed in a snapshot version.
>>
>> So I started implementing my own SourceFunction (still with parallelism of only 1) based on AMQSource. I want it to support Flink's checkpointing and make it work with ActiveMQ acks. AMQSource uses an ordinary HashMap to store Messages to be acked in the broker, and this is where my question arises.
>>
>> Is the HashMap safe to use here?
>>
>> Please correct me if I'm wrong, but my understanding is that the /run/ method is executed in one thread and /acknowledgeIDs/ in another, so there is a possibility of a thread race (even if we assume all the message ids are unique).
>>
>> Also, do you know of any ActiveMQ specific (or JMS in general), more up-to-date connectors I could use which do not have the issues mentioned above?
>>
>> Thanks,
>> Oskar
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
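
For reference, the locking pattern Piotr describes can be sketched in plain Java without any Flink dependency. This is only an illustration under stated assumptions, not the code of AMQSource: the class `LockedAckTracker` and its methods are hypothetical, and the `lock` field merely stands in for the object a real source would obtain from `SourceContext#getCheckpointLock()`. The point it shows is that a plain HashMap is fine as long as every read and write, from either thread, happens while holding the same lock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a source that tracks unacknowledged messages. */
class LockedAckTracker {
    // Assumption: plays the role of the checkpoint lock shared by both threads.
    private final Object lock = new Object();

    // Plain HashMap: safe here only because every access holds `lock`.
    private final Map<String, String> unacknowledged = new HashMap<>();

    /** Called from the "run" thread for each received message. */
    void onMessage(String id, String payload) {
        synchronized (lock) {
            unacknowledged.put(id, payload);
        }
    }

    /** Called from the checkpointing thread: copy the ids out under the lock. */
    List<String> snapshotIds() {
        synchronized (lock) {
            return new ArrayList<>(unacknowledged.keySet());
        }
    }

    /** Called once a checkpoint completes: drop the ids that were snapshotted. */
    void acknowledge(List<String> ids) {
        synchronized (lock) {
            for (String id : ids) {
                unacknowledged.remove(id);
            }
        }
    }

    int pendingCount() {
        synchronized (lock) {
            return unacknowledged.size();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockedAckTracker tracker = new LockedAckTracker();

        // One thread receives messages while another snapshots and acknowledges.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                tracker.onMessage("id-" + i, "payload");
            }
        });
        Thread acker = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                tracker.acknowledge(tracker.snapshotIds());
            }
        });

        producer.start();
        acker.start();
        producer.join();
        acker.join();

        // Acknowledge whatever arrived after the last concurrent snapshot.
        tracker.acknowledge(tracker.snapshotIds());
        System.out.println("pending after final ack: " + tracker.pendingCount());
    }
}
```

If the accesses in one of the two threads were not guarded by the shared lock, switching to a ConcurrentHashMap would protect the map itself, but the snapshot-then-acknowledge sequence would still not be atomic with respect to incoming messages.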