Hi Robert

Yes, I intend to commit the Ignite Source module as part of this jira
ticket.

https://issues.apache.org/jira/browse/IGNITE-3303

I am trying to resolve an issue specific to cancelling the Source function.
Flink serializes and distributes source function and I can see in debugger
different instances of Source function running in
StreamExecutionEnvironment.

During unit tests post execution of assert I want to cancel the Source
function but I can notice that the initial Source object created and the
current Source object are different and as a result the Source functions is
not getting cancelled causing a timeout exception in the unit tests.

Regards,
Saikat


On Sat, Dec 24, 2016 at 1:45 PM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi Saikat,
>
> there is already a connector for Ignite and Flink in the Apache Ignite
> project: https://github.com/apache/ignite/tree/master/modules/flink
> Maybe you can contribute your Ignite source to that project as well.
>
> Regards,
> Robert
>
>
> On Thu, Dec 22, 2016 at 10:04 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi,
> >
> > Flink serializes all user functions (including source functions) with
> Java
> > Serialization to ship them to the worker processes.
> > That's also why everything in a user function must be Serializable.
> >
> > There is not an easy way to synchronize running tasks. Each task has its
> > own function object and these might be distributed across different JVMs.
> > So even a static field won't help here.
> >
> > Best, Fabian
> >
> > 2016-12-22 9:12 GMT+01:00 Saikat Maitra <saikat.mai...@gmail.com>:
> >
> > > Hello,
> > >
> > > I am working on building Apache Ignite connector for Apache flink. I am
> > > currently developing the SourceFunction to consume Cache event from
> > Ignite
> > > cluster.
> > >
> > > Here is the PR https://github.com/apache/ignite/pull/870/files
> > >
> > > I am observing that during unit tests the IgniteSource instances are
> > > different which is created using the IgniteSource constructor and
> inside
> > > run() method. As a result when igniteSrc.cancel() is called the
> igniteSrc
> > > instance is not getting stopped.
> > >
> > > I wanted to discuss:
> > >
> > > 1. If Flink create a copy of IgniteSource object when
> > > env.addSource(igniteSrc)
> > > is called?
> > >
> > > A quick work around the problem is to use static boolean stopped
> variable
> > > which allows single IgniteSource instance but this limits using
> multiple
> > > IgniteSource with different cache combination.
> > >
> > >
> > > Regards
> > > Saikat
> > >
> >
>

Reply via email to