Hi Piotr,
I'm not sure about:
"Note that if you want your state (your HashMap) to be actually
checkpointed, it must be either already defined as Flink managed state
(like `ListState` in the example [1]), or you must copy content of your
`HashMap` to Flink managed state during `snapshotState` call."

From [1] we can read:
"Each parallel instance of the Kafka consumer maintains a map of topic
partitions and offsets as its Operator State."

Oskar was asking about ActiveMQ and not Kafka, but I guess the same rule
applies here. The ActiveMQ connector he is using is this one [2].

His question actually boils down to one thing, regarding this class [3]:
is it thread safe to use a plain HashMap rather than a ConcurrentHashMap
for unacknowledgedMessages in [3]?
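
To make the question concrete, here is a minimal sketch in plain Java (no
Flink or ActiveMQ dependencies). The class `GuardedUnackedMessages`, its
methods and the `lock` field are hypothetical names of mine and are not taken
from AMQSource. It only illustrates the general rule: a plain HashMap is fine
if every access from both threads holds the same lock, which is the role the
checkpoint lock is meant to play. Whether both code paths in [3] actually run
under that lock is exactly the open question.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a source; not the Bahir AMQSource and not Flink API.
class GuardedUnackedMessages {
    // Plays the role of the checkpoint lock (assumption for this sketch).
    private final Object lock = new Object();
    // Plain HashMap: safe only because every access below holds `lock`.
    private final Map<String, String> unacknowledgedMessages = new HashMap<>();

    // Called from the thread that executes run().
    void onMessage(String id, String payload) {
        synchronized (lock) {
            unacknowledgedMessages.put(id, payload);
        }
    }

    // Called from the thread that handles acknowledgements.
    // Returns true if the id was pending and has now been removed.
    boolean acknowledge(String id) {
        synchronized (lock) {
            return unacknowledgedMessages.remove(id) != null;
        }
    }

    int pending() {
        synchronized (lock) {
            return unacknowledgedMessages.size();
        }
    }

    // Runs a receiving thread and an acknowledging thread concurrently and
    // returns how many messages are still unacknowledged afterwards.
    static int simulate(int messages) {
        GuardedUnackedMessages source = new GuardedUnackedMessages();
        Thread runThread = new Thread(() -> {
            for (int i = 0; i < messages; i++) {
                source.onMessage("id-" + i, "payload-" + i);
            }
        });
        Thread ackThread = new Thread(() -> {
            // Acknowledge every even id, retrying until it has been received.
            for (int i = 0; i < messages; i += 2) {
                while (!source.acknowledge("id-" + i)) {
                    Thread.yield();
                }
            }
        });
        runThread.start();
        ackThread.start();
        try {
            runThread.join();
            ackThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return source.pending();
    }

    public static void main(String[] args) {
        System.out.println(simulate(1000));
    }
}
```

If the two threads did not share a lock, the same HashMap could be corrupted
by concurrent put/remove calls, and then a ConcurrentHashMap (or explicit
synchronization) would be needed.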

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html
[2] https://bahir.apache.org/docs/flink/current/flink-streaming-activemq/
[3]
https://github.com/apache/bahir-flink/blob/master/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
Piotr Nowojski-3 wrote
> Hi,
> 
> Regarding your last question, sorry I don’t know about ActiveMQ
> connectors.
> 
> I’m not sure exactly how you are implementing your SourceFunction.
> Generally speaking, the `run()` method is executed in one thread, and other
> operations like checkpointing and timers (if any) are executed from another
> thread. To synchronise between them, the user is expected to acquire the
> checkpoint lock in the `run()` method, as documented in [1].
> 
> Note that if you want your state (your HashMap) to be actually
> checkpointed, it must be either already defined as Flink managed state
> (like `ListState` in the example [1]), or you must copy content of your
> `HashMap` to Flink managed state during `snapshotState` call.
> 
> Note 2, also keep in mind we are in the process of reimplementing source
> interfaces [2] and probably Flink 1.11 will offer a new and better API for
> that (SourceReader instead of SourceFunction). 
> 
> Piotrek
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>  
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> 
>> On 29 Jan 2020, at 13:08, OskarM <pentiak@> wrote:
>> 
>> Hi all,
>> 
>> I am using Flink with Bahir's Apache ActiveMQ connector. However it's
>> quite dated and poses many limitations, most notably the source supports
>> only ByteMessages, does not support parallelism and has a bug that is
>> only fixed in a snapshot version.
>> 
>> So I started implementing my own SourceFunction (still with parallelism
>> of only 1) based on AMQSource.
>> I want it to support Flink's checkpointing and make it work with ActiveMQ
>> acks.
>> AMQSource uses ordinary HashMap to store Messages to be acked in the
>> broker and this is where my question arises.
>> 
>> Is the HashMap safe to use here?
>> 
>> Please correct me if I'm wrong, but my understanding is that /run/ method
>> is executed in one thread and /acknowledgeIDs/ in another so there is a
>> possibility of thread race (even if we assume all the message ids are
>> unique).
>> 
>> Also, do you know of any ActiveMQ specific (or JMS in general), more
>> up-to-date connectors I could use which do not have the issues mentioned
>> above?
>> 
>> Thanks,
>> Oskar
>> 
>> 
>> 
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/




