IGNITE-3303 Apache Flink Integration Fixes #5020
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b9c415e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b9c415e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b9c415e Branch: refs/heads/ignite-601 Commit: 3b9c415ee2c4b6461c18f87f46f1950f324f5662 Parents: d0facb26 Author: samaitra <saikat.mai...@gmail.com> Authored: Tue Dec 25 19:45:30 2018 +0300 Committer: Dmitriy Pavlov <dpav...@apache.org> Committed: Tue Dec 25 19:45:30 2018 +0300 ---------------------------------------------------------------------- modules/flink/pom.xml | 21 +- .../ignite/source/flink/IgniteSource.java | 223 +++++++++++++++++++ .../ignite/source/flink/TaskRemoteFilter.java | 60 +++++ .../ignite/source/flink/package-info.java | 21 ++ .../source/flink/FlinkIgniteSourceSelfTest.java | 157 +++++++++++++ .../flink/FlinkIgniteSourceSelfTestSuite.java | 42 ++++ parent/pom.xml | 6 +- 7 files changed, 526 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/pom.xml ---------------------------------------------------------------------- diff --git a/modules/flink/pom.xml b/modules/flink/pom.xml index 1082e7b..1597c2a 100644 --- a/modules/flink/pom.xml +++ b/modules/flink/pom.xml @@ -35,7 +35,8 @@ <url>http://ignite.apache.org</url> <properties> - <flink.version>1.3.0</flink.version> + <flink.version>1.5.0</flink.version> + <kryo-serializers.version>0.42</kryo-serializers.version> </properties> <dependencies> @@ -91,7 +92,7 @@ <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> + <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> <exclusions> <exclusion> @@ -123,7 +124,7 @@ <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_2.10</artifactId> + <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> <exclusions> <exclusion> @@ -154,6 +155,13 @@ </dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.11</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-core</artifactId> <version>${project.version}</version> @@ -174,6 +182,13 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/src/main/java/org/apache/ignite/source/flink/IgniteSource.java ---------------------------------------------------------------------- diff --git a/modules/flink/src/main/java/org/apache/ignite/source/flink/IgniteSource.java b/modules/flink/src/main/java/org/apache/ignite/source/flink/IgniteSource.java new file mode 100644 index 0000000..2dd670a --- /dev/null +++ b/modules/flink/src/main/java/org/apache/ignite/source/flink/IgniteSource.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.source.flink; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Apache Flink Ignite source implemented as a RichParallelSourceFunction. + */ +public class IgniteSource extends RichParallelSourceFunction<CacheEvent> { + /** Serial version uid. */ + private static final long serialVersionUID = 1L; + + /** Logger. */ + private static final Logger log = LoggerFactory.getLogger(IgniteSource.class); + + /** Default max number of events taken from the buffer at once. */ + private static final int DFLT_EVT_BATCH_SIZE = 1; + + /** Default number of milliseconds timeout for event buffer queue operation. */ + private static final int DFLT_EVT_BUFFER_TIMEOUT = 10; + + /** Event buffer. */ + private BlockingQueue<CacheEvent> evtBuf = new LinkedBlockingQueue<>(); + + /** Remote Listener id. */ + private UUID rmtLsnrId; + + /** Flag for isRunning state. */ + private volatile boolean isRunning; + + /** Max number of events taken from the buffer at once. */ + private int evtBatchSize = DFLT_EVT_BATCH_SIZE; + + /** Number of milliseconds timeout for event buffer queue operation. */ + private int evtBufTimeout = DFLT_EVT_BUFFER_TIMEOUT; + + /** Local listener. */ + private final TaskLocalListener locLsnr = new TaskLocalListener(); + + /** Ignite instance. */ + @IgniteInstanceResource + private transient Ignite ignite; + + /** Cache name. */ + private final String cacheName; + + /** + * Sets Ignite instance. + * + * @param ignite Ignite instance. + */ + public void setIgnite(Ignite ignite) { + this.ignite = ignite; + } + + /** + * Sets Event Batch Size. + * + * @param evtBatchSize Event Batch Size. + */ + public void setEvtBatchSize(int evtBatchSize) { + this.evtBatchSize = evtBatchSize; + } + + /** + * Sets Event Buffer timeout. + * + * @param evtBufTimeout Event Buffer timeout. + */ + public void setEvtBufTimeout(int evtBufTimeout) { + this.evtBufTimeout = evtBufTimeout; + } + + /** + * @return Local Task Listener + */ + TaskLocalListener getLocLsnr() { + return locLsnr; + } + + /** + * Default IgniteSource constructor. + * + * @param cacheName Cache name. + */ + public IgniteSource(String cacheName) { + this.cacheName = cacheName; + } + + /** + * Starts Ignite source. + * + * @param filter User defined filter. + * @param cacheEvts Converts comma-delimited cache events strings to Ignite internal representation. + */ + @SuppressWarnings("unchecked") + public void start(IgnitePredicate<CacheEvent> filter, int... cacheEvts) { + A.notNull(cacheName, "Cache name"); + + TaskRemoteFilter rmtLsnr = new TaskRemoteFilter(cacheName, filter); + + try { + synchronized (this) { + if (isRunning) + return; + + isRunning = true; + + rmtLsnrId = ignite.events(ignite.cluster().forCacheNodes(cacheName)) + .remoteListen(locLsnr, rmtLsnr, cacheEvts); + } + } + catch (IgniteException e) { + log.error("Failed to register event listener!", e); + + throw e; + } + } + + /** + * Transfers data from grid. + * + * @param ctx SourceContext. + */ + @Override public void run(SourceContext<CacheEvent> ctx) { + List<CacheEvent> evts = new ArrayList<>(evtBatchSize); + + try { + while (isRunning) { + // block here for some time if there is no events from source + CacheEvent firstEvt = evtBuf.poll(1, TimeUnit.SECONDS); + + if (firstEvt != null) + evts.add(firstEvt); + + if (evtBuf.drainTo(evts, evtBatchSize) > 0) { + synchronized (ctx.getCheckpointLock()) { + for (CacheEvent evt : evts) + ctx.collect(evt); + + evts.clear(); + } + } + } + } + catch (Exception e) { + if (X.hasCause(e, InterruptedException.class)) + return; // Executing thread can be interrupted see cancel() javadoc. + + log.error("Error while processing cache event of " + cacheName, e); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + synchronized (this) { + if (!isRunning) + return; + + isRunning = false; + + if (rmtLsnrId != null && ignite != null) { + ignite.events(ignite.cluster().forCacheNodes(cacheName)) + .stopRemoteListen(rmtLsnrId); + + rmtLsnrId = null; + } + } + } + + /** + * Local listener buffering cache events to be further sent to Flink. + */ + private class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> { + /** {@inheritDoc} */ + @Override public boolean apply(UUID id, CacheEvent evt) { + try { + if (!evtBuf.offer(evt, evtBufTimeout, TimeUnit.MILLISECONDS)) + log.error("Failed to buffer event {}", evt.name()); + } + catch (InterruptedException ignored) { + log.error("Failed to buffer event using local task listener {}", evt.name()); + + Thread.currentThread().interrupt(); // Restore interrupt flag. + } + + return true; + } + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/src/main/java/org/apache/ignite/source/flink/TaskRemoteFilter.java ---------------------------------------------------------------------- diff --git a/modules/flink/src/main/java/org/apache/ignite/source/flink/TaskRemoteFilter.java b/modules/flink/src/main/java/org/apache/ignite/source/flink/TaskRemoteFilter.java new file mode 100644 index 0000000..4c89d25 --- /dev/null +++ b/modules/flink/src/main/java/org/apache/ignite/source/flink/TaskRemoteFilter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.source.flink; + +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.resources.IgniteInstanceResource; + +/** + * Remote filter. + */ +public class TaskRemoteFilter implements IgnitePredicate<CacheEvent> { + /** Serial version Id. */ + private static final long serialVersionUID = 1L; + + /** Ignite Instance Resource. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Cache name. */ + private final String cacheName; + + /** User-defined filter. */ + private final IgnitePredicate<CacheEvent> filter; + + /** + * @param cacheName Cache name. + * @param filter IgnitePredicate. + */ + TaskRemoteFilter(String cacheName, IgnitePredicate<CacheEvent> filter) { + this.cacheName = cacheName; + this.filter = filter; + } + + /** {@inheritDoc} */ + @Override public boolean apply(CacheEvent evt) { + Affinity<Object> affinity = ignite.affinity(cacheName); + + // Process this event. Ignored on backups. + return affinity.isPrimary(ignite.cluster().localNode(), evt.key()) && + (filter == null || filter.apply(evt)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/src/main/java/org/apache/ignite/source/flink/package-info.java ---------------------------------------------------------------------- diff --git a/modules/flink/src/main/java/org/apache/ignite/source/flink/package-info.java b/modules/flink/src/main/java/org/apache/ignite/source/flink/package-info.java new file mode 100644 index 0000000..adc33fc --- /dev/null +++ b/modules/flink/src/main/java/org/apache/ignite/source/flink/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * IgniteSource -- source connector integration with Apache Flink. + */ +package org.apache.ignite.source.flink; http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTest.java b/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTest.java new file mode 100644 index 0000000..95a98dc --- /dev/null +++ b/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.source.flink; + +import java.util.UUID; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCluster; +import org.apache.ignite.IgniteEvents; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.events.CacheEvent; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link IgniteSource}. + */ +@RunWith(JUnit4.class) +public class FlinkIgniteSourceSelfTest extends GridCommonAbstractTest { + /** Cache name. */ + private static final String TEST_CACHE = "testCache"; + + /** Flink source context. */ + private SourceFunction.SourceContext<CacheEvent> ctx; + + /** Ignite instance. */ + private Ignite ignite; + + /** Cluster Group */ + private ClusterGroup clsGrp; + + /** Ignite Source instance */ + private IgniteSource igniteSrc; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Before + public void setUpTest() throws Exception { + ctx = mock(SourceFunction.SourceContext.class); + ignite = mock(Ignite.class); + clsGrp = mock(ClusterGroup.class); + + IgniteEvents igniteEvts = mock(IgniteEvents.class); + IgniteCluster igniteCluster = mock(IgniteCluster.class); + TaskRemoteFilter taskRemoteFilter = mock(TaskRemoteFilter.class); + + when(ctx.getCheckpointLock()).thenReturn(new Object()); + when(ignite.events(clsGrp)).thenReturn(igniteEvts); + when(ignite.cluster()).thenReturn(igniteCluster); + + igniteSrc = new IgniteSource(TEST_CACHE); + igniteSrc.setIgnite(ignite); + igniteSrc.setEvtBatchSize(1); + igniteSrc.setEvtBufTimeout(1); + igniteSrc.setRuntimeContext(createRuntimeContext()); + + IgniteBiPredicate locLsnr = igniteSrc.getLocLsnr(); + + when(igniteEvts.remoteListen(locLsnr, taskRemoteFilter, EventType.EVT_CACHE_OBJECT_PUT )) + .thenReturn(UUID.randomUUID()); + + when(igniteCluster.forCacheNodes(TEST_CACHE)).thenReturn(clsGrp); + } + + /** */ + @After + public void tearDownTest() { + igniteSrc.cancel(); + } + + /** Creates streaming runtime context */ + private RuntimeContext createRuntimeContext() { + StreamingRuntimeContext runtimeCtx = mock(StreamingRuntimeContext.class); + + when(runtimeCtx.isCheckpointingEnabled()).thenReturn(true); + + return runtimeCtx; + } + + /** + * Tests Ignite source start operation. + * + * @throws Exception If failed. + */ + @Test + public void testIgniteSourceStart() throws Exception { + igniteSrc.start(null, EventType.EVT_CACHE_OBJECT_PUT); + + verify(ignite.events(clsGrp), times(1)); + } + + /** + * Tests Ignite source run operation. + * + * @throws Exception If failed. + */ + @Test + public void testIgniteSourceRun() throws Exception { + IgniteInternalFuture f = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + igniteSrc.start(null, EventType.EVT_CACHE_OBJECT_PUT); + + igniteSrc.run(ctx); + } + catch (Throwable e) { + igniteSrc.cancel(); + + throw new AssertionError("Unexpected failure.", e); + } + } + }); + + long endTime = System.currentTimeMillis() + 2000; + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return f.isDone() || System.currentTimeMillis() > endTime; + } + }, 3000); + + igniteSrc.cancel(); + + f.get(3000); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTestSuite.java b/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTestSuite.java new file mode 100644 index 0000000..3c144d9 --- /dev/null +++ b/modules/flink/src/test/java/org/apache/ignite/source/flink/FlinkIgniteSourceSelfTestSuite.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.source.flink; + +import junit.framework.JUnit4TestAdapter; +import junit.framework.TestSuite; +import org.junit.runner.RunWith; +import org.junit.runners.AllTests; + +/** + * Apache Flink source tests. + */ +@RunWith(AllTests.class) +public class FlinkIgniteSourceSelfTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Apache Flink Source Test Suite"); + + suite.addTest(new JUnit4TestAdapter(FlinkIgniteSourceSelfTest.class)); + + return suite; + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/3b9c415e/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index 841a45b..00e5634 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -467,10 +467,14 @@ <packages>org.apache.ignite.osgi*</packages> </group> <group> - <title>Flink integration</title> + <title>Flink Sink Integration</title> <packages>org.apache.ignite.sink.flink*</packages> </group> <group> + <title>Flink Source Integration</title> + <packages>org.apache.ignite.source.flink*</packages> + </group> + <group> <title>SpringData integration</title> <packages>org.apache.ignite.springdata.repository*</packages> </group>