IGNITE-529 Initial implementation

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9a5fc056
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9a5fc056
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9a5fc056

Branch: refs/heads/ignite-801
Commit: 9a5fc056786494e748c982e0c766fa9842ba523e
Parents: b382062
Author: shtykh_roman <rsht...@yahoo.com>
Authored: Tue Nov 17 17:38:12 2015 +0300
Committer: Anton Vinogradov <a...@apache.org>
Committed: Tue Nov 17 17:38:12 2015 +0300

----------------------------------------------------------------------
 modules/flume/README.md                         |  40 ++++
 modules/flume/pom.xml                           |  77 ++++++++
 .../ignite/stream/flume/EventTransformer.java   |  36 ++++
 .../apache/ignite/stream/flume/IgniteSink.java  | 186 +++++++++++++++++++
 .../stream/flume/IgniteSinkConstants.java       |  35 ++++
 .../ignite/stream/flume/IgniteSinkTest.java     | 142 ++++++++++++++
 .../stream/flume/IgniteSinkTestSuite.java       |  37 ++++
 .../stream/flume/TestEventTransformer.java      |  66 +++++++
 .../flume/src/test/resources/example-ignite.xml |  71 +++++++
 pom.xml                                         |   1 +
 10 files changed, 691 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/README.md
----------------------------------------------------------------------
diff --git a/modules/flume/README.md b/modules/flume/README.md
new file mode 100644
index 0000000..2247cf3
--- /dev/null
+++ b/modules/flume/README.md
@@ -0,0 +1,40 @@
+#Flume NG sink
+
+## Setting up and running
+
+1. Create a transformer by implementing EventTransformer interface.
+2. Build it and copy to ${FLUME_HOME}/plugins.d/ignite-sink/lib.
+3. Copy other Ignite-related jar files to 
${FLUME_HOME}/plugins.d/ignite-sink/libext to have them as shown below.
+
+```
+plugins.d/
+`-- ignite
+    |-- lib
+    |   `-- ignite-flume-transformer-x.x.x.jar <-- your jar
+    `-- libext
+        |-- cache-api-1.0.0.jar
+        |-- ignite-core-x.x.x.jar
+        |-- ignite-flume-x.x.x.jar
+        |-- ignite-spring-x.x.x.jar
+        |-- spring-aop-4.1.0.RELEASE.jar
+        |-- spring-beans-4.1.0.RELEASE.jar
+        |-- spring-context-4.1.0.RELEASE.jar
+        |-- spring-core-4.1.0.RELEASE.jar
+        `-- spring-expression-4.1.0.RELEASE.jar
+```
+
+4. In Flume configuration file, specify Ignite configuration XML file's 
location with cache properties
+(see [Apache Ignite](https://apacheignite.readme.io/) with cache name 
specified for cache creation,
+cache name (same as in Ignite configuration file), your EventTransformer's 
implementation class,
+and, optionally, batch size (default -- 100).
+
+```
+# Describe the sink
+a1.sinks.k1.type = org.apache.ignite.stream.flume.IgniteSink
+a1.sinks.k1.igniteCfg = /some-path/ignite.xml
+a1.sinks.k1.cacheName = testCache
+a1.sinks.k1.eventTransformer = my.company.MyEventTransformer
+a1.sinks.k1.batchSize = 100
+```
+
+After specifying your source and channel (see Flume's docs), you are ready to 
run a Flume agent.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/pom.xml
----------------------------------------------------------------------
diff --git a/modules/flume/pom.xml b/modules/flume/pom.xml
new file mode 100644
index 0000000..cd4ee98
--- /dev/null
+++ b/modules/flume/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-flume</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <properties>
+        <flume-ng.version>1.6.0</flume-ng.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flume</groupId>
+            <artifactId>flume-ng-core</artifactId>
+            <version>${flume-ng.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java
----------------------------------------------------------------------
diff --git 
a/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java
 
b/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java
new file mode 100644
index 0000000..e85a98b
--- /dev/null
+++ 
b/modules/flume/src/main/java/org/apache/ignite/stream/flume/EventTransformer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+import java.util.List;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Flume event transformer to convert a list of Flume {@link Event} to cache 
entries.
+ */
+public interface EventTransformer<Event, K, V> {
+
+    /**
+     * Transforms a list of Flume {@link Event} to cache entries.
+     *
+     * @param events List of Flume events to transform.
+     * @return Cache entries to be written into the grid.
+     */
+    @Nullable Map<K, V> transform(List<Event> events);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java
----------------------------------------------------------------------
diff --git 
a/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java 
b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java
new file mode 100644
index 0000000..e6e7e90
--- /dev/null
+++ b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSink.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flume sink for Apache Ignite.
+ */
+public class IgniteSink extends AbstractSink implements Configurable {
+    /** Logger. */
+    private static final Logger log = 
LoggerFactory.getLogger(IgniteSink.class);
+
+    /** Default batch size. */
+    private static final int DFLT_BATCH_SIZE = 100;
+
+    /** Ignite configuration file. */
+    private String springCfgPath;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** Event transformer implementation class. */
+    private String eventTransformerCls;
+
+    /** Number of events to be written per Flume transaction. */
+    private int batchSize;
+
+    /** Monitoring counter. */
+    private SinkCounter sinkCounter;
+
+    /** Event transformer. */
+    private EventTransformer<Event, Object, Object> eventTransformer;
+
+    /** Ignite instance. */
+    private Ignite ignite;
+
+    /** Empty constructor. */
+    public IgniteSink() {
+    }
+
+    /**
+     * Sink configurations with Ignite-specific settings.
+     *
+     * @param context Context for sink.
+     */
+    @Override public void configure(Context context) {
+        springCfgPath = context.getString(IgniteSinkConstants.CFG_PATH);
+        cacheName = context.getString(IgniteSinkConstants.CFG_CACHE_NAME);
+        eventTransformerCls = 
context.getString(IgniteSinkConstants.CFG_EVENT_TRANSFORMER);
+        batchSize = context.getInteger(IgniteSinkConstants.CFG_BATCH_SIZE, 
DFLT_BATCH_SIZE);
+
+        if (sinkCounter == null)
+            sinkCounter = new SinkCounter(getName());
+    }
+
+    /**
+     * Starts a grid and initializes na event transformer.
+     */
+    @SuppressWarnings("unchecked")
+    @Override synchronized public void start() {
+        A.notNull(springCfgPath, "Ignite config file");
+        A.notNull(cacheName, "Cache name");
+        A.notNull(eventTransformerCls, "Event transformer class");
+
+        sinkCounter.start();
+
+        try {
+            if (ignite == null)
+                ignite = Ignition.start(springCfgPath);
+
+            if (eventTransformerCls != null && !eventTransformerCls.isEmpty()) 
{
+                Class<? extends EventTransformer> clazz =
+                    (Class<? extends EventTransformer<Event, Object, 
Object>>)Class.forName(eventTransformerCls);
+
+                eventTransformer = clazz.newInstance();
+            }
+        }
+        catch (Exception e) {
+            log.error("Failed to start grid", e);
+
+            throw new FlumeException("Failed to start grid", e);
+        }
+
+        super.start();
+    }
+
+    /**
+     * Stops the grid.
+     */
+    @Override synchronized public void stop() {
+        if (ignite != null)
+            ignite.close();
+
+        sinkCounter.stop();
+
+        super.stop();
+    }
+
+    /**
+     * Processes Flume events.
+     */
+    @Override public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+
+        Transaction transaction = channel.getTransaction();
+
+        int eventCount = 0;
+
+        try {
+            transaction.begin();
+
+            List<Event> batch = new ArrayList<>(batchSize);
+
+            for (; eventCount < batchSize; ++eventCount) {
+                Event event = channel.take();
+
+                if (event == null) {
+                    break;
+                }
+
+                batch.add(event);
+            }
+
+            if (!batch.isEmpty()) {
+                
ignite.cache(cacheName).putAll(eventTransformer.transform(batch));
+
+                if (batch.size() < batchSize)
+                    sinkCounter.incrementBatchUnderflowCount();
+                else
+                    sinkCounter.incrementBatchCompleteCount();
+            }
+            else {
+                sinkCounter.incrementBatchEmptyCount();
+            }
+
+            sinkCounter.addToEventDrainAttemptCount(batch.size());
+
+            transaction.commit();
+
+            sinkCounter.addToEventDrainSuccessCount(batch.size());
+        }
+        catch (Exception e) {
+            log.error("Failed to process events", e);
+
+            transaction.rollback();
+
+            throw new EventDeliveryException(e);
+        }
+        finally {
+            transaction.close();
+        }
+
+        return eventCount == 0 ? Status.BACKOFF : Status.READY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java
----------------------------------------------------------------------
diff --git 
a/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java
 
b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java
new file mode 100644
index 0000000..ddefb24
--- /dev/null
+++ 
b/modules/flume/src/main/java/org/apache/ignite/stream/flume/IgniteSinkConstants.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+/**
+ * Configuration strings for sending data to cache. Used in Flume agent 
configuration file.
+ */
+public class IgniteSinkConstants {
+    /** Ignite configuration file path. */
+    public static final String CFG_PATH = "igniteCfg";
+
+    /** Cache name. */
+    public static final String CFG_CACHE_NAME = "cacheName";
+
+    /** Event transformer implementation. */
+    public static final String CFG_EVENT_TRANSFORMER = "eventTransformer";
+
+    /** Batch size. */
+    public static final String CFG_BATCH_SIZE = "batchSize";
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java
----------------------------------------------------------------------
diff --git 
a/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java
 
b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java
new file mode 100644
index 0000000..2f33ed4
--- /dev/null
+++ 
b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * {@link IgniteSink} test.
+ */
+public class IgniteSinkTest extends GridCommonAbstractTest {
+    /** Number of events to be sent to memory channel. */
+    private static final int EVENT_CNT = 10000;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "testCache";
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception {@link Exception}.
+     */
+    public void testSink() throws Exception {
+        IgniteConfiguration cfg = 
loadConfiguration("modules/flume/src/test/resources/example-ignite.xml");
+
+        cfg.setClientMode(false);
+
+        final Ignite grid = startGrid("igniteServerNode", cfg);
+
+        Context channelContext = new Context();
+
+        channelContext.put("capacity", String.valueOf(EVENT_CNT));
+        channelContext.put("transactionCapacity", String.valueOf(EVENT_CNT));
+
+        Channel memoryChannel = new MemoryChannel();
+
+        Configurables.configure(memoryChannel, channelContext);
+
+        final CountDownLatch latch = new CountDownLatch(EVENT_CNT);
+
+        final IgnitePredicate<Event> putLsnr = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                assert evt != null;
+
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        IgniteSink sink = new IgniteSink() {
+            // Setting the listener on cache before sink processing starts.
+            @Override synchronized public void start() {
+                super.start();
+
+                
grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(putLsnr, 
EVT_CACHE_OBJECT_PUT);
+            }
+        };
+
+        sink.setName("IgniteSink");
+        sink.setChannel(memoryChannel);
+
+        Context ctx = new Context();
+
+        ctx.put(IgniteSinkConstants.CFG_CACHE_NAME, CACHE_NAME);
+        ctx.put(IgniteSinkConstants.CFG_PATH, "example-ignite.xml");
+        ctx.put(IgniteSinkConstants.CFG_EVENT_TRANSFORMER, 
"org.apache.ignite.stream.flume.TestEventTransformer");
+
+        Configurables.configure(sink, ctx);
+
+        sink.start();
+
+        try {
+            Transaction tx = memoryChannel.getTransaction();
+
+            tx.begin();
+
+            for (int i = 0; i < EVENT_CNT; i++)
+                memoryChannel.put(EventBuilder.withBody((String.valueOf(i) + 
": " + i).getBytes()));
+
+            tx.commit();
+            tx.close();
+
+            Sink.Status status = Sink.Status.READY;
+
+            while (status != Sink.Status.BACKOFF) {
+                status = sink.process();
+            }
+        }
+        finally {
+            sink.stop();
+        }
+
+        // Checks that 10000 events successfully processed in 10 seconds.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        
grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(putLsnr);
+
+        IgniteCache<String, Integer> cache = grid.cache(CACHE_NAME);
+
+        // Checks that each event was processed properly.
+        for (int i = 0; i < EVENT_CNT; i++) {
+            assertEquals(i, (int)cache.get(String.valueOf(i)));
+        }
+
+        assertEquals(EVENT_CNT, cache.size(CachePeekMode.PRIMARY));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java
 
b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java
new file mode 100644
index 0000000..ad6d162
--- /dev/null
+++ 
b/modules/flume/src/test/java/org/apache/ignite/stream/flume/IgniteSinkTestSuite.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+import junit.framework.TestSuite;
+
+/**
+ * Tests for a Flume sink for Ignite.
+ */
+public class IgniteSinkTestSuite extends TestSuite {
+    /**
+     * @return Test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("Apache Flume NG Sink Test Suite");
+
+        suite.addTest(new TestSuite(IgniteSinkTest.class));
+
+        return suite;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java
----------------------------------------------------------------------
diff --git 
a/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java
 
b/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java
new file mode 100644
index 0000000..c15efbf
--- /dev/null
+++ 
b/modules/flume/src/test/java/org/apache/ignite/stream/flume/TestEventTransformer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.flume;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flume.Event;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A test transformer to convert {@link org.apache.flume.Event} to cacheable 
entries.
+ */
+public class TestEventTransformer implements EventTransformer<Event, String, 
Integer> {
+
+    /**
+     * Transforms a Flume event to cacheable entries.
+     *
+     * @param event Flume event to transform.
+     * @return Map of cacheable entries.
+     */
+    private Map<String, Integer> transform(Event event) {
+        final Map<String, Integer> map = new HashMap<>();
+
+        String eventStr = new String(event.getBody());
+
+        if (!eventStr.isEmpty()) {
+            String[] tokens = eventStr.split(":"); // Expects column-delimited 
one line.
+
+            map.put(tokens[0].trim(), Integer.valueOf(tokens[1].trim()));
+        }
+
+        return map;
+    }
+
+    /**
+     * Transforms a list of Flume event to cacheable entries.
+     *
+     * @param events Flume events to transform.
+     * @return Map of cacheable entries.
+     */
+    @Nullable @Override public Map<String, Integer> transform(List<Event> 
events) {
+        final Map<String, Integer> map = new HashMap<>();
+
+        for (Event event : events) {
+            map.putAll(transform(event));
+        }
+
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/modules/flume/src/test/resources/example-ignite.xml
----------------------------------------------------------------------
diff --git a/modules/flume/src/test/resources/example-ignite.xml 
b/modules/flume/src/test/resources/example-ignite.xml
new file mode 100644
index 0000000..fbb05d3
--- /dev/null
+++ b/modules/flume/src/test/resources/example-ignite.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    Ignite configuration with all defaults and enabled p2p deployment and 
enabled events.
+    Used for testing IgniteSink running Ignite in a client mode.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans";
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xmlns:util="http://www.springframework.org/schema/util";
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd";>
+    <bean id="ignite.cfg" 
class="org.apache.ignite.configuration.IgniteConfiguration">
+        <!-- Enable client mode. -->
+        <property name="clientMode" value="true"/>
+
+        <!-- Cache accessed from IgniteSink. -->
+        <property name="cacheConfiguration">
+            <list>
+                <!-- Partitioned cache example configuration with 
configurations adjusted to server nodes'. -->
+                <bean 
class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="name" value="testCache"/>
+                </bean>
+            </list>
+        </property>
+
+        <!-- Enable cache events. -->
+        <property name="includeEventTypes">
+            <list>
+                <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). -->
+                <util:constant 
static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
+            </list>
+        </property>
+
+        <!-- Explicitly configure TCP discovery SPI to provide list of initial 
nodes. -->
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <value>127.0.0.1:47500</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/ignite/blob/9a5fc056/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5f06555..c40b551 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,7 @@
         <module>modules/cloud</module>
         <module>modules/mesos</module>
         <module>modules/kafka</module>
+        <module>modules/flume</module>
         <module>modules/yarn</module>
         <module>modules/jms11</module>
         <module>modules/mqtt</module>

Reply via email to