Hi,

Thanks for sharing the code pointers.

> His question actually boils down to one thing, regarding this class [3].
> Is it thread safe to use a HashMap rather than a ConcurrentHashMap for
> unacknowledgedMessages in [3]?

Yes, it's safe, because the map is used in only two places:

1. Under the checkpoint lock, acquired manually in the source thread:
https://github.com/apache/bahir-flink/blob/d3bc0cba10888dcd1187c804fe37455f8af2b776/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java#L228
2. In the `acknowledgeIDs` method, which is called from `notifyCheckpointComplete`, which is also synchronised on the same lock:
https://github.com/apache/bahir-flink/blob/d3bc0cba10888dcd1187c804fe37455f8af2b776/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java#L194

Piotrek

> On 30 Jan 2020, at 16:53, KristoffSC <krzysiek.chmielew...@gmail.com> wrote:
>
> Hi Piotr,
> I'm not sure about:
> "Note that if you want your state (your HashMap) to be actually
> checkpointed, it must be either already defined as Flink managed state
> (like `ListState` in the example [1]), or you must copy content of your
> `HashMap` to Flink managed state during `snapshotState` call."
>
> From [1] we can read:
> "Each parallel instance of the Kafka consumer maintains a map of topic
> partitions and offsets as its Operator State."
>
> Oskar was asking about ActiveMQ and not Kafka, but I guess the rule applies
> here too. The ActiveMQ connector he is using is this one [2].
>
> His question actually boils down to one thing, regarding this class [3].
> Is it thread safe to use a HashMap rather than a ConcurrentHashMap for
> unacknowledgedMessages in [3]?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html
> [2] https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/
> [3]
> https://github.com/apache/bahir-flink/blob/master/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
>
> Piotr Nowojski-3 wrote
>> Hi,
>>
>> Regarding your last question, sorry, I don't know about ActiveMQ
>> connectors.
>>
>> I'm not sure exactly how you are implementing your SourceFunction.
>> Generally speaking, the `run()` method is executed in one thread, while other
>> operations like checkpointing and timers (if any) are executed from another
>> thread. To synchronise between those, the user is expected to acquire
>> the checkpoint lock in the `run()` method, as documented in [1].
>>
>> Note that if you want your state (your HashMap) to be actually
>> checkpointed, it must either already be defined as Flink managed state
>> (like `ListState` in the example [1]), or you must copy the contents of your
>> `HashMap` to Flink managed state during the `snapshotState` call.
>>
>> Note 2: keep in mind that we are in the process of reimplementing the source
>> interfaces [2], and Flink 1.11 will probably offer a new and better API for
>> that (SourceReader instead of SourceFunction).
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>> [2]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>
>>> On 29 Jan 2020, at 13:08, OskarM <pentiak@> wrote:
>>>
>>> Hi all,
>>>
>>> I am using Flink with Bahir's Apache ActiveMQ connector. However, it's quite
>>> dated and has many limitations: most notably, the source supports only
>>> BytesMessage, does not support parallelism, and has a bug that is only fixed
>>> in a snapshot version.
>>>
>>> So I started implementing my own SourceFunction (still with a parallelism
>>> of only 1) based on AMQSource.
>>> I want it to support Flink's checkpointing and make it work with ActiveMQ
>>> acks.
>>> AMQSource uses an ordinary HashMap to store the Messages to be acked in the
>>> broker, and this is where my question arises.
>>>
>>> Is the HashMap safe to use here?
>>>
>>> Please correct me if I'm wrong, but my understanding is that the /run/ method
>>> is executed in one thread and /acknowledgeIDs/ in another, so there is a
>>> possibility of a race condition (even if we assume all the message IDs are
>>> unique).
>>>
>>> Also, do you know of any more up-to-date ActiveMQ-specific (or JMS in general)
>>> connectors I could use that do not have the issues mentioned above?
>>>
>>> Thanks,
>>> Oskar
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
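
Piotrek's answer at the top of this thread relies on one rule: every read or write of the plain `HashMap` happens while holding the same lock, so the source thread and the checkpoint thread never touch it at the same time. A minimal, Flink-free sketch of that pattern follows, assuming a plain `Object` stands in for the checkpoint lock. The class `LockGuardedAcks` and its methods are hypothetical illustrations, not Bahir's or Flink's API.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Flink-free sketch with hypothetical names: a plain HashMap that is only ever
// touched while holding one shared lock, as AMQSource relies on the checkpoint lock.
class LockGuardedAcks {
    // Stand-in for the lock returned by SourceContext#getCheckpointLock() (assumption).
    private final Object checkpointLock = new Object();
    // A plain HashMap suffices because every access below holds checkpointLock.
    private final Map<String, String> unacknowledgedMessages = new HashMap<>();

    // Source-thread path: record a message as emitted but not yet acknowledged.
    void onMessage(String id, String payload) {
        synchronized (checkpointLock) {
            unacknowledgedMessages.put(id, payload);
        }
    }

    // Checkpoint-thread path, analogous to acknowledgeIDs(); returns how many IDs were removed.
    int acknowledgeIDs(Collection<String> ids) {
        synchronized (checkpointLock) {
            int removed = 0;
            for (String id : ids) {
                if (unacknowledgedMessages.remove(id) != null) {
                    removed++;
                }
            }
            return removed;
        }
    }

    int pending() {
        synchronized (checkpointLock) {
            return unacknowledgedMessages.size();
        }
    }

    // Runs a producer thread and an acknowledging thread concurrently and returns
    // the number of messages left unacknowledged (expected: 0).
    static int runDemo(int n) {
        LockGuardedAcks source = new LockGuardedAcks();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                source.onMessage("id-" + i, "payload-" + i);
            }
        });
        Thread acker = new Thread(() -> {
            int next = 0;
            while (next < n) {
                // Acknowledge IDs in order; retry until the producer has added the next one.
                if (source.acknowledgeIDs(Collections.singleton("id-" + next)) == 1) {
                    next++;
                } else {
                    Thread.yield();
                }
            }
        });
        producer.start();
        acker.start();
        try {
            producer.join();
            acker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return source.pending();
    }

    public static void main(String[] args) {
        System.out.println("pending after demo: " + runDemo(10_000));
    }
}
```

Because both paths synchronise on one monitor, the lock gives mutual exclusion and also the visibility guarantees that let each thread see the other's updates, which is why a `ConcurrentHashMap` is not needed here.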
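
Piotr's earlier note in this thread says a `HashMap` is only checkpointed if its contents end up in Flink managed state, for example by copying them during `snapshotState`. The sketch below illustrates that copy-on-snapshot and rebuild-on-restore idea without Flink on the classpath: the plain `List` is a hypothetical stand-in for `ListState`, and `SnapshottingBuffer` is an illustrative name, not a Flink class.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Flink-free sketch (hypothetical names): the working HashMap is copied into a
// list standing in for managed ListState on each snapshot, and rebuilt from it on restore.
class SnapshottingBuffer {
    private final Object checkpointLock = new Object();
    // Working state: cheap to update, but not checkpointed by itself.
    private final Map<String, String> pending = new HashMap<>();
    // Stand-in for ListState<Map.Entry<String, String>> (assumption, not Flink's type).
    private final List<Map.Entry<String, String>> managedState;

    SnapshottingBuffer(List<Map.Entry<String, String>> managedState) {
        this.managedState = managedState;
        // Restore path: rebuild the HashMap from whatever the last snapshot copied.
        for (Map.Entry<String, String> e : managedState) {
            pending.put(e.getKey(), e.getValue());
        }
    }

    void add(String id, String payload) {
        synchronized (checkpointLock) {
            pending.put(id, payload);
        }
    }

    // Analogous to snapshotState(): replace the managed copy with the current map contents.
    void snapshotState() {
        synchronized (checkpointLock) {
            managedState.clear();
            for (Map.Entry<String, String> e : pending.entrySet()) {
                managedState.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
            }
        }
    }

    // Returns a copy so callers never touch the map without the lock.
    Map<String, String> view() {
        synchronized (checkpointLock) {
            return new HashMap<>(pending);
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> state = new ArrayList<>();
        SnapshottingBuffer before = new SnapshottingBuffer(state);
        before.add("id-1", "a");
        before.add("id-2", "b");
        before.snapshotState();
        before.add("id-3", "c"); // added after the snapshot, so not in the managed copy

        SnapshottingBuffer restored = new SnapshottingBuffer(state);
        System.out.println(new TreeMap<>(restored.view()).keySet()); // prints [id-1, id-2]
    }
}
```

Anything added after the last snapshot is lost on restore in this sketch, which mirrors why the copy has to happen inside each `snapshotState` call rather than once.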