IGNITE-529 Initial implementation
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9a5fc056 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9a5fc056 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9a5fc056 Branch: refs/heads/ignite-801 Commit: 9a5fc056786494e748c982e0c766fa9842ba523e Parents: b382062 Author: shtykh_roman <rsht...@yahoo.com> Authored: Tue Nov 17 17:38:12 2015 +0300 Committer: Anton Vinogradov <a...@apache.org> Committed: Tue Nov 17 17:38:12 2015 +0300 ---------------------------------------------------------------------- modules/flume/README.md | 40 ++++ modules/flume/pom.xml | 77 ++++++++ .../ignite/stream/flume/EventTransformer.java | 36 ++++ .../apache/ignite/stream/flume/IgniteSink.java | 186 +++++++++++++++++++ .../stream/flume/IgniteSinkConstants.java | 35 ++++ .../ignite/stream/flume/IgniteSinkTest.java | 142 ++++++++++++++ .../stream/flume/IgniteSinkTestSuite.java | 37 ++++ .../stream/flume/TestEventTransformer.java | 66 +++++++ .../flume/src/test/resources/example-ignite.xml | 71 +++++++ pom.xml | 1 + 10 files changed, 691 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/README.md ---------------------------------------------------------------------- diff --git a/modules/flume/README.md b/modules/flume/README.md new file mode 100644 index 0000000..2247cf3 --- /dev/null +++ b/modules/flume/README.md @@ -0,0 +1,40 @@ +#Flume NG sink + +## Setting up and running + +1. Create a transformer by implementing EventTransformer interface. +2. Build it and copy to ${FLUME_HOME}/plugins.d/ignite-sink/lib. +3. Copy other Ignite-related jar files to ${FLUME_HOME}/plugins.d/ignite-sink/libext to have them as shown below. 
+ +``` +plugins.d/ +`-- ignite-sink + |-- lib + | `-- ignite-flume-transformer-x.x.x.jar <-- your jar + `-- libext + |-- cache-api-1.0.0.jar + |-- ignite-core-x.x.x.jar + |-- ignite-flume-x.x.x.jar + |-- ignite-spring-x.x.x.jar + |-- spring-aop-4.1.0.RELEASE.jar + |-- spring-beans-4.1.0.RELEASE.jar + |-- spring-context-4.1.0.RELEASE.jar + |-- spring-core-4.1.0.RELEASE.jar + `-- spring-expression-4.1.0.RELEASE.jar +``` + +4. In Flume configuration file, specify Ignite configuration XML file's location with cache properties +(see [Apache Ignite](https://apacheignite.readme.io/)) with cache name specified for cache creation, +cache name (same as in Ignite configuration file), your EventTransformer's implementation class, +and, optionally, batch size (default -- 100). + +``` +# Describe the sink +a1.sinks.k1.type = org.apache.ignite.stream.flume.IgniteSink +a1.sinks.k1.igniteCfg = /some-path/ignite.xml +a1.sinks.k1.cacheName = testCache +a1.sinks.k1.eventTransformer = my.company.MyEventTransformer +a1.sinks.k1.batchSize = 100 +``` + +After specifying your source and channel (see Flume's docs), you are ready to run a Flume agent. http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/pom.xml ---------------------------------------------------------------------- diff --git a/modules/flume/pom.xml b/modules/flume/pom.xml new file mode 100644 index 0000000..cd4ee98 --- /dev/null +++ b/modules/flume/pom.xml @@ -0,0 +1,77 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-parent</artifactId> + <version>1</version> + <relativePath>../../parent</relativePath> + </parent> + + <artifactId>ignite-flume</artifactId> + <version>1.5.0-SNAPSHOT</version> + <url>http://ignite.apache.org</url> + + <properties> + <flume-ng.version>1.6.0</flume-ng.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + <version>${flume-ng.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-spring</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-log4j</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> +</project> 
http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java ---------------------------------------------------------------------- diff --git a/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java b/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java new file mode 100644 index 0000000..e85a98b --- /dev/null +++ b/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.flume; + +import java.util.List; +import java.util.Map; +import org.jetbrains.annotations.Nullable; + +/** + * Flume event transformer to convert a list of Flume {@link Event} to cache entries. + */ +public interface EventTransformer<Event, K, V> { + + /** + * Transforms a list of Flume {@link Event} to cache entries. + * + * @param events List of Flume events to transform. + * @return Cache entries to be written into the grid. 
+ */ + @Nullable Map<K, V> transform(List<Event> events); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java ---------------------------------------------------------------------- diff --git a/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java new file mode 100644 index 0000000..e6e7e90 --- /dev/null +++ b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.stream.flume; + +import java.util.ArrayList; +import java.util.List; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.FlumeException; +import org.apache.flume.Transaction; +import org.apache.flume.conf.Configurable; +import org.apache.flume.instrumentation.SinkCounter; +import org.apache.flume.sink.AbstractSink; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Flume sink for Apache Ignite. + */ +public class IgniteSink extends AbstractSink implements Configurable { + /** Logger. */ + private static final Logger log = LoggerFactory.getLogger(IgniteSink.class); + + /** Default batch size. */ + private static final int DFLT_BATCH_SIZE = 100; + + /** Ignite configuration file. */ + private String springCfgPath; + + /** Cache name. */ + private String cacheName; + + /** Event transformer implementation class. */ + private String eventTransformerCls; + + /** Number of events to be written per Flume transaction. */ + private int batchSize; + + /** Monitoring counter. */ + private SinkCounter sinkCounter; + + /** Event transformer. */ + private EventTransformer<Event, Object, Object> eventTransformer; + + /** Ignite instance. */ + private Ignite ignite; + + /** Empty constructor. */ + public IgniteSink() { + } + + /** + * Sink configurations with Ignite-specific settings. + * + * @param context Context for sink. 
+ */ + @Override public void configure(Context context) { + springCfgPath = context.getString(IgniteSinkConstants.CFG_PATH); + cacheName = context.getString(IgniteSinkConstants.CFG_CACHE_NAME); + eventTransformerCls = context.getString(IgniteSinkConstants.CFG_EVENT_TRANSFORMER); + batchSize = context.getInteger(IgniteSinkConstants.CFG_BATCH_SIZE, DFLT_BATCH_SIZE); + + if (sinkCounter == null) + sinkCounter = new SinkCounter(getName()); + } + + /** + * Starts a grid and initializes an event transformer. + */ + @SuppressWarnings("unchecked") + @Override synchronized public void start() { + A.notNull(springCfgPath, "Ignite config file"); + A.notNull(cacheName, "Cache name"); + A.notNull(eventTransformerCls, "Event transformer class"); + + sinkCounter.start(); + + try { + if (ignite == null) + ignite = Ignition.start(springCfgPath); + + if (eventTransformerCls != null && !eventTransformerCls.isEmpty()) { + Class<? extends EventTransformer> clazz = + (Class<? extends EventTransformer<Event, Object, Object>>)Class.forName(eventTransformerCls); + + eventTransformer = clazz.newInstance(); + } + } + catch (Exception e) { + log.error("Failed to start grid", e); + + throw new FlumeException("Failed to start grid", e); + } + + super.start(); + } + + /** + * Stops the grid. + */ + @Override synchronized public void stop() { + if (ignite != null) + ignite.close(); + + sinkCounter.stop(); + + super.stop(); + } + + /** + * Processes Flume events. 
+ */ + @Override public Status process() throws EventDeliveryException { + Channel channel = getChannel(); + + Transaction transaction = channel.getTransaction(); + + int eventCount = 0; + + try { + transaction.begin(); + + List<Event> batch = new ArrayList<>(batchSize); + + for (; eventCount < batchSize; ++eventCount) { + Event event = channel.take(); + + if (event == null) { + break; + } + + batch.add(event); + } + + if (!batch.isEmpty()) { + ignite.cache(cacheName).putAll(eventTransformer.transform(batch)); + + if (batch.size() < batchSize) + sinkCounter.incrementBatchUnderflowCount(); + else + sinkCounter.incrementBatchCompleteCount(); + } + else { + sinkCounter.incrementBatchEmptyCount(); + } + + sinkCounter.addToEventDrainAttemptCount(batch.size()); + + transaction.commit(); + + sinkCounter.addToEventDrainSuccessCount(batch.size()); + } + catch (Exception e) { + log.error("Failed to process events", e); + + transaction.rollback(); + + throw new EventDeliveryException(e); + } + finally { + transaction.close(); + } + + return eventCount == 0 ? Status.BACKOFF : Status.READY; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java ---------------------------------------------------------------------- diff --git a/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java new file mode 100644 index 0000000..ddefb24 --- /dev/null +++ b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.flume; + +/** + * Configuration strings for sending data to cache. Used in Flume agent configuration file. + */ +public class IgniteSinkConstants { + /** Ignite configuration file path. */ + public static final String CFG_PATH = "igniteCfg"; + + /** Cache name. */ + public static final String CFG_CACHE_NAME = "cacheName"; + + /** Event transformer implementation. */ + public static final String CFG_EVENT_TRANSFORMER = "eventTransformer"; + + /** Batch size. */ + public static final String CFG_BATCH_SIZE = "batchSize"; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java ---------------------------------------------------------------------- diff --git a/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java new file mode 100644 index 0000000..2f33ed4 --- /dev/null +++ b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.flume; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Sink; +import org.apache.flume.Transaction; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.conf.Configurables; +import org.apache.flume.event.EventBuilder; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; + +/** + * {@link IgniteSink} test. + */ +public class IgniteSinkTest extends GridCommonAbstractTest { + /** Number of events to be sent to memory channel. */ + private static final int EVENT_CNT = 10000; + + /** Cache name. */ + private static final String CACHE_NAME = "testCache"; + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception {@link Exception}. 
+ */ + public void testSink() throws Exception { + IgniteConfiguration cfg = loadConfiguration("modules/flume/src/test/resources/example-ignite.xml"); + + cfg.setClientMode(false); + + final Ignite grid = startGrid("igniteServerNode", cfg); + + Context channelContext = new Context(); + + channelContext.put("capacity", String.valueOf(EVENT_CNT)); + channelContext.put("transactionCapacity", String.valueOf(EVENT_CNT)); + + Channel memoryChannel = new MemoryChannel(); + + Configurables.configure(memoryChannel, channelContext); + + final CountDownLatch latch = new CountDownLatch(EVENT_CNT); + + final IgnitePredicate<Event> putLsnr = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + assert evt != null; + + latch.countDown(); + + return true; + } + }; + + IgniteSink sink = new IgniteSink() { + // Setting the listener on cache before sink processing starts. + @Override synchronized public void start() { + super.start(); + + grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(putLsnr, EVT_CACHE_OBJECT_PUT); + } + }; + + sink.setName("IgniteSink"); + sink.setChannel(memoryChannel); + + Context ctx = new Context(); + + ctx.put(IgniteSinkConstants.CFG_CACHE_NAME, CACHE_NAME); + ctx.put(IgniteSinkConstants.CFG_PATH, "example-ignite.xml"); + ctx.put(IgniteSinkConstants.CFG_EVENT_TRANSFORMER, "org.apache.ignite.stream.flume.TestEventTransformer"); + + Configurables.configure(sink, ctx); + + sink.start(); + + try { + Transaction tx = memoryChannel.getTransaction(); + + tx.begin(); + + for (int i = 0; i < EVENT_CNT; i++) + memoryChannel.put(EventBuilder.withBody((String.valueOf(i) + ": " + i).getBytes())); + + tx.commit(); + tx.close(); + + Sink.Status status = Sink.Status.READY; + + while (status != Sink.Status.BACKOFF) { + status = sink.process(); + } + } + finally { + sink.stop(); + } + + // Checks that 10000 events successfully processed in 10 seconds. 
+ assertTrue(latch.await(10, TimeUnit.SECONDS)); + + grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(putLsnr); + + IgniteCache<String, Integer> cache = grid.cache(CACHE_NAME); + + // Checks that each event was processed properly. + for (int i = 0; i < EVENT_CNT; i++) { + assertEquals(i, (int)cache.get(String.valueOf(i))); + } + + assertEquals(EVENT_CNT, cache.size(CachePeekMode.PRIMARY)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java new file mode 100644 index 0000000..ad6d162 --- /dev/null +++ b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.flume; + +import junit.framework.TestSuite; + +/** + * Tests for a Flume sink for Ignite. 
+ */ +public class IgniteSinkTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Apache Flume NG Sink Test Suite"); + + suite.addTest(new TestSuite(IgniteSinkTest.class)); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java ---------------------------------------------------------------------- diff --git a/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java b/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java new file mode 100644 index 0000000..c15efbf --- /dev/null +++ b/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.stream.flume; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.flume.Event; +import org.jetbrains.annotations.Nullable; + +/** + * A test transformer to convert {@link org.apache.flume.Event} to cacheable entries. + */ +public class TestEventTransformer implements EventTransformer<Event, String, Integer> { + + /** + * Transforms a Flume event to cacheable entries. + * + * @param event Flume event to transform. + * @return Map of cacheable entries. + */ + private Map<String, Integer> transform(Event event) { + final Map<String, Integer> map = new HashMap<>(); + + String eventStr = new String(event.getBody()); + + if (!eventStr.isEmpty()) { + String[] tokens = eventStr.split(":"); // Expects colon-delimited one line. + + map.put(tokens[0].trim(), Integer.valueOf(tokens[1].trim())); + } + + return map; + } + + /** + * Transforms a list of Flume events to cacheable entries. + * + * @param events Flume events to transform. + * @return Map of cacheable entries. + */ + @Nullable @Override public Map<String, Integer> transform(List<Event> events) { + final Map<String, Integer> map = new HashMap<>(); + + for (Event event : events) { + map.putAll(transform(event)); + } + + return map; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/resources/example-ignite.xml ---------------------------------------------------------------------- diff --git a/modules/flume/src/test/resources/example-ignite.xml b/modules/flume/src/test/resources/example-ignite.xml new file mode 100644 index 0000000..fbb05d3 --- /dev/null +++ b/modules/flume/src/test/resources/example-ignite.xml @@ -0,0 +1,71 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + Ignite configuration with all defaults and enabled p2p deployment and enabled events. + Used for testing IgniteSink running Ignite in a client mode. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/util + http://www.springframework.org/schema/util/spring-util.xsd"> + <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + <!-- Enable client mode. --> + <property name="clientMode" value="true"/> + + <!-- Cache accessed from IgniteSink. --> + <property name="cacheConfiguration"> + <list> + <!-- Partitioned cache example configuration with configurations adjusted to server nodes'. --> + <bean class="org.apache.ignite.configuration.CacheConfiguration"> + <property name="atomicityMode" value="ATOMIC"/> + + <property name="name" value="testCache"/> + </bean> + </list> + </property> + + <!-- Enable cache events. --> + <property name="includeEventTypes"> + <list> + <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). 
--> + <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/> + </list> + </property> + + <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --> + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <property name="addresses"> + <list> + <value>127.0.0.1:47500</value> + </list> + </property> + </bean> + </property> + </bean> + </property> + </bean> +</beans> http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5f06555..c40b551 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,7 @@ <module>modules/cloud</module> <module>modules/mesos</module> <module>modules/kafka</module> + <module>modules/flume</module> <module>modules/yarn</module> <module>modules/jms11</module> <module>modules/mqtt</module>