Hi,

Thanks for sharing the code pointers.

> His question actually boils down to one thing, regarding this class [3].
> Is using a HashMap rather than a ConcurrentHashMap for
> unacknowledgedMessages thread safe in the context of [3]?

Yes, it’s safe, because the map is accessed in only two places, and both of them hold the same checkpoint lock:

1. From the source thread, under the manually acquired checkpoint lock:

https://github.com/apache/bahir-flink/blob/d3bc0cba10888dcd1187c804fe37455f8af2b776/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java#L228

2. From the `acknowledgeIDs` method, which is called from
`notifyCheckpointComplete`, which is also synchronised on the same lock:

https://github.com/apache/bahir-flink/blob/d3bc0cba10888dcd1187c804fe37455f8af2b776/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java#L194
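To illustrate why a plain HashMap is enough here, below is a minimal, stdlib-only Java model of the pattern (no Flink dependency). It assumes a single shared lock object standing in for the lock returned by `SourceContext.getCheckpointLock()`; the class and the `onMessage`/`acknowledge` method names are hypothetical and are not AMQSource's actual code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, stdlib-only model of the pattern described above (not AMQSource code).
 * checkpointLock stands in for the lock returned by ctx.getCheckpointLock().
 */
public class LockedAckModel {
    private final Object checkpointLock = new Object();
    // A plain HashMap is sufficient because every access below holds checkpointLock.
    private final Map<String, String> unacknowledgedMessages = new HashMap<>();

    // Source thread (the run() loop in a real source): register a received message.
    public void onMessage(String id, String payload) {
        synchronized (checkpointLock) {
            unacknowledgedMessages.put(id, payload);
        }
    }

    // Checkpoint thread (notifyCheckpointComplete -> acknowledgeIDs in a real source).
    public List<String> acknowledge(List<String> ids) {
        List<String> acked = new ArrayList<>();
        synchronized (checkpointLock) {
            for (String id : ids) {
                if (unacknowledgedMessages.remove(id) != null) {
                    acked.add(id);
                }
            }
        }
        return acked;
    }

    public int pending() {
        synchronized (checkpointLock) {
            return unacknowledgedMessages.size();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LockedAckModel model = new LockedAckModel();
        int n = 1_000;
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ids.add("id-" + i);
        }
        int[] ackedTotal = {0}; // written only by the checkpoint thread

        Thread source = new Thread(() -> {
            for (String id : ids) {
                model.onMessage(id, "payload");
            }
        });
        Thread checkpointer = new Thread(() -> {
            // Keep acknowledging until every id has been acked exactly once.
            while (ackedTotal[0] < n) {
                ackedTotal[0] += model.acknowledge(ids).size();
            }
        });
        source.start();
        checkpointer.start();
        source.join();
        checkpointer.join();
        System.out.println("acked=" + ackedTotal[0] + " pending=" + model.pending());
    }
}
```

Running `main` should print `acked=1000 pending=0`: because both threads take the same monitor, the map is never mutated concurrently, which is exactly the guarantee a ConcurrentHashMap would otherwise have to provide.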

Piotrek

> On 30 Jan 2020, at 16:53, KristoffSC <krzysiek.chmielew...@gmail.com> wrote:
> 
> Hi Piotr,
> I'm not sure about:
> "Note that if you want your state (your HashMap) to be actually
> checkpointed, it must be either already defined as Flink managed state
> (like `ListState` in the example [1]), or you must copy content of your
> `HashMap` to Flink managed state during `snapshotState` call."
> 
> From [1] we can read
> "Each parallel instance of the Kafka consumer maintains a map of topic
> partitions and offsets as its Operator State."
> 
> Oskar was asking about ActiveMQ rather than Kafka, but I guess the same rule
> applies here too. The ActiveMQ connector he is using is this one [2].
> 
> His question actually boils down to one thing, regarding this class [3].
> Is using a HashMap rather than a ConcurrentHashMap for
> unacknowledgedMessages thread safe in the context of [3]?
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html
> [2] https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/
> [3] https://github.com/apache/bahir-flink/blob/master/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
> Piotr Nowojski-3 wrote
>> Hi,
>> 
>> Regarding your last question, sorry, I don’t know of any other ActiveMQ
>> connectors.
>> 
>> I’m not sure exactly how you are implementing your SourceFunction.
>> Generally speaking, the `run()` method is executed in one thread, while
>> other operations, such as checkpointing and timers (if any), are executed
>> from another thread. To synchronise between them, the user is expected to
>> acquire the checkpoint lock in the `run()` method, as documented in [1].
>> 
>> Note that if you want your state (your HashMap) to be actually
>> checkpointed, it must be either already defined as Flink managed state
>> (like `ListState` in the example [1]), or you must copy content of your
>> `HashMap` to Flink managed state during `snapshotState` call.
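The "copy the HashMap into managed state during `snapshotState`" option can be sketched as follows. This is a hypothetical, stdlib-only model: in a real source you would implement `CheckpointedFunction` and write into a `ListState` obtained in `initializeState`, whereas here a plain `List` (`managedState`) stands in for that `ListState`, and the `track`/`restore` names are made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model: the HashMap is the working copy, and managedState stands in
 * for a Flink ListState that the framework would persist with each checkpoint.
 */
public class SnapshotCopyModel {
    private final Map<String, Long> unacknowledged = new HashMap<>();
    // Stand-in for ListState<...>; only what is in here would survive a failure.
    private final List<Map.Entry<String, Long>> managedState = new ArrayList<>();

    public void track(String messageId, long timestamp) {
        unacknowledged.put(messageId, timestamp);
    }

    // Mirrors snapshotState(): replace the managed state with a copy of the HashMap.
    public void snapshotState() {
        managedState.clear();
        for (Map.Entry<String, Long> e : unacknowledged.entrySet()) {
            managedState.add(new SimpleEntry<>(e.getKey(), e.getValue()));
        }
    }

    // Mirrors restoring in initializeState(): rebuild the HashMap from the managed state.
    public static SnapshotCopyModel restore(List<Map.Entry<String, Long>> restored) {
        SnapshotCopyModel model = new SnapshotCopyModel();
        for (Map.Entry<String, Long> e : restored) {
            model.unacknowledged.put(e.getKey(), e.getValue());
        }
        return model;
    }

    public List<Map.Entry<String, Long>> managedState() {
        return new ArrayList<>(managedState);
    }

    public Map<String, Long> unacknowledged() {
        return new HashMap<>(unacknowledged);
    }

    public static void main(String[] args) {
        SnapshotCopyModel before = new SnapshotCopyModel();
        before.track("msg-1", 100L);
        before.track("msg-2", 200L);
        before.snapshotState();
        // Simulate a failure and recovery from the last snapshot.
        SnapshotCopyModel after = SnapshotCopyModel.restore(before.managedState());
        System.out.println(after.unacknowledged().equals(before.unacknowledged()));
    }
}
```

`main` should print `true`. Anything tracked after the last `snapshotState()` call is not in `managedState`, which is why a HashMap that is never copied into managed state is simply lost on recovery.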
>> 
>> Note 2: keep in mind that we are in the process of reimplementing the
>> source interfaces [2], and Flink 1.11 will probably offer a new and better
>> API for that (SourceReader instead of SourceFunction).
>> 
>> Piotrek
>> 
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>> 
>> On 29 Jan 2020, at 13:08, OskarM <pentiak@> wrote:
>>> 
>>> Hi all,
>>> 
>>> I am using Flink with Bahir's Apache ActiveMQ connector. However, it's
>>> quite dated and has many limitations; most notably, the source only
>>> supports BytesMessage, does not support parallelism, and has a bug that
>>> is only fixed in a snapshot version.
>>> 
>>> So I started implementing my own SourceFunction (still with a
>>> parallelism of only 1) based on AMQSource. I want it to support Flink's
>>> checkpointing and to work with ActiveMQ acks. AMQSource uses an ordinary
>>> HashMap to store the messages that still have to be acked in the broker,
>>> and this is where my question arises.
>>> 
>>> Is the HashMap safe to use here?
>>> 
>>> Please correct me if I'm wrong, but my understanding is that the `run`
>>> method is executed in one thread and `acknowledgeIDs` in another, so
>>> there is a possibility of a race condition (even if we assume all the
>>> message IDs are unique).
>>> 
>>> Also, do you know of any more up-to-date ActiveMQ-specific (or general
>>> JMS) connectors that do not have the issues mentioned above?
>>> 
>>> Thanks,
>>> Oskar
>>> 
>>> 
>>> 
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
