Hi Robert,

Yes, I intend to commit the Ignite Source module as part of this JIRA ticket:
https://issues.apache.org/jira/browse/IGNITE-3303

I am trying to resolve an issue specific to cancelling the Source function.
Flink serializes and distributes the source function, and in the debugger I
can see different instances of the Source function running in the
StreamExecutionEnvironment. During unit tests, once the assertions have run,
I want to cancel the Source function, but the Source object I initially
created and the Source object that is currently running are different. As a
result the Source function is not cancelled, which causes a timeout exception
in the unit tests.

Regards,
Saikat

On Sat, Dec 24, 2016 at 1:45 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Saikat,
>
> there is already a connector for Ignite and Flink in the Apache Ignite
> project: https://github.com/apache/ignite/tree/master/modules/flink
> Maybe you can contribute your Ignite source to that project as well.
>
> Regards,
> Robert
>
>
> On Thu, Dec 22, 2016 at 10:04 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi,
> >
> > Flink serializes all user functions (including source functions) with
> > Java Serialization to ship them to the worker processes.
> > That's also why everything in a user function must be Serializable.
> >
> > There is not an easy way to synchronize running tasks. Each task has its
> > own function object and these might be distributed across different JVMs.
> > So even a static field won't help here.
> >
> > Best, Fabian
> >
> > 2016-12-22 9:12 GMT+01:00 Saikat Maitra <saikat.mai...@gmail.com>:
> >
> > > Hello,
> > >
> > > I am working on building an Apache Ignite connector for Apache Flink.
> > > I am currently developing the SourceFunction to consume cache events
> > > from an Ignite cluster.
> > >
> > > Here is the PR https://github.com/apache/ignite/pull/870/files
> > >
> > > I am observing that during unit tests the IgniteSource instance
> > > created using the IgniteSource constructor and the instance seen inside
> > > the run() method are different.
> > > As a result, when igniteSrc.cancel() is called, the running igniteSrc
> > > instance is not stopped.
> > >
> > > I wanted to discuss:
> > >
> > > 1. Does Flink create a copy of the IgniteSource object when
> > > env.addSource(igniteSrc) is called?
> > >
> > > A quick workaround is to use a static boolean stopped variable, which
> > > works for a single IgniteSource instance but prevents using multiple
> > > IgniteSource instances with different cache combinations.
> > >
> > > Regards
> > > Saikat
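
The behaviour discussed in this thread can be reproduced without Flink. Below is a minimal sketch, assuming only that the function object is copied with plain Java Serialization as Fabian describes. DemoSource and javaSerializationCopy are hypothetical names made up for illustration; they are not Flink or Ignite classes, and the sketch does not model how Flink actually ships or runs tasks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializedCopyDemo {

    // Hypothetical stand-in for a source function with a cancel flag.
    static class DemoSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private volatile boolean running = true;

        void cancel() { running = false; }
        boolean isRunning() { return running; }
    }

    // Round-trips an object through Java Serialization, yielding a new instance.
    static <T extends Serializable> T javaSerializationCopy(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        DemoSource original = new DemoSource();
        DemoSource shipped = javaSerializationCopy(original);

        // Cancelling the object held by the test does not touch the copy.
        original.cancel();

        System.out.println("same instance: " + (original == shipped));
        System.out.println("original running: " + original.isRunning());
        System.out.println("shipped copy running: " + shipped.isRunning());
    }
}
```

The copy keeps running after cancel() is called on the original, which matches the symptom in the unit test: the instance held by the test is not the instance whose run() method is executing.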