IGNITE-3303 Apache Flink Integration Fixes #5020

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b9c415e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b9c415e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b9c415e

Branch: refs/heads/ignite-601
Commit: 3b9c415ee2c4b6461c18f87f46f1950f324f5662
Parents: d0facb26
Author: samaitra <saikat.mai...@gmail.com>
Authored: Tue Dec 25 19:45:30 2018 +0300
Committer: Dmitriy Pavlov <dpav...@apache.org>
Committed: Tue Dec 25 19:45:30 2018 +0300

----------------------------------------------------------------------
 modules/flink/pom.xml                           |  21 +-
 .../ignite/source/flink/IgniteSource.java       | 223 +++++++++++++++++++
 .../ignite/source/flink/TaskRemoteFilter.java   |  60 +++++
 .../ignite/source/flink/package-info.java       |  21 ++
 .../source/flink/FlinkIgniteSourceSelfTest.java | 157 +++++++++++++
 .../flink/FlinkIgniteSourceSelfTestSuite.java   |  42 ++++
 parent/pom.xml                                  |   6 +-
 7 files changed, 526 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/pom.xml
----------------------------------------------------------------------
diff --git a/modules/flink/pom.xml b/modules/flink/pom.xml
index 1082e7b..1597c2a 100644
--- a/modules/flink/pom.xml
+++ b/modules/flink/pom.xml
@@ -35,7 +35,8 @@
     <url>http://ignite.apache.org</url>
 
     <properties>
-        <flink.version>1.3.0</flink.version>
+        <flink.version>1.5.0</flink.version>
+        <kryo-serializers.version>0.42</kryo-serializers.version>
     </properties>
 
     <dependencies>
@@ -91,7 +92,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_2.10</artifactId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
             <version>${flink.version}</version>
             <exclusions>
                 <exclusion>
@@ -123,7 +124,7 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_2.10</artifactId>
+            <artifactId>flink-clients_2.11</artifactId>
             <version>${flink.version}</version>
             <exclusions>
                 <exclusion>
@@ -154,6 +155,13 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-core</artifactId>
             <version>${project.version}</version>
@@ -174,6 +182,13 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/src/main/java/org/apache/ignite/source/flink/IgniteSource.java
----------------------------------------------------------------------
diff --git a/modules/flink/src/main/java/org/apache/ignite/source/flink/IgniteSource.java b/modules/flink/src/main/java/org/apache/ignite/source/flink/IgniteSource.java
new file mode 100644
index 0000000..2dd670a
--- /dev/null
+++ b/modules/flink/src/main/java/org/apache/ignite/source/flink/IgniteSource.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.source.flink;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Apache Flink Ignite source implemented as a RichParallelSourceFunction.
+ * <p>
+ * {@link #start(IgnitePredicate, int...)} registers a remote cache event listener; received events are
+ * buffered locally and forwarded to Flink by {@link #run(SourceContext)} until {@link #cancel()} is called.
+ */
+public class IgniteSource extends RichParallelSourceFunction<CacheEvent> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 1L;
+
+    /** Logger. */
+    private static final Logger log = LoggerFactory.getLogger(IgniteSource.class);
+
+    /** Default max number of events taken from the buffer at once. */
+    private static final int DFLT_EVT_BATCH_SIZE = 1;
+
+    /** Default number of milliseconds timeout for event buffer queue operation. */
+    private static final int DFLT_EVT_BUFFER_TIMEOUT = 10;
+
+    /** Event buffer. */
+    private BlockingQueue<CacheEvent> evtBuf = new LinkedBlockingQueue<>();
+
+    /** Remote Listener id. */
+    private UUID rmtLsnrId;
+
+    /** Flag for isRunning state. */
+    private volatile boolean isRunning;
+
+    /** Max number of events taken from the buffer at once. */
+    private int evtBatchSize = DFLT_EVT_BATCH_SIZE;
+
+    /** Number of milliseconds timeout for event buffer queue operation. */
+    private int evtBufTimeout = DFLT_EVT_BUFFER_TIMEOUT;
+
+    /** Local listener. */
+    private final TaskLocalListener locLsnr = new TaskLocalListener();
+
+    /** Ignite instance. */
+    @IgniteInstanceResource
+    private transient Ignite ignite;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /**
+     * Sets Ignite instance.
+     *
+     * @param ignite Ignite instance.
+     */
+    public void setIgnite(Ignite ignite) {
+        this.ignite = ignite;
+    }
+
+    /**
+     * Sets Event Batch Size.
+     *
+     * @param evtBatchSize Event Batch Size.
+     */
+    public void setEvtBatchSize(int evtBatchSize) {
+        this.evtBatchSize = evtBatchSize;
+    }
+
+    /**
+     * Sets Event Buffer timeout.
+     *
+     * @param evtBufTimeout Event Buffer timeout.
+     */
+    public void setEvtBufTimeout(int evtBufTimeout) {
+        this.evtBufTimeout = evtBufTimeout;
+    }
+
+    /**
+     * @return Local Task Listener
+     */
+    TaskLocalListener getLocLsnr() {
+        return locLsnr;
+    }
+
+    /**
+     * Default IgniteSource constructor.
+     *
+     * @param cacheName Cache name.
+     */
+    public IgniteSource(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    /**
+     * Starts Ignite source by registering a remote event listener on the nodes that hold the cache.
+     * Does nothing if the source is already running.
+     *
+     * @param filter User defined filter, may be {@code null}.
+     * @param cacheEvts Types of cache events to listen for, passed through to the remote listener registration.
+     * @throws IgniteException If the event listener could not be registered.
+     */
+    @SuppressWarnings("unchecked")
+    public void start(IgnitePredicate<CacheEvent> filter, int... cacheEvts) {
+        A.notNull(cacheName, "Cache name");
+
+        TaskRemoteFilter rmtLsnr = new TaskRemoteFilter(cacheName, filter);
+
+        synchronized (this) {
+            if (isRunning)
+                return;
+
+            try {
+                rmtLsnrId = ignite.events(ignite.cluster().forCacheNodes(cacheName))
+                    .remoteListen(locLsnr, rmtLsnr, cacheEvts);
+            }
+            catch (IgniteException e) {
+                log.error("Failed to register event listener!", e);
+
+                throw e;
+            }
+
+            // Flip the flag only after the listener is registered: a failed registration must not
+            // leave the source marked as running, otherwise a later start() would silently do nothing.
+            isRunning = true;
+        }
+    }
+
+    /**
+     * Transfers data from grid: takes buffered cache events in batches of at most the configured
+     * batch size and emits them to Flink under the checkpoint lock until the source is cancelled.
+     *
+     * @param ctx SourceContext.
+     */
+    @Override public void run(SourceContext<CacheEvent> ctx) {
+        List<CacheEvent> evts = new ArrayList<>(evtBatchSize);
+
+        try {
+            while (isRunning) {
+                // Block here for some time if there are no events from source.
+                CacheEvent firstEvt = evtBuf.poll(1, TimeUnit.SECONDS);
+
+                if (firstEvt == null)
+                    continue;
+
+                evts.add(firstEvt);
+
+                // Top the batch up with already buffered events. The polled event counts towards the
+                // batch size; a non-positive limit makes drainTo a no-op.
+                evtBuf.drainTo(evts, evtBatchSize - evts.size());
+
+                // Emit every batch, including a batch holding only the polled event: waiting for
+                // drainTo to return something would hold a lone event back indefinitely.
+                synchronized (ctx.getCheckpointLock()) {
+                    for (CacheEvent evt : evts)
+                        ctx.collect(evt);
+
+                    evts.clear();
+                }
+            }
+        }
+        catch (Exception e) {
+            if (X.hasCause(e, InterruptedException.class)) {
+                // Executing thread can be interrupted see cancel() javadoc.
+                Thread.currentThread().interrupt(); // Restore interrupt flag.
+
+                return;
+            }
+
+            log.error("Error while processing cache event of " + cacheName, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        synchronized (this) {
+            if (!isRunning)
+                return;
+
+            isRunning = false;
+
+            if (rmtLsnrId != null && ignite != null) {
+                ignite.events(ignite.cluster().forCacheNodes(cacheName))
+                    .stopRemoteListen(rmtLsnrId);
+
+                rmtLsnrId = null;
+            }
+        }
+    }
+
+    /**
+     * Local listener buffering cache events to be further sent to Flink.
+     */
+    private class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> {
+        /**
+         * Puts the event into the buffer, waiting up to the configured buffer timeout.
+         *
+         * @param id Passed by the caller, not used here.
+         * @param evt Cache event to buffer.
+         * @return Always {@code true}.
+         */
+        @Override public boolean apply(UUID id, CacheEvent evt) {
+            try {
+                if (!evtBuf.offer(evt, evtBufTimeout, TimeUnit.MILLISECONDS))
+                    log.error("Failed to buffer event {}", evt.name());
+            }
+            catch (InterruptedException e) {
+                log.error("Failed to buffer event using local task listener {}", evt.name());
+
+                Thread.currentThread().interrupt(); // Restore interrupt flag.
+            }
+
+            return true;
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/src/main/java/org/apache/ignite/source/flink/TaskRemoteFilter.java
----------------------------------------------------------------------
diff --git a/modules/flink/src/main/java/org/apache/ignite/source/flink/TaskRemoteFilter.java b/modules/flink/src/main/java/org/apache/ignite/source/flink/TaskRemoteFilter.java
new file mode 100644
index 0000000..4c89d25
--- /dev/null
+++ b/modules/flink/src/main/java/org/apache/ignite/source/flink/TaskRemoteFilter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.source.flink;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Remote filter: accepts a cache event only when the local node is primary for the event key
+ * and the optional user-defined filter accepts it.
+ */
+public class TaskRemoteFilter implements IgnitePredicate<CacheEvent> {
+    /** Serial version Id. */
+    private static final long serialVersionUID = 1L;
+
+    /** Ignite Instance Resource. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** User-defined filter. */
+    private final IgnitePredicate<CacheEvent> filter;
+
+    /**
+     * @param cacheName Cache name.
+     * @param filter User-defined filter, {@code null} to accept every event.
+     */
+    TaskRemoteFilter(String cacheName, IgnitePredicate<CacheEvent> filter) {
+        this.cacheName = cacheName;
+        this.filter = filter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(CacheEvent evt) {
+        Affinity<Object> aff = ignite.affinity(cacheName);
+
+        // Events are ignored on backups: only the primary node for the key reports them.
+        if (!aff.isPrimary(ignite.cluster().localNode(), evt.key()))
+            return false;
+
+        if (filter == null)
+            return true;
+
+        return filter.apply(evt);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/src/main/java/org/apache/ignite/source/flink/package-info.java
----------------------------------------------------------------------
diff --git a/modules/flink/src/main/java/org/apache/ignite/source/flink/package-info.java b/modules/flink/src/main/java/org/apache/ignite/source/flink/package-info.java
new file mode 100644
index 0000000..adc33fc
--- /dev/null
+++ b/modules/flink/src/main/java/org/apache/ignite/source/flink/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * IgniteSource -- source connector integration with Apache Flink.
+ */
+package org.apache.ignite.source.flink;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTest.java b/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTest.java
new file mode 100644
index 0000000..95a98dc
--- /dev/null
+++ b/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.source.flink;
+
+import java.util.UUID;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCluster;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link IgniteSource}.
+ */
+@RunWith(JUnit4.class)
+public class FlinkIgniteSourceSelfTest extends GridCommonAbstractTest {
+    /** Cache name. */
+    private static final String TEST_CACHE = "testCache";
+
+    /** Flink source context. */
+    private SourceFunction.SourceContext<CacheEvent> ctx;
+
+    /** Ignite instance. */
+    private Ignite ignite;
+
+    /** Cluster Group */
+    private ClusterGroup clsGrp;
+
+    /** Ignite Source instance */
+    private IgniteSource igniteSrc;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUpTest() throws Exception {
+        ctx = mock(SourceFunction.SourceContext.class);
+        ignite = mock(Ignite.class);
+        clsGrp = mock(ClusterGroup.class);
+
+        IgniteEvents igniteEvts = mock(IgniteEvents.class);
+        IgniteCluster igniteCluster = mock(IgniteCluster.class);
+        TaskRemoteFilter taskRemoteFilter = mock(TaskRemoteFilter.class);
+
+        when(ctx.getCheckpointLock()).thenReturn(new Object());
+        when(ignite.events(clsGrp)).thenReturn(igniteEvts);
+        when(ignite.cluster()).thenReturn(igniteCluster);
+
+        igniteSrc = new IgniteSource(TEST_CACHE);
+        igniteSrc.setIgnite(ignite);
+        igniteSrc.setEvtBatchSize(1);
+        igniteSrc.setEvtBufTimeout(1);
+        igniteSrc.setRuntimeContext(createRuntimeContext());
+
+        IgniteBiPredicate locLsnr = igniteSrc.getLocLsnr();
+
+        when(igniteEvts.remoteListen(locLsnr, taskRemoteFilter, 
EventType.EVT_CACHE_OBJECT_PUT ))
+            .thenReturn(UUID.randomUUID());
+
+        when(igniteCluster.forCacheNodes(TEST_CACHE)).thenReturn(clsGrp);
+    }
+
+    /**  */
+    @After
+    public void tearDownTest() {
+        igniteSrc.cancel();
+    }
+
+    /** Creates streaming runtime context */
+    private RuntimeContext createRuntimeContext() {
+        StreamingRuntimeContext runtimeCtx = 
mock(StreamingRuntimeContext.class);
+
+        when(runtimeCtx.isCheckpointingEnabled()).thenReturn(true);
+
+        return runtimeCtx;
+    }
+
+    /**
+     * Tests Ignite source start operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIgniteSourceStart() throws Exception {
+        igniteSrc.start(null, EventType.EVT_CACHE_OBJECT_PUT);
+
+        verify(ignite.events(clsGrp), times(1));
+    }
+
+    /**
+     * Tests Ignite source run operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIgniteSourceRun() throws Exception {
+        IgniteInternalFuture f = GridTestUtils.runAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    igniteSrc.start(null, EventType.EVT_CACHE_OBJECT_PUT);
+
+                    igniteSrc.run(ctx);
+                }
+                catch (Throwable e) {
+                    igniteSrc.cancel();
+
+                   throw new AssertionError("Unexpected failure.", e);
+                }
+            }
+        });
+
+        long endTime = System.currentTimeMillis() + 2000;
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return f.isDone() || System.currentTimeMillis() > endTime;
+            }
+        }, 3000);
+
+        igniteSrc.cancel();
+
+        f.get(3000);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTestSuite.java b/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTestSuite.java
new file mode 100644
index 0000000..3c144d9
--- /dev/null
+++ b/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTestSuite.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.source.flink;
+
+import junit.framework.JUnit4TestAdapter;
+import junit.framework.TestSuite;
+import org.junit.runner.RunWith;
+import org.junit.runners.AllTests;
+
+/**
+ * Test suite for the Apache Flink source integration.
+ */
+@RunWith(AllTests.class)
+public class FlinkIgniteSourceSelfTestSuite extends TestSuite {
+    /**
+     * Builds the suite containing {@link FlinkIgniteSourceSelfTest}.
+     *
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite flinkSrcSuite = new TestSuite("Apache Flink Source Test Suite");
+
+        // JUnit 4 test class is wrapped so it can be added to the JUnit 3 style suite.
+        JUnit4TestAdapter srcSelfTest = new JUnit4TestAdapter(FlinkIgniteSourceSelfTest.class);
+
+        flinkSrcSuite.addTest(srcSelfTest);
+
+        return flinkSrcSuite;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 841a45b..00e5634 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -467,10 +467,14 @@
                                 <packages>org.apache.ignite.osgi*</packages>
                             </group>
                             <group>
-                                <title>Flink integration</title>
+                                <title>Flink Sink Integration</title>
                                 <packages>org.apache.ignite.sink.flink*</packages>
                             </group>
                             <group>
+                                <title>Flink Source Integration</title>
+                                <packages>org.apache.ignite.source.flink*</packages>
+                            </group>
+                            <group>
                                 <title>SpringData integration</title>
                                 <packages>org.apache.ignite.springdata.repository*</packages>
                             </group>

Reply via email to