This is an automated email from the ASF dual-hosted git repository.

dmagda pushed a commit to branch IGNITE-7595
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/IGNITE-7595 by this push:
     new 74e1981  ported all streaming related docs from readme.io
74e1981 is described below

commit 74e19810cc0a84bf6976b22f2206aad907c34e47
Author: Denis Magda <dma...@gridgain.com>
AuthorDate: Fri Sep 25 16:17:44 2020 -0700

    ported all streaming related docs from readme.io
---
 docs/_data/toc.yaml                                |  22 +++
 .../streaming/camel-streamer.adoc                  | 139 ++++++++++++++
 .../streaming/flink-streamer.adoc                  |  64 +++++++
 .../streaming/flume-sink.adoc                      |  65 +++++++
 .../streaming/jms-streamer.adoc                    | 109 +++++++++++
 .../streaming/kafka-streamer.adoc                  | 207 +++++++++++++++++++++
 .../streaming/mqtt-streamer.adoc                   |  62 ++++++
 .../streaming/rocketmq-streamer.adoc               |  71 +++++++
 .../streaming/storm-streamer.adoc                  |  48 +++++
 .../streaming/twitter-streamer.adoc                |  51 +++++
 .../streaming/zeromq-streamer.adoc                 |  53 ++++++
 docs/_docs/images/integrations/camel-streamer.png  | Bin 0 -> 120217 bytes
 12 files changed, 891 insertions(+)

diff --git a/docs/_data/toc.yaml b/docs/_data/toc.yaml
index 356bbc8..2616397 100644
--- a/docs/_data/toc.yaml
+++ b/docs/_data/toc.yaml
@@ -399,6 +399,28 @@
       url: extensions-and-integrations/hibernate-l2-cache
     - title: MyBatis L2 Cache
       url: extensions-and-integrations/mybatis-l2-cache
+    - title: Streaming
+      items:
+        - title: Kafka Streamer
+          url: extensions-and-integrations/streaming/kafka-streamer
+        - title: Camel Streamer
+          url: extensions-and-integrations/streaming/camel-streamer
+        - title: Flink Streamer
+          url: extensions-and-integrations/streaming/flink-streamer
+        - title: Flume Sink
+          url: extensions-and-integrations/streaming/flume-sink
+        - title: JMS Streamer
+          url: extensions-and-integrations/streaming/jms-streamer
+        - title: MQTT Streamer
+          url: extensions-and-integrations/streaming/mqtt-streamer
+        - title: RocketMQ Streamer
+          url: extensions-and-integrations/streaming/rocketmq-streamer
+        - title: Storm Streamer
+          url: extensions-and-integrations/streaming/storm-streamer
+        - title: ZeroMQ Streamer
+          url: extensions-and-integrations/streaming/zeromq-streamer
+        - title: Twitter Streamer
+          url: extensions-and-integrations/streaming/twitter-streamer
 - title: Plugins
   url: plugins
 - title: SQL Reference
diff --git 
a/docs/_docs/extensions-and-integrations/streaming/camel-streamer.adoc 
b/docs/_docs/extensions-and-integrations/streaming/camel-streamer.adoc
new file mode 100644
index 0000000..36614403
--- /dev/null
+++ b/docs/_docs/extensions-and-integrations/streaming/camel-streamer.adoc
@@ -0,0 +1,139 @@
+= Apache Camel Streamer
+
+== Overview
+
+This page focuses on the Apache Camel streamer, which can be thought of as a universal streamer because it allows you to consume into an Ignite cache from any technology or protocol supported by Camel.
+
+image::images/integrations/camel-streamer.png[Camel Streamer]
+
+With this streamer, you can ingest entries straight into an Ignite cache based 
on:
+
+* Calls received on a Web Service (SOAP or REST), by extracting the body or 
headers.
+* Listening on a TCP or UDP channel for messages.
+* The content of files received via FTP or written to the local filesystem.
+* Email messages received via POP3 or IMAP.
+* A MongoDB tailable cursor.
+* An AWS SQS queue.
+* And many others.
+
+This streamer supports two modes of ingestion: **direct ingestion** and 
**mediated ingestion**.
+
+[NOTE]
+====
+[discrete]
+=== The Ignite Camel Component
+There is also the https://camel.apache.org/components/latest/ignite-summary.html[camel-ignite, window=_blank] component, if what you are looking for is to interact with Ignite Caches, Compute, Events, Messaging, etc. from within a Camel route.
+====
+
+== Maven Dependency
+
+To make use of the `ignite-camel` streamer, you need to add the following 
dependency:
+
+[tabs]
+--
+tab:pom.xml[]
+[source,xml]
+----
+<dependency>
+    <groupId>org.apache.ignite</groupId>
+    <artifactId>ignite-camel</artifactId>
+    <version>${ignite.version}</version>
+</dependency>
+----
+--
+
+It will also pull in `camel-core` as a transitive dependency.
+
+== Direct Ingestion
+
+**Direct ingestion** allows you to consume from any Camel endpoint straight into Ignite, with the help of a tuple extractor.
+
+Here is a code sample:
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+// Start Apache Ignite.
+Ignite ignite = Ignition.start();
+
+// Create a streamer pipe which ingests into the 'mycache' cache.
+IgniteDataStreamer<String, String> pipe = ignite.dataStreamer("mycache");
+
+// Create a Camel streamer and connect it.
+CamelStreamer<String, String> streamer = new CamelStreamer<>();
+streamer.setIgnite(ignite);
+streamer.setStreamer(pipe);
+
+// This endpoint starts a Jetty server and consumes from all network 
interfaces on port 8080 and context path /ignite.
+streamer.setEndpointUri("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST");
+
+// This is the tuple extractor. We'll assume each message contains only one 
tuple.
+// If your message contains multiple tuples, use a 
StreamMultipleTupleExtractor.
+// The Tuple Extractor receives the Camel Exchange and returns a 
Map.Entry<?,?> with the key and value.
+streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<Exchange, 
String, String>() {
+    @Override public Map.Entry<String, String> extract(Exchange exchange) {
+        String stationId = exchange.getIn().getHeader("X-StationId", 
String.class);
+        String temperature = exchange.getIn().getBody(String.class);
+        return new GridMapEntry<>(stationId, temperature);
+    }
+});
+
+// Start the streamer.
+streamer.start();
+----
+--
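+
+If each message carries several tuples, set a `StreamMultipleTupleExtractor` instead. A minimal sketch, assuming a body with one 'stationId,temperature' pair per line:
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+// Multiple-tuple extractor: parses a body with one 'stationId,temperature'
+// pair per line into a map of cache entries.
+streamer.setMultipleTupleExtractor(new StreamMultipleTupleExtractor<Exchange, String, String>() {
+    @Override public Map<String, String> extract(Exchange exchange) {
+        Map<String, String> entries = new HashMap<>();
+
+        for (String line : exchange.getIn().getBody(String.class).split("\n")) {
+            String[] parts = line.split(",");
+            entries.put(parts[0], parts[1]);
+        }
+
+        return entries;
+    }
+});
+----
+--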
+
+== Mediated Ingestion
+
+For more sophisticated scenarios, you can also create a Camel route that 
performs complex processing on incoming messages, e.g. transformations, 
validations, splitting, aggregating, idempotency, resequencing, enrichment, 
etc. and **ingest only the result into the Ignite cache**. 
+
+We call this **mediated ingestion**.
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+// Create a CamelContext with a custom route that will:
+//  (1) consume from our Jetty endpoint,
+//  (2) transform incoming JSON into a Java object with Jackson,
+//  (3) validate the object with JSR 303 Bean Validation, and
+//  (4) dispatch to the direct:ignite.ingest endpoint, which the streamer consumes from.
+CamelContext context = new DefaultCamelContext();
+context.addRoutes(new RouteBuilder() {
+    @Override
+    public void configure() throws Exception {
+        from("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST")
+            .unmarshal().json(JsonLibrary.Jackson)
+            .to("bean-validator:validate")
+            .to("direct:ignite.ingest");
+    }
+});
+
+// Remember our streamer is now consuming from the direct: endpoint above.
+streamer.setEndpointUri("direct:ignite.ingest");
+
+// Hand the CamelContext with the route over to the streamer so that the
+// route and the streamer's consumer share the same context.
+streamer.setCamelContext(context);
+----
+--
+
+== Setting a Response
+
+By default, the response sent back to the caller (if it is a synchronous 
endpoint) is simply an echo of the original request.
+If you want to customize the response, set a Camel `Processor` as a `responseProcessor`:
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+streamer.setResponseProcessor(new Processor() {
+    @Override public void process(Exchange exchange) throws Exception {
+        exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
+        exchange.getOut().setBody("OK");
+    }
+});
+----
+--
diff --git 
a/docs/_docs/extensions-and-integrations/streaming/flink-streamer.adoc 
b/docs/_docs/extensions-and-integrations/streaming/flink-streamer.adoc
new file mode 100644
index 0000000..381d85e
--- /dev/null
+++ b/docs/_docs/extensions-and-integrations/streaming/flink-streamer.adoc
@@ -0,0 +1,64 @@
+= Apache Flink Streamer
+
+The Apache Ignite Flink Sink module is a streaming connector that injects Flink data into an Ignite cache. The sink emits its input data to the Ignite cache. When creating a sink, an Ignite cache name and an Ignite grid configuration file have to be provided.
+
+Start data transfer to the Ignite cache with the following steps.
+
+. Import the Ignite Flink Sink module in your Maven project.
+If you are using Maven to manage the dependencies of your project, you can add the Flink module dependency like this (replace `${ignite.version}` with the actual Ignite version you are interested in):
++
+[tabs]
+--
+tab:pom.xml[]
+[source,xml]
+----
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                        http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    ...
+    <dependencies>
+        ...
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-flink</artifactId>
+            <version>${ignite.version}</version>
+        </dependency>
+        ...
+    </dependencies>
+    ...
+</project>
+----
+--
+. Create an Ignite configuration file and make sure it is accessible from the 
sink.
+. Make sure your data input to the sink is specified and start the sink.
++
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+IgniteSink igniteSink = new IgniteSink("myCache", "ignite.xml");
+
+igniteSink.setAllowOverwrite(true);
+igniteSink.setAutoFlushFrequency(10);
+igniteSink.start();
+
+// The Flink streaming execution environment that drives the job.
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<Map> stream = ...;
+
+// Sink data into the grid.
+stream.addSink(igniteSink);
+
+try {
+    env.execute();
+}
+catch (Exception e) {
+    // Exception handling.
+}
+finally {
+    igniteSink.stop();
+}
+----
+--
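+
+For a quick smoke test, the `DataStream<Map>` input above can be built from a plain collection, as in this hedged sketch (the sink consumes `Map` instances whose entries become cache entries):
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+// A trivial input: a single map whose entries end up in 'myCache'.
+List<Map> data = new ArrayList<>();
+
+Map<String, String> entry = new HashMap<>();
+entry.put("k1", "v1");
+data.add(entry);
+
+DataStream<Map> stream = env.fromCollection(data);
+----
+--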
+
+Refer to the Javadocs of the `ignite-flink` module for more info on the 
available options.
diff --git a/docs/_docs/extensions-and-integrations/streaming/flume-sink.adoc 
b/docs/_docs/extensions-and-integrations/streaming/flume-sink.adoc
new file mode 100644
index 0000000..9231824
--- /dev/null
+++ b/docs/_docs/extensions-and-integrations/streaming/flume-sink.adoc
@@ -0,0 +1,65 @@
+= Apache Flume Sink
+
+== Overview
+
+Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data (https://github.com/apache/flume).
+
+`IgniteSink` is a Flume sink that extracts events from an associated Flume channel and injects them into an Ignite cache.
+
+`IgniteSink` and its dependencies have to be included in the agent's 
classpath, as described in the following subsection,
+before starting the Flume agent.
+
+== Setting Up
+
+. Create a transformer by implementing the `EventTransformer` interface (see the sketch at the end of this section).
+. Create an `ignite` directory inside the `plugins.d` directory, which is located in `$\{FLUME_HOME}`. If the `plugins.d` directory is not there, create it.
+. Build the transformer jar and copy it to `$\{FLUME_HOME}/plugins.d/ignite/lib`.
+. Copy the other Ignite-related jar files from the Apache Ignite distribution to `$\{FLUME_HOME}/plugins.d/ignite/libext`, so that they are laid out as shown below.
++
+----
+plugins.d/
+`-- ignite
+ |-- lib
+ |   `-- ignite-flume-transformer-x.x.x.jar <-- your jar
+ `-- libext
+     |-- cache-api-1.0.0.jar
+     |-- ignite-core-x.x.x.jar
+     |-- ignite-flume-x.x.x.jar <-- IgniteSink
+     |-- ignite-spring-x.x.x.jar
+     |-- spring-aop-4.1.0.RELEASE.jar
+     |-- spring-beans-4.1.0.RELEASE.jar
+     |-- spring-context-4.1.0.RELEASE.jar
+     |-- spring-core-4.1.0.RELEASE.jar
+     `-- spring-expression-4.1.0.RELEASE.jar
+----
+
+. In the Flume configuration file, specify the location of the Ignite configuration XML file with the cache properties (see `flume/src/test/resources/example-ignite.xml` for a basic example), the cache name (the same as in the Ignite configuration file), your `EventTransformer` implementation class, and, optionally, the batch size. All properties are shown in the table below (required properties are in bold).
++
+[cols="20%,45%,35%",opts="header"]
+|===
+|Property Name |Description | Default Value
+|*channel*| The channel the sink reads events from | -
+|*type*| The component type name. Needs to be `org.apache.ignite.stream.flume.IgniteSink` | -
+|*igniteCfg*| Ignite configuration XML file | -
+|*cacheName*| Cache name. Same as in `igniteCfg` | -
+|*eventTransformer*| Your implementation of `org.apache.ignite.stream.flume.EventTransformer` | -
+|batchSize| Number of events to be written per transaction | 100
+|===
+
+The sink configuration part of an agent named `a1` can look like this:
+
+----
+a1.sinks.k1.type = org.apache.ignite.stream.flume.IgniteSink
+a1.sinks.k1.igniteCfg = /some-path/ignite.xml
+a1.sinks.k1.cacheName = testCache
+a1.sinks.k1.eventTransformer = my.company.MyEventTransformer
+a1.sinks.k1.batchSize = 100
+----
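+
+The transformer from step 1 might look like the following sketch, which assumes each event body is a `key,value` string (the class name matches the hypothetical `my.company.MyEventTransformer` above):
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+package my.company;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flume.Event;
+import org.apache.ignite.stream.flume.EventTransformer;
+
+// Transforms a batch of Flume events into cache entries, treating each
+// event body as a 'key,value' string.
+public class MyEventTransformer implements EventTransformer<Event, String, String> {
+    @Override public Map<String, String> transform(List<Event> events) {
+        Map<String, String> entries = new HashMap<>();
+
+        for (Event event : events) {
+            String[] parts = new String(event.getBody()).split(",");
+            entries.put(parts[0], parts[1]);
+        }
+
+        return entries;
+    }
+}
+----
+--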
+
+After specifying your source and channel (see Flume's docs), you are ready to 
run a Flume agent.
diff --git a/docs/_docs/extensions-and-integrations/streaming/jms-streamer.adoc 
b/docs/_docs/extensions-and-integrations/streaming/jms-streamer.adoc
new file mode 100644
index 0000000..df6578d
--- /dev/null
+++ b/docs/_docs/extensions-and-integrations/streaming/jms-streamer.adoc
@@ -0,0 +1,109 @@
+= JMS Streamer
+
+== Overview
+
+Ignite offers a JMS Data Streamer to consume messages from JMS brokers, convert them into cache tuples, and insert them into Ignite.
+
+This data streamer supports the following features:
+
+* Consumes from queues or topics.
+* Supports durable subscriptions from topics.
+* Concurrent consumers are supported via the `threads` parameter.
+ ** When consuming from queues, this component starts as many `Session` objects as the configured number of threads, each with its own `MessageListener` instance, therefore achieving _natural_ concurrency.
+ ** When consuming from topics, we obviously cannot start multiple threads, as that would lead us to consume duplicate messages. Therefore, we achieve concurrency in a _virtualized_ manner through an internal thread pool.
+* Transacted sessions are supported through the `transacted` parameter.
+* Batched consumption is possible via the `batched` parameter, which groups message reception within the scope of a local JMS transaction (XA is not supported). Depending on the broker, this technique can provide higher throughput, as it decreases the number of message acknowledgment round trips, albeit at the expense of possible duplicate messages (especially if an incident occurs in the middle of a transaction). See the configuration sketch after this list.
+ ** Batches are committed when the `batchClosureMillis` time has elapsed, or when a `Session` has received at least `batchClosureSize` messages.
+ ** Time-based closure fires with the specified frequency and applies to all ``Session``s in parallel.
+ ** Size-based closure applies to each individual `Session` (as transactions are `Session`-bound in JMS), so it fires when that `Session` has processed that many messages.
+ ** Both options are compatible with each other. You can disable either one, but not both, if batching is enabled.
+* Supports specifying the destination with implementation-specific 
`Destination` objects or with names.
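+
+For instance, a streamer consuming from a topic through a durable subscription, with batched and transacted sessions, might be configured as in the following sketch (the parameter values are illustrative assumptions; the setters are described in the module's Javadocs):
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+JmsStreamer<TextMessage, String, String> streamer = new JmsStreamer<>();
+
+// Consume within local JMS transactions, grouping messages into batches.
+streamer.setTransacted(true);
+streamer.setBatched(true);
+
+// Commit a batch after 250 ms have elapsed or once a Session has received
+// at least 50 messages, whichever happens first.
+streamer.setBatchClosureMillis(250);
+streamer.setBatchClosureSize(50);
+
+// Use a durable subscription so messages survive subscriber downtime.
+streamer.setDurableSubscription(true);
+streamer.setClientId("ignite-streamer-1");
+streamer.setDurableSubscriptionName("ignite-subscription");
+----
+--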
+
+We have tested our implementation against http://activemq.apache.org[Apache ActiveMQ, window=_blank], but any JMS broker is supported as long as its client library implements the http://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/[JMS 1.1 specification, window=_blank].
+
+== Instantiating JMS Streamer
+
+When you instantiate the JMS Streamer, you will need to concretize the following generic types:
+
+* `T extends Message` \=> the type of JMS `Message` this streamer will receive. If it can receive multiple types, use the generic `Message` type.
+* `K` \=> the type of the cache key.
+* `V` \=> the type of the cache value.
+
+To configure the JMS streamer, you will need to provide the following 
compulsory properties:
+
+* `connectionFactory` \=> an instance of your `ConnectionFactory` duly 
configured as required by the broker. It can be a pooled `ConnectionFactory`.
+* `destination` or (`destinationName` and `destinationType`) \=> a 
`Destination` object (normally a broker-specific implementation of the JMS 
`Queue` or `Topic` interfaces), or the combination of a destination name (queue 
or topic name) and the type as a `Class` reference to either `Queue` or 
`Topic`. In the latter case, the streamer will use either 
`Session.createQueue(String)` or `Session.createTopic(String)` to get a hold of 
the destination.
+* `transformer` \=> an implementation of `MessageTransformer<T, K, V>` that 
digests a JMS message of type `T` and produces a `Map<K, V>` of cache entries 
to add. It can also return `null` or an empty `Map` to ignore the incoming 
message.
+
+== Example
+
+The example in this section populates a cache with `String` keys and `String` 
values, consuming `TextMessages` with this format:
+
+----
+raulk,Raul Kripalani
+dsetrakyan,Dmitriy Setrakyan
+sv,Sergi Vladykin
+gm,Gianfranco Murador
+----
+
+Here is the code:
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+// create a data streamer
+IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("mycache");
+dataStreamer.allowOverwrite(true);
+
+// create a JMS streamer and plug the data streamer into it
+JmsStreamer<TextMessage, String, String> jmsStreamer = new JmsStreamer<>();
+jmsStreamer.setIgnite(ignite);
+jmsStreamer.setStreamer(dataStreamer);
+jmsStreamer.setConnectionFactory(connectionFactory);
+jmsStreamer.setDestination(destination);
+jmsStreamer.setTransacted(true);
+jmsStreamer.setTransformer(new MessageTransformer<TextMessage, String, 
String>() {
+    @Override
+    public Map<String, String> apply(TextMessage message) {
+        final Map<String, String> answer = new HashMap<>();
+        String text;
+        try {
+            text = message.getText();
+        }
+        catch (JMSException e) {
+            LOG.warn("Could not parse message.", e);
+            return Collections.emptyMap();
+        }
+        for (String s : text.split("\n")) {
+            String[] tokens = s.split(",");
+            answer.put(tokens[0], tokens[1]);
+        }
+        return answer;
+    }
+});
+
+jmsStreamer.start();
+
+// on application shutdown
+jmsStreamer.stop();
+dataStreamer.close();
+----
+--
+
+To use this component, you have to import the following module through your 
build system (Maven, Ivy, Gradle, sbt, etc.):
+
+[tabs]
+--
+tab:pom.xml[]
+[source,xml]
+----
+<dependency>
+    <groupId>org.apache.ignite</groupId>
+    <artifactId>ignite-jms11</artifactId>
+    <version>${ignite.version}</version>
+</dependency>
+----
+--
diff --git 
a/docs/_docs/extensions-and-integrations/streaming/kafka-streamer.adoc 
b/docs/_docs/extensions-and-integrations/streaming/kafka-streamer.adoc
new file mode 100644
index 0000000..c1d510d
--- /dev/null
+++ b/docs/_docs/extensions-and-integrations/streaming/kafka-streamer.adoc
@@ -0,0 +1,207 @@
+= Apache Kafka Streamer
+
+== Overview
+
+The Apache Ignite Kafka Streamer module provides streaming from Kafka to an Ignite cache.
+Either of the following two methods can be used to achieve such streaming:
+
+* using Kafka Connect functionality with the Ignite sink
+* importing the Kafka Streamer module in your Maven project and instantiating `KafkaStreamer` for data streaming
+
+== Streaming Data via Kafka Connect
+
+`IgniteSinkConnector` helps you export data from Kafka to an Ignite cache by polling data from Kafka topics and writing it to your specified cache. The connector can be found in the `optional/ignite-kafka` module. It and its dependencies have to be on the classpath of a running Kafka instance, as described in the following subsection. _For more information on Kafka Connect, see http://kafka.apache.org/documentation.html#connect[Kafka Documentation, window=_blank]._
+
+=== Setting up and Running
+
+. Add the `IGNITE_HOME/libs/optional/ignite-kafka` module to the application 
classpath.
+
+. Prepare worker configurations, e.g.,
++
+[tabs]
+--
+tab:Configuration[]
+[source,yaml]
+----
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+internal.key.converter=org.apache.kafka.connect.storage.StringConverter
+internal.value.converter=org.apache.kafka.connect.storage.StringConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+----
+--
+
+. Prepare connector configurations, e.g.,
++
+[tabs]
+--
+tab:Configuration[]
+[source,yaml]
+----
+# connector
+name=my-ignite-connector
+connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
+tasks.max=2
+topics=someTopic1,someTopic2
+
+# cache
+cacheName=myCache
+cacheAllowOverwrite=true
+igniteCfg=/some-path/ignite.xml
+singleTupleExtractorCls=my.company.MyExtractor
+----
+--
++
+* where `cacheName` is the name of the cache you specify in `/some-path/ignite.xml`, in which the data pulled from `someTopic1,someTopic2` will be stored.
+* `cacheAllowOverwrite` can be set to `true` if you want to enable overwriting existing values in the cache.
+* If you need to parse the incoming data and decide on your new key and value, you can implement it as a `StreamSingleTupleExtractor` and specify it as `singleTupleExtractorCls` (see the sketch after these steps).
+* You can also set `cachePerNodeDataSize` and `cachePerNodeParOps` to adjust the per-node buffer and the maximum number of parallel stream operations for a single node.
+
+. Start the connector, for instance, in standalone mode as follows:
++
+[tabs]
+--
+tab:Shell[]
+[source,shell]
+----
+bin/connect-standalone.sh myconfig/connect-standalone.properties 
myconfig/ignite-connector.properties
+----
+--
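+
+The hypothetical `my.company.MyExtractor` referenced above could be sketched as follows, assuming each record value is a `key,value` string:
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+package my.company;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+// Derives the cache key and value from a SinkRecord whose value
+// is a 'key,value' string.
+public class MyExtractor implements StreamSingleTupleExtractor<SinkRecord, String, String> {
+    @Override public Map.Entry<String, String> extract(SinkRecord record) {
+        String[] parts = record.value().toString().split(",");
+
+        return new AbstractMap.SimpleEntry<>(parts[0], parts[1]);
+    }
+}
+----
+--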
+
+=== Checking the Flow
+
+To perform a very basic functionality check, you can do the following:
+
+. Start ZooKeeper
++
+[tabs]
+--
+tab:Shell[]
+[source,shell]
+----
+bin/zookeeper-server-start.sh config/zookeeper.properties
+----
+--
+. Start Kafka server
++
+[tabs]
+--
+tab:Shell[]
+[source,shell]
+----
+bin/kafka-server-start.sh config/server.properties
+----
+--
+. Provide some data input to the Kafka server
++
+[tabs]
+--
+tab:Shell[]
+[source,shell]
+----
+bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
+k1,v1
+----
+--
+. Start the connector
++
+[tabs]
+--
+tab:Shell[]
+[source,shell]
+----
+bin/connect-standalone.sh myconfig/connect-standalone.properties 
myconfig/ignite-connector.properties
+----
+--
+. Check that the value is in the cache. For example, via the REST API:
++
+[tabs]
+--
+tab:Shell[]
+[source,shell]
+----
+http://node1:8080/ignite?cmd=size&cacheName=cache1
+----
+--
+
+== Streaming Data with the Ignite Kafka Streamer Module
+
+If you are using Maven to manage the dependencies of your project, you first have to add the Kafka Streamer module dependency like this (replace `${ignite.version}` with the actual Ignite version you are interested in):
+
+[tabs]
+--
+tab:pom.xml[]
+[source,xml]
+----
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                        http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    ...
+    <dependencies>
+        ...
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-kafka</artifactId>
+            <version>${ignite.version}</version>
+        </dependency>
+        ...
+    </dependencies>
+    ...
+</project>
+----
+--
+
+Having a cache with `String` keys and `String` values, the streamer can be started as follows:
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();
+
+IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");
+
+// allow overwriting cache data
+stmr.allowOverwrite(true);
+
+kafkaStreamer.setIgnite(ignite);
+kafkaStreamer.setStreamer(stmr);
+
+// set the topic
+kafkaStreamer.setTopic(someKafkaTopic);
+
+// set the number of threads to process Kafka streams
+kafkaStreamer.setThreads(4);
+
+// set Kafka consumer configurations
+kafkaStreamer.setConsumerConfig(kafkaConsumerConfig);
+
+// set extractor
+kafkaStreamer.setSingleTupleExtractor(strExtractor);
+
+kafkaStreamer.start();
+
+...
+
+// stop on shutdown
+kafkaStreamer.stop();
+
+stmr.close();
+----
+--
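+
+The `kafkaConsumerConfig` object above holds standard Kafka consumer settings; a minimal sketch (the broker address and group ID are assumptions):
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+Properties kafkaConsumerConfig = new Properties();
+
+// Broker(s) to bootstrap from and the consumer group to join.
+kafkaConsumerConfig.put("bootstrap.servers", "localhost:9092");
+kafkaConsumerConfig.put("group.id", "ignite-streamer-group");
+
+// Deserializers for record keys and values.
+kafkaConsumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+kafkaConsumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+----
+--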
+
+For detailed information on Kafka consumer properties, refer to http://kafka.apache.org/documentation.html
diff --git 
a/docs/_docs/extensions-and-integrations/streaming/mqtt-streamer.adoc 
b/docs/_docs/extensions-and-integrations/streaming/mqtt-streamer.adoc
new file mode 100644
index 0000000..9c792da
--- /dev/null
+++ b/docs/_docs/extensions-and-integrations/streaming/mqtt-streamer.adoc
@@ -0,0 +1,62 @@
+= MQTT Streamer
+
+== Overview
+
+This streamer consumes from an MQTT topic and feeds key-value pairs into an 
`IgniteDataStreamer` instance, using
+https://eclipse.org/paho/[Eclipse Paho, window=_blank] as an MQTT client.
+
+You need to provide a stream tuple extractor (either a single-entry or a multiple-entries extractor) to process each incoming message and extract the tuple(s) to insert.
+
+This streamer supports:
+
+* Subscribing to a single topic or multiple topics at once.
+* Specifying the subscriber's QoS for a single topic or for multiple topics.
+* Setting https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttConnectOptions.html[MqttConnectOptions, window=_blank] to enable features like _last will and testament_, _persistent sessions_, etc.
+* Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user does not provide one.
+* (Re-)connection retries powered by the https://github.com/rholder/guava-retrying[guava-retrying library, window=_blank]. _Retry wait_ and _retry stop_ policies can be configured.
+* Blocking the `start()` method until the client is connected for the first time.
+
+== Example
+
+Here's a trivial code sample showing how to use this streamer:
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+// Start Ignite.
+Ignite ignite = Ignition.start();
+
+// Get a data streamer reference.
+IgniteDataStreamer<Integer, String> dataStreamer = ignite.dataStreamer("mycache");
+
+// Create an MQTT data streamer
+MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
+streamer.setIgnite(ignite);
+streamer.setStreamer(dataStreamer);
+streamer.setBrokerUrl(brokerUrl);
+streamer.setBlockUntilConnected(true);
+
+// Set a single tuple extractor to extract items in the format 'key,value' 
where key => Int, and value => String
+// (using Guava here).
+streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<MqttMessage, 
Integer, String>() {
+    @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
+        List<String> s = Splitter.on(",").splitToList(new 
String(msg.getPayload()));
+
+        return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
+    }
+});
+
+// Consume from multiple topics at once.
+streamer.setTopics(Arrays.asList("def", "ghi", "jkl", "mno"));
+
+// Start the MQTT Streamer.
+streamer.start();
+----
+--
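+
+Connection behavior can be tuned before calling `start()`. The following sketch assumes the `setConnectOptions`, `setRetryWaitStrategy`, and `setRetryStopStrategy` setters described in the module's Javadocs; the values are illustrative:
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+// Configure the underlying Paho client: persistent session plus a
+// last will and testament message.
+MqttConnectOptions opts = new MqttConnectOptions();
+opts.setCleanSession(false);
+opts.setWill("client/status", "offline".getBytes(), 1, false);
+streamer.setConnectOptions(opts);
+
+// Retry (re-)connections with exponential backoff, giving up after 10 attempts.
+streamer.setRetryWaitStrategy(WaitStrategies.exponentialWait());
+streamer.setRetryStopStrategy(StopStrategies.stopAfterAttempt(10));
+----
+--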
+
+Refer to the Javadocs of the `ignite-mqtt` module for more info on the 
available options.
diff --git 
a/docs/_docs/extensions-and-integrations/streaming/rocketmq-streamer.adoc 
b/docs/_docs/extensions-and-integrations/streaming/rocketmq-streamer.adoc
new file mode 100644
index 0000000..a2781a9
--- /dev/null
+++ b/docs/_docs/extensions-and-integrations/streaming/rocketmq-streamer.adoc
@@ -0,0 +1,71 @@
+= RocketMQ Streamer
+
+This streamer module provides streaming from 
https://github.com/apache/incubator-rocketmq[Apache RocketMQ, window=_blank]
+to Ignite.
+
+To use the Ignite RocketMQ Streamer module:
+
+. Import it into your Maven project. If you are using Maven to manage the dependencies of your project, you can add the Ignite RocketMQ module dependency like this (replace `${ignite.version}` with the actual Ignite version you are interested in):
++
+[tabs]
+--
+tab:pom.xml[]
+[source,xml]
+----
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                        http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    ...
+    <dependencies>
+        ...
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-rocketmq</artifactId>
+            <version>${ignite.version}</version>
+        </dependency>
+        ...
+    </dependencies>
+    ...
+</project>
+----
+--
+
+. Implement either `StreamSingleTupleExtractor` or `StreamMultipleTupleExtractor` for the streamer (shown as `MyTupleExtractor` in the code sample below; a sketch appears at the end of this page). For a simple implementation, refer to `RocketMQStreamerTest.java`.
+
+. Initialize and start the streamer:
++
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+IgniteDataStreamer<String, byte[]> dataStreamer = ignite.dataStreamer(MY_CACHE);
+
+dataStreamer.allowOverwrite(true);
+dataStreamer.autoFlushFrequency(10);
+
+streamer = new RocketMQStreamer<>();
+
+// Configure the streamer.
+streamer.setIgnite(ignite);
+streamer.setStreamer(dataStreamer);
+streamer.setNameSrvAddr(NAMESERVER_IP_PORT);
+streamer.setConsumerGrp(CONSUMER_GRP);
+streamer.setTopic(TOPIC_NAME);
+streamer.setMultipleTupleExtractor(new MyTupleExtractor());
+
+streamer.start();
+
+...
+
+// stop on shutdown
+streamer.stop();
+
+dataStreamer.close();
+----
+--
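+
+The `MyTupleExtractor` from step 2 could be sketched like this, assuming the streamer hands over batches of RocketMQ `MessageExt` messages whose key and body become the cache key and value (the import path is an assumption; adjust it to your RocketMQ client version):
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+package my.company;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.rocketmq.common.message.MessageExt;
+
+// Maps each consumed message to a cache entry: message key -> message body.
+public class MyTupleExtractor implements StreamMultipleTupleExtractor<List<MessageExt>, String, byte[]> {
+    @Override public Map<String, byte[]> extract(List<MessageExt> msgs) {
+        Map<String, byte[]> entries = new HashMap<>();
+
+        for (MessageExt msg : msgs)
+            entries.put(msg.getKeys(), msg.getBody());
+
+        return entries;
+    }
+}
+----
+--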
+
+Refer to the Javadocs for more info on the available options.
diff --git 
a/docs/_docs/extensions-and-integrations/streaming/storm-streamer.adoc 
b/docs/_docs/extensions-and-integrations/streaming/storm-streamer.adoc
new file mode 100644
index 0000000..1eb3a2a
--- /dev/null
+++ b/docs/_docs/extensions-and-integrations/streaming/storm-streamer.adoc
@@ -0,0 +1,48 @@
+= Apache Storm Streamer
+
+The Apache Ignite Storm Streamer module provides streaming via http://storm.apache.org/[Storm, window=_blank] to Ignite.
+
+Start data transfer to Ignite with the following steps.
+
+. Import the Ignite Storm Streamer module in your Maven project. If you are using Maven to manage the dependencies of your project, you can add the Storm module dependency like this (replace `${ignite.version}` with the actual Ignite version you are interested in):
++
+[tabs]
+--
+tab:pom.xml[]
+[source,xml]
+----
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                        http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    ...
+    <dependencies>
+        ...
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-storm</artifactId>
+            <version>${ignite.version}</version>
+        </dependency>
+        ...
+    </dependencies>
+    ...
+</project>
+----
+--
+
+. Create an Ignite configuration file (see `modules/storm/src/test/resources/example-ignite.xml` for an example) and make sure it is accessible from the streamer.
+. Make sure your key-value data input to the streamer is specified with the field named `ignite` (or a different one you configure with `StormStreamer.setIgniteTupleField(...)`).
+See `TestStormSpout.declareOutputFields(...)` for an example, and the sketch at the end of this page.
+. Create a topology with the streamer, make a jar file with all dependencies, and run the following:
++
+[tabs]
+--
+tab:Shell[]
+[source,shell]
+----
+storm jar ignite-storm-streaming-jar-with-dependencies.jar 
my.company.ignite.MyStormTopology
+----
+--
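+
+The spout side of step 3 could look like the following sketch (a hypothetical spout; the `ignite` field name and the emitted map are the essential parts):
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+package my.company.ignite;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+// A spout that emits key-value maps under the 'ignite' field.
+public class MySpout extends BaseRichSpout {
+    private SpoutOutputCollector collector;
+
+    @Override public void open(Map conf, TopologyContext ctx, SpoutOutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override public void nextTuple() {
+        // A real spout would read from an external source here.
+        collector.emit(new Values(Collections.singletonMap("k1", "v1")));
+    }
+
+    @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // The field name must match the streamer's Ignite tuple field.
+        declarer.declare(new Fields("ignite"));
+    }
+}
+----
+--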
diff --git 
a/docs/_docs/extensions-and-integrations/streaming/twitter-streamer.adoc 
b/docs/_docs/extensions-and-integrations/streaming/twitter-streamer.adoc
new file mode 100644
index 0000000..aca8fc9
--- /dev/null
+++ b/docs/_docs/extensions-and-integrations/streaming/twitter-streamer.adoc
@@ -0,0 +1,51 @@
+= Twitter Streamer
+
+The Ignite Twitter Streamer module consumes tweets from Twitter and feeds the transformed key-value pairs `<tweetId, text>` into Ignite.
+
+To stream data from Twitter into Ignite, you need to:
+
+. Import the Ignite Twitter module with Maven, replacing `${ignite.version}` with the actual Ignite version you are interested in.
++
+[tabs]
+--
+tab:pom.xml[]
+[source,xml]
+----
+<dependency>
+  <groupId>org.apache.ignite</groupId>
+  <artifactId>ignite-twitter</artifactId>
+  <version>${ignite.version}</version>
+</dependency>
+----
+--
+
+. In your code, set the necessary parameters and start the streamer, like so:
++
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+IgniteDataStreamer dataStreamer = ignite.dataStreamer("myCache");
+dataStreamer.allowOverwrite(true);
+dataStreamer.autoFlushFrequency(10);
+
+OAuthSettings oAuthSettings = new OAuthSettings("setting1", "setting2", 
"setting3", "setting4");
+
+TwitterStreamer<Integer, String> streamer = new 
TwitterStreamer<>(oAuthSettings);
+streamer.setIgnite(ignite);
+streamer.setStreamer(dataStreamer);
+
+Map<String, String> params = new HashMap<>();
+params.put("track", "apache, twitter");
+params.put("follow", "3004445758");
+
+streamer.setApiParams(params); // Twitter Streaming API params.
+streamer.setEndpointUrl(endpointUrl); // Twitter Streaming API endpoint.
+streamer.setThreadsCount(8);
+
+streamer.start();
+----
+--
+
+Refer to the https://dev.twitter.com/streaming/overview[Twitter streaming API, window=_blank] documentation for more information on the various streaming parameters.
diff --git 
a/docs/_docs/extensions-and-integrations/streaming/zeromq-streamer.adoc 
b/docs/_docs/extensions-and-integrations/streaming/zeromq-streamer.adoc
new file mode 100644
index 0000000..0e6e6cf
--- /dev/null
+++ b/docs/_docs/extensions-and-integrations/streaming/zeromq-streamer.adoc
@@ -0,0 +1,53 @@
+= ZeroMQ Streamer
+
+The Apache Ignite ZeroMQ Streamer module enables streaming capabilities via http://zeromq.org/[ZeroMQ, window=_blank] into Ignite.
+
+To start streaming into Ignite, you need to do the following:
+
+. Add Ignite ZeroMQ Streamer Module to your Maven `pom.xml` file.
++
+[tabs]
+--
+tab:pom.xml[]
+[source,xml]
+----
+<dependencies>
+    ...
+    <dependency>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-zeromq</artifactId>
+        <version>${ignite.version}</version>
+    </dependency>
+    ...
+</dependencies>
+----
+--
+
+. Implement either the https://github.com/apache/ignite/blob/f2f82f09b35368f25e136c9fce5e7f2198a91171/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java[StreamSingleTupleExtractor, window=_blank] or the https://github.com/apache/ignite/blob/f2f82f09b35368f25e136c9fce5e7f2198a91171/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java[StreamMultipleTupleExtractor, window=_blank] for the ZeroMQ streamer.
+Refer to https://github.com/apache/ignite/blob/7492843ad9e22c91764fb8d0c3a096b8ce6c653e/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/ZeroMqStringSingleTupleExtractor.java[this sample implementation, window=_blank] for more details (a comparable sketch also appears at the end of this page).
+. Set the extractor and initiate the streaming as shown below:
++
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+try (IgniteDataStreamer<Integer, String> dataStreamer =
+     grid().dataStreamer("myCacheName")) {
+
+    dataStreamer.allowOverwrite(true);
+    dataStreamer.autoFlushFrequency(1);
+
+    try (IgniteZeroMqStreamer streamer = new IgniteZeroMqStreamer(
+      1, ZeroMqTypeSocket.PULL, "tcp://localhost:5671", null)) {
+      streamer.setIgnite(grid());
+      streamer.setStreamer(dataStreamer);
+
+      streamer.setSingleTupleExtractor(new ZeroMqStringSingleTupleExtractor());
+
+      streamer.start();
+    }
+}
+----
+--
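+
+The sample extractor linked in step 2 converts each ZeroMQ payload into a single cache entry; a comparable sketch (a hypothetical `MyZeroMqExtractor`, assuming a `key,value` payload with an integer key):
+
+[tabs]
+--
+tab:Java[]
+[source,java]
+----
+import java.util.AbstractMap;
+import java.util.Map;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+
+// Parses a 'key,value' payload where the key is an integer
+// and the value a string.
+public class MyZeroMqExtractor implements StreamSingleTupleExtractor<byte[], Integer, String> {
+    @Override public Map.Entry<Integer, String> extract(byte[] msg) {
+        String[] parts = new String(msg).split(",");
+
+        return new AbstractMap.SimpleEntry<>(Integer.parseInt(parts[0]), parts[1]);
+    }
+}
+----
+--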
diff --git a/docs/_docs/images/integrations/camel-streamer.png 
b/docs/_docs/images/integrations/camel-streamer.png
new file mode 100644
index 0000000..cff36dc
Binary files /dev/null and b/docs/_docs/images/integrations/camel-streamer.png 
differ
