This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-website.git

commit 5cb97e789f17d9a024f1edb89a82da02ff8f45a5
Author: Luciano Resende <[email protected]>
AuthorDate: Mon Dec 14 17:26:01 2020 -0800

    Add Bahir 2.3.4 documentation to website
---
 .../spark/{overview.md => 2.3.4/documentation.md}  |  47 ++-
 site/docs/spark/2.3.4/spark-sql-cloudant.md        | 343 +++++++++++++++++++++
 site/docs/spark/2.3.4/spark-sql-streaming-akka.md  | 137 ++++++++
 site/docs/spark/2.3.4/spark-sql-streaming-mqtt.md  | 218 +++++++++++++
 site/docs/spark/2.3.4/spark-streaming-akka.md      |  89 ++++++
 site/docs/spark/2.3.4/spark-streaming-mqtt.md      | 116 +++++++
 site/docs/spark/2.3.4/spark-streaming-pubnub.md    | 103 +++++++
 site/docs/spark/2.3.4/spark-streaming-pubsub.md    |  96 ++++++
 site/docs/spark/2.3.4/spark-streaming-twitter.md   |  74 +++++
 site/docs/spark/2.3.4/spark-streaming-zeromq.md    |  76 +++++
 site/docs/spark/overview.md                        |   1 +
 11 files changed, 1284 insertions(+), 16 deletions(-)

diff --git a/site/docs/spark/overview.md 
b/site/docs/spark/2.3.4/documentation.md
similarity index 51%
copy from site/docs/spark/overview.md
copy to site/docs/spark/2.3.4/documentation.md
index 92b336f..b5f9e3b 100644
--- a/site/docs/spark/overview.md
+++ b/site/docs/spark/2.3.4/documentation.md
@@ -27,19 +27,34 @@ limitations under the License.
 
 ### Apache Bahir Extensions for Apache Spark
 
- - [Current - 2.4.0-SNAPSHOT](/docs/spark/current/documentation)
- - [2.3.3](/docs/spark/2.3.3/documentation)
- - [2.3.2](/docs/spark/2.3.2/documentation)
- - [2.3.1](/docs/spark/2.3.1/documentation)
- - [2.3.0](/docs/spark/2.3.0/documentation)
- - [2.2.3](/docs/spark/2.2.3/documentation)
- - [2.2.2](/docs/spark/2.2.2/documentation)
- - [2.2.1](/docs/spark/2.2.1/documentation)
- - [2.2.0](/docs/spark/2.2.0/documentation)
- - [2.1.3](/docs/spark/2.1.3/documentation)
- - [2.1.2](/docs/spark/2.1.2/documentation)
- - [2.1.1](/docs/spark/2.1.1/documentation)
- - [2.1.0](/docs/spark/2.1.0/documentation)
- - [2.0.2](/docs/spark/2.0.2/documentation)
- - [2.0.1](/docs/spark/2.0.1/documentation)
- - [2.0.0](/docs/spark/2.0.0/documentation)
+<br/>
+
+#### SQL Data Sources
+
+[Apache CouchDB/Cloudant data source](../spark-sql-cloudant)
+
+<br/>
+
+#### Structured Streaming Data Sources
+
+[Akka data source](../spark-sql-streaming-akka)
+
+[MQTT data source](../spark-sql-streaming-mqtt) 
![](/assets/themes/apache-clean/img/new-black.png){:height="36px" width="36px"} 
(new Sink)
+
+<br/>
+
+#### Discretized Streams (DStreams) Connectors
+
+[Apache CouchDB/Cloudant connector](../spark-sql-cloudant)
+
+[Akka connector](../spark-streaming-akka)
+
+[Google Cloud Pub/Sub connector](../spark-streaming-pubsub)
+
+[Cloud PubNub connector](../spark-streaming-pubnub) 
![](/assets/themes/apache-clean/img/new-black.png){:height="36px" width="36px"}
+
+[MQTT connector](../spark-streaming-mqtt)
+
+[Twitter connector](../spark-streaming-twitter)
+
+[ZeroMQ connector](../spark-streaming-zeromq) 
![](/assets/themes/apache-clean/img/new-black.png){:height="36px" width="36px"} 
(Enhanced Implementation)
diff --git a/site/docs/spark/2.3.4/spark-sql-cloudant.md 
b/site/docs/spark/2.3.4/spark-sql-cloudant.md
new file mode 100644
index 0000000..ab01cfc
--- /dev/null
+++ b/site/docs/spark/2.3.4/spark-sql-cloudant.md
@@ -0,0 +1,343 @@
+---
+layout: page
+title: Spark Data Source for Apache CouchDB/Cloudant
+description: Spark Data Source for Apache CouchDB/Cloudant
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+A library for reading data from Cloudant or CouchDB databases using Spark SQL 
and Spark Streaming. 
+
+[IBM® Cloudant®](https://cloudant.com) is a document-oriented DataBase as a 
Service (DBaaS). It stores data as documents 
+in JSON format. It's built with scalability, high availability, and durability 
in mind. It comes with a 
+wide variety of indexing options including map-reduce, Cloudant Query, 
full-text indexing, and 
+geospatial indexing. The replication capabilities make it easy to keep data in 
sync between database 
+clusters, desktop PCs, and mobile devices.
+
+[Apache CouchDB™](http://couchdb.apache.org) is open source database software 
that focuses on ease of use and having an architecture that "completely 
embraces the Web". It has a document-oriented NoSQL database architecture and 
is implemented in the concurrency-oriented language Erlang; it uses JSON to 
store data, JavaScript as its query language using MapReduce, and HTTP for an 
API.
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-cloudant" % "2.3.4"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-cloudant_2.11</artifactId>
+        <version>2.3.4</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or 
`spark-submit` by using the `--packages` command line option.
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.11:2.3.4
+
+Unlike using `--jars`, using `--packages` ensures that this library and its 
dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+Submit a job in Python:
+    
+    spark-submit  --master local[4] --packages 
org.apache.bahir:spark-sql-cloudant_2.11:2.3.4  <path to python script>
+    
+Submit a job in Scala:
+
+       spark-submit --class "<your class>" --master local[4] --packages 
org.apache.bahir:spark-sql-cloudant_2.11:2.3.4 <path to spark-sql-cloudant jar>
+
+This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 
onwards.
+
+## Configuration options       
+The configuration is obtained in the following sequence:
+
+1. default in the Config, which is set in the application.conf
+2. key in the SparkConf, which is set in SparkConf
+3. key in the parameters, which is set in a dataframe or temporary table 
options
+4. "spark."+key in the SparkConf (as they are treated as the one passed in 
through spark-submit using --conf option)
+
+Here each subsequent configuration overrides the previous one. Thus, 
configuration set using DataFrame option overrides what has been set in 
SparkConf. And configuration passed in spark-submit using --conf takes 
precedence over any setting in the code.
+
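The precedence order above can be sketched in plain Python. This is only an illustration of the layering as described, not the connector's actual code; `resolve_option` and its dictionary arguments are hypothetical names introduced for this example.

```python
def resolve_option(key, app_defaults, spark_conf, df_options):
    """Return the effective value for `key`, or None if no layer sets it.

    Layers are checked from lowest to highest precedence, as described
    above; a later layer that defines the key overrides earlier ones.
    """
    layers = [
        app_defaults.get(key),           # 1. default from application.conf
        spark_conf.get(key),             # 2. key set directly in SparkConf
        df_options.get(key),             # 3. DataFrame / temporary table option
        spark_conf.get("spark." + key),  # 4. "spark."+key, e.g. from --conf
    ]
    value = None
    for candidate in layers:
        if candidate is not None:
            value = candidate
    return value


# Hypothetical settings, used only to exercise the function.
defaults = {"cloudant.protocol": "https", "bulkSize": "200"}
conf = {"bulkSize": "500", "spark.bulkSize": "1000"}
options = {"bulkSize": "50"}

print(resolve_option("bulkSize", defaults, conf, options))           # 1000
print(resolve_option("cloudant.protocol", defaults, conf, options))  # https
```

In this sketch the `spark.`-prefixed key wins for `bulkSize`, while `cloudant.protocol` falls back to the default because no higher layer sets it.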
+
+### Configuration in application.conf
+Default values are defined [here](src/main/resources/application.conf).
+
+### Configuration on SparkConf
+
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.batchInterval|8|number of seconds to set for streaming all documents 
from `_changes` endpoint into Spark dataframe.  See [Setting the right batch 
interval](https://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval)
 for tuning this value.
+cloudant.endpoint|`_all_docs`|endpoint for RelationProvider when loading data 
from Cloudant to DataFrames or SQL temporary tables. Select between the 
Cloudant `_all_docs` or `_changes` API endpoint.  See **Note** below for 
differences between endpoints.
+cloudant.protocol|https|protocol to use to transfer data: http or https
+cloudant.host| |cloudant host url
+cloudant.username| |cloudant userid
+cloudant.password| |cloudant password
+cloudant.numberOfRetries|3| number of times to replay a request that received 
a 429 `Too Many Requests` response
+cloudant.useQuery|false|by default, `_all_docs` endpoint is used if 
configuration 'view' and 'index' (see below) are not set. When useQuery is 
enabled, `_find` endpoint will be used in place of `_all_docs` when query 
condition is not on primary key field (_id), so that query predicates may be 
driven into datastore. 
+cloudant.queryLimit|25|the maximum number of results returned when querying 
the `_find` endpoint.
+cloudant.storageLevel|MEMORY_ONLY|the storage level for persisting Spark RDDs 
during load when `cloudant.endpoint` is set to `_changes`.  See [RDD 
Persistence 
section](https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence)
 in Spark's Programming Guide for all available storage level options.
+cloudant.timeout|60000|stop the response after waiting the defined number of 
milliseconds for data.  Only supported with `_changes` endpoint.
+jsonstore.rdd.partitions|10|the intended number of partitions used to drive 
JsonStoreRDD loading of query results in parallel. The actual number is calculated 
based on total rows returned and satisfying maxInPartition and minInPartition. 
Only supported with `_all_docs` endpoint.
+jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
+jsonstore.rdd.minInPartition|10|the min rows in a partition.
+jsonstore.rdd.requestTimeout|900000|the request timeout in milliseconds
+bulkSize|200|the bulk save size
+schemaSampleSize|-1|the sample size for RDD schema discovery. 1 means we are 
using only the first document for schema discovery; -1 means all documents; 0 
will be treated as 1; any number N means min(N, total) docs. Only supported 
with `_all_docs` endpoint.
+createDBOnSave|false|whether to create a new database during save operation. 
If false, a database should already exist. If true, a new database will be 
created. If true, and a database with a provided name already exists, an error 
will be raised. 
+
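The `schemaSampleSize` rules in the table above can be restated as a small Python function. This is a sketch of the documented semantics only; `effective_sample_size` is a hypothetical helper, and rejecting values below -1 is an assumption, since the table does not cover them.

```python
def effective_sample_size(schema_sample_size, total_docs):
    """Number of documents inspected for schema discovery (sketch)."""
    if schema_sample_size < -1:
        # Assumption: values below -1 are not described in the table.
        raise ValueError("schemaSampleSize must be -1 or greater")
    if schema_sample_size == -1:
        return total_docs            # -1 means all documents
    if schema_sample_size == 0:
        schema_sample_size = 1       # 0 is treated as 1
    return min(schema_sample_size, total_docs)


print(effective_sample_size(-1, 40))   # 40
print(effective_sample_size(0, 40))    # 1
print(effective_sample_size(100, 40))  # 40
```
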
+The `cloudant.endpoint` option sets the `_changes` or `_all_docs` API endpoint to 
be called while loading Cloudant data into Spark DataFrames or SQL Tables.
+
+**Note:** When using `_changes` API, please consider: 
+1. Results are partially ordered and may not be presented in the order in 
+which documents were updated.
+2. In case of shards' unavailability, you may see duplicate results (changes 
that have been seen already)
+3. Can use `selector` option to filter Cloudant docs during load
+4. Supports a real snapshot of the database and represents it in a single 
point of time.
+5. Only supports a single partition.
+
+
+When using `_all_docs` API:
+1. Supports parallel reads (using offset and range) and partitioning.
+2. Using partitions may not represent the true snapshot of a database.  Some 
docs
+   may be added or deleted in the database between loading data into different 
+   Spark partitions.
+
+If loading Cloudant docs from a database greater than 100 MB, set 
`cloudant.endpoint` to `_changes` and `spark.streaming.unpersist` to `false`.
+This will enable RDD persistence during load against `_changes` endpoint and 
allow the persisted RDDs to be accessible after streaming completes.  
+ 
+See 
[CloudantChangesDFSuite](src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala)
 
+for examples of loading data into a Spark DataFrame with `_changes` API.
+
+### Configuration on Spark SQL Temporary Table or DataFrame
+
+Besides all the configurations passed to a temporary table or dataframe 
through SparkConf, it is also possible to set the following configurations in 
temporary table or dataframe using OPTIONS: 
+
+Name | Default | Meaning
+--- |:---:| ---
+bulkSize|200| the bulk save size
+createDBOnSave|false| whether to create a new database during save operation. 
If false, a database should already exist. If true, a new database will be 
created. If true, and a database with a provided name already exists, an error 
will be raised. 
+database| | Cloudant database name
+index| | Cloudant Search index without the database name. Search index queries 
are limited to returning 200 results so can only be used to load data with <= 
200 results.
+path| | Cloudant: used as the database name if `database` is not set
+schemaSampleSize|-1| the sample size used to discover the schema for this temp 
table. -1 scans all documents
+selector|all documents| a selector written in Cloudant Query syntax, 
specifying conditions for selecting documents when the `cloudant.endpoint` 
option is set to `_changes`. Only documents satisfying the selector's 
conditions will be retrieved from Cloudant and loaded into Spark.
+view| | Cloudant view w/o the database name. only used for load.
+
+For fast loading, views are loaded without include_docs. Thus, a derived 
schema will always be: `{id, key, value}`, where `value` can be a compound 
field. An example of loading data from a view: 
+
+```python
+spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING 
org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view 
'_design/view/_view/AA0')")
+
+```
+
+### Configuration on Cloudant Receiver for Spark Streaming
+
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+database||cloudant database name
+selector| all documents| a selector written in Cloudant Query syntax, 
specifying conditions for selecting documents. Only documents satisfying the 
selector's conditions will be retrieved from Cloudant and loaded into Spark.
+
+### Configuration in spark-submit using --conf option
+
+The above stated configuration keys can also be set using `spark-submit 
--conf` option. When passing configuration in spark-submit, make sure to add 
"spark." as a prefix to the keys.
+
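As a rough illustration of the prefix rule, the following Python sketch turns connector keys into `--conf` arguments. `to_conf_args` is a hypothetical helper written for this example; it only builds the argument list and does not launch anything.

```python
def to_conf_args(settings):
    """Build spark-submit --conf arguments, adding the "spark." prefix."""
    args = []
    for key, value in settings.items():
        args += ["--conf", "spark.{}={}".format(key, value)]
    return args


cmd = ["spark-submit",
       "--packages", "org.apache.bahir:spark-sql-cloudant_2.11:2.3.4"]
cmd += to_conf_args({
    "cloudant.host": "ACCOUNT.cloudant.com",
    "cloudant.username": "USERNAME",
})
cmd.append("sql-cloudant/examples/python/CloudantApp.py")

# Print the assembled command line for inspection.
print(" ".join(cmd))
```
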
+
+## Examples
+
+### Python API
+
+#### Using SQL In Python 
+       
+```python
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using temp tables")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .getOrCreate()
+
+
+# Loading temp table from Cloudant db
+spark.sql(" CREATE TEMPORARY TABLE airportTable USING 
org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id 
>= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+print('Total # of rows in airportData: ' + str(airportData.count()))
+for code in airportData.collect():
+    print(code._id)
+```
+
+See [CloudantApp.py](examples/python/CloudantApp.py) for examples.
+
+Submit job example:
+```
+spark-submit  --packages org.apache.bahir:spark-sql-cloudant_2.11:2.3.4 --conf 
spark.cloudant.host=ACCOUNT.cloudant.com --conf 
spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD 
sql-cloudant/examples/python/CloudantApp.py
+```
+
+#### Using DataFrame In Python 
+
+```python
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using dataframes")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .getOrCreate()
+
+# ***1. Loading dataframe from Cloudant db
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.cache() 
+df.printSchema()
+df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
+df.filter(df._id >= 'CAA').select("_id",'airportName').show()      
+```
+
+See [CloudantDF.py](examples/python/CloudantDF.py) for examples.
+       
+When you perform multiple operations on a dataframe (select, filter, etc.),
+persist the dataframe first. Otherwise, every operation on the dataframe will 
load the same data from Cloudant again.
+Persisting also speeds up computation. This statement persists the dataframe 
in memory: `df.cache()`. Alternatively, for large databases, persist in memory 
and on disk with: 
+
+```python
+from pyspark import StorageLevel
+df.persist(storageLevel = StorageLevel(True, True, False, True, 1))
+```
+
+[Sample code](examples/python/CloudantDFOption.py) on using DataFrame option 
to define cloudant configuration
+
+### Scala API
+
+#### Using SQL In Scala 
+
+```scala
+val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example")
+      .config("cloudant.host","ACCOUNT.cloudant.com")
+      .config("cloudant.username", "USERNAME")
+      .config("cloudant.password","PASSWORD")
+      .getOrCreate()
+
+// For implicit conversions of Dataframe to RDDs
+import spark.implicits._
+    
+// create a temp table from Cloudant db and query it using sql syntax
+spark.sql(
+    s"""
+    |CREATE TEMPORARY TABLE airportTable
+    |USING org.apache.bahir.cloudant
+    |OPTIONS ( database 'n_airportcodemapping')
+    """.stripMargin)
+// create a dataframe
+val airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE 
_id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+println(s"Total # of rows in airportData: " + airportData.count())
+// convert dataframe to array of Rows, and process each row
+airportData.map(t => "code: " + t(0) + ",name:" + 
t(1)).collect().foreach(println)
+```
+See 
[CloudantApp.scala](examples/scala/src/main/scala/mytest/spark/CloudantApp.scala)
 for examples.
+
+Submit job example:
+```
+spark-submit --class org.apache.spark.examples.sql.cloudant.CloudantApp 
--packages org.apache.bahir:spark-sql-cloudant_2.11:2.3.4 --conf 
spark.cloudant.host=ACCOUNT.cloudant.com --conf 
spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD  
/path/to/spark-sql-cloudant_2.11-2.3.4-tests.jar
+```
+
+#### Using DataFrame In Scala 
+
+```scala
+val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example with Dataframe")
+      .config("cloudant.host","ACCOUNT.cloudant.com")
+      .config("cloudant.username", "USERNAME")
+      .config("cloudant.password","PASSWORD")
+      .config("createDBOnSave","true") // to create a db on save
+      .config("jsonstore.rdd.partitions", "20") // using 20 partitions
+      .getOrCreate()
+          
+// 1. Loading data from Cloudant db
+val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
+// Caching df in memory to speed computations
+// and not to retrieve data from cloudant again
+df.cache() 
+df.printSchema()
+
+// 2. Saving dataframe to Cloudant db
+val df2 = df.filter(df("flightSegmentId") === "AA106")
+    .select("flightSegmentId","economyClassBaseCost")
+df2.show()
+df2.write.format("org.apache.bahir.cloudant").save("n_flight2")
+```
+
+See 
[CloudantDF.scala](examples/scala/src/main/scala/mytest/spark/CloudantDF.scala) 
for examples.
+    
+[Sample 
code](examples/scala/src/main/scala/mytest/spark/CloudantDFOption.scala) on 
using DataFrame option to define Cloudant configuration.
+ 
+ 
+#### Using Streams In Scala 
+
+```scala
+val ssc = new StreamingContext(sparkConf, Seconds(10))
+val changes = ssc.receiverStream(new CloudantReceiver(Map(
+  "cloudant.host" -> "ACCOUNT.cloudant.com",
+  "cloudant.username" -> "USERNAME",
+  "cloudant.password" -> "PASSWORD",
+  "database" -> "n_airportcodemapping")))
+
+changes.foreachRDD((rdd: RDD[String], time: Time) => {
+  // Get the singleton instance of SparkSession
+  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
+
+  println(s"========= $time =========")
+  // Convert RDD[String] to DataFrame
+  val changesDataFrame = spark.read.json(rdd)
+  if (!changesDataFrame.schema.isEmpty) {
+    changesDataFrame.printSchema()
+    changesDataFrame.select("*").show()
+    ....
+  }
+})
+ssc.start()
+// run streaming for 120 secs
+Thread.sleep(120000L)
+ssc.stop(true)
+       
+```
+
+See 
[CloudantStreaming.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala)
 for examples.
+
+By default, Spark Streaming will load all documents from a database. If you 
want to limit the loading to 
+specific documents, use `selector` option of `CloudantReceiver` and specify 
your conditions 
+(See 
[CloudantStreamingSelector.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala)
+example for more details):
+
+```scala
+val changes = ssc.receiverStream(new CloudantReceiver(Map(
+  "cloudant.host" -> "ACCOUNT.cloudant.com",
+  "cloudant.username" -> "USERNAME",
+  "cloudant.password" -> "PASSWORD",
+  "database" -> "sales",
+  "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}")))
+```
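The escaped selector string above is plain JSON, so it can also be generated instead of written by hand. The Python sketch below builds the same option map with `json.dumps`; `receiver_options` is a hypothetical helper, and the resulting dictionary only mirrors the Scala `Map` passed to `CloudantReceiver`, it is not an API of this library.

```python
import json


def receiver_options(host, username, password, database, selector=None):
    """Assemble receiver options; the selector is serialized to JSON text."""
    options = {
        "cloudant.host": host,
        "cloudant.username": username,
        "cloudant.password": password,
        "database": database,
    }
    if selector is not None:
        # json.dumps takes care of quoting and escaping the selector.
        options["selector"] = json.dumps(selector)
    return options


opts = receiver_options("ACCOUNT.cloudant.com", "USERNAME", "PASSWORD",
                        "sales", {"month": "May", "rep": "John"})
print(opts["selector"])
```
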
diff --git a/site/docs/spark/2.3.4/spark-sql-streaming-akka.md 
b/site/docs/spark/2.3.4/spark-sql-streaming-akka.md
new file mode 100644
index 0000000..95e77e0
--- /dev/null
+++ b/site/docs/spark/2.3.4/spark-sql-streaming-akka.md
@@ -0,0 +1,137 @@
+---
+layout: page
+title: Spark Structured Streaming Akka
+description: Spark Structured Streaming Akka
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+A library for reading data from Akka Actors using Spark SQL Streaming (or 
Structured Streaming). 
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-akka" % 
"2.3.4"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-streaming-akka_2.11</artifactId>
+        <version>2.3.4</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or 
`spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages 
org.apache.bahir:spark-sql-streaming-akka_2.11:2.3.4
+
+Unlike using `--jars`, using `--packages` ensures that this library and its 
dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 
onwards.
+
+## Examples
+
+A SQL Stream can be created with data streams received from an Akka Feeder actor 
using:
+
+        sqlContext.readStream
+                
.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                .option("urlOfPublisher", "feederActorUri")
+                .load()
+                
+## Enable recovering from failures.
+                
+Setting values for option `persistenceDirPath` helps in recovering in case of 
a restart, by restoring the state where it left off before the shutdown.
+                
+        sqlContext.readStream
+                
.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                .option("urlOfPublisher", "feederActorUri")
+                .option("persistenceDirPath", "/path/to/localdir")
+                .load() 
+                       
+## Configuration options.
+                       
+This source uses [Akka Actor 
api](http://doc.akka.io/api/akka/2.5/akka/actor/Actor.html).
+                       
+* `urlOfPublisher` The url of Publisher or Feeder actor that the Receiver 
actor connects to. Set this as the tcp url of the Publisher or Feeder actor.
+* `persistenceDirPath` By default it is used for storing incoming messages on 
disk.
+
+### Scala API
+
+An example using the Scala API to count words from an incoming message stream. 
+
+        // Create DataFrame representing the stream of input lines from 
connection
+        // to publisher or feeder actor
+        val lines = spark.readStream
+                    
.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                    .option("urlOfPublisher", urlOfPublisher)
+                    .load().as[(String, Timestamp)]
+    
+        // Split the lines into words
+        val words = lines.map(_._1).flatMap(_.split(" "))
+    
+        // Generate running word count
+        val wordCounts = words.groupBy("value").count()
+    
+        // Start running the query that prints the running counts to the 
console
+        val query = wordCounts.writeStream
+                    .outputMode("complete")
+                    .format("console")
+                    .start()
+    
+        query.awaitTermination()
+        
+Please see `AkkaStreamWordCount.scala` for full example.     
+   
+### Java API
+   
+An example using the Java API to count words from an incoming message stream.
+   
+        // Create DataFrame representing the stream of input lines from 
connection
+        // to publisher or feeder actor
+        Dataset<String> lines = spark
+                                .readStream()
+                                
.format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                                .option("urlOfPublisher", urlOfPublisher)
+                                .load().select("value").as(Encoders.STRING());
+    
+        // Split the lines into words
+        Dataset<String> words = lines.flatMap(new FlatMapFunction<String, 
String>() {
+          @Override
+          public Iterator<String> call(String s) throws Exception {
+            return Arrays.asList(s.split(" ")).iterator();
+          }
+        }, Encoders.STRING());
+    
+        // Generate running word count
+        Dataset<Row> wordCounts = words.groupBy("value").count();
+    
+        // Start running the query that prints the running counts to the 
console
+        StreamingQuery query = wordCounts.writeStream()
+                                .outputMode("complete")
+                                .format("console")
+                                .start();
+    
+        query.awaitTermination();   
+         
+Please see `JavaAkkaStreamWordCount.java` for full example.      
diff --git a/site/docs/spark/2.3.4/spark-sql-streaming-mqtt.md 
b/site/docs/spark/2.3.4/spark-sql-streaming-mqtt.md
new file mode 100644
index 0000000..fe4d7cc
--- /dev/null
+++ b/site/docs/spark/2.3.4/spark-sql-streaming-mqtt.md
@@ -0,0 +1,218 @@
+---
+layout: page
+title: Spark Structured Streaming MQTT
+description: Spark Structured Streaming MQTT
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+A library for writing and reading data from MQTT Servers using Spark SQL 
Streaming (or Structured streaming).
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % 
"2.3.4"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
+        <version>2.3.4</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or 
`spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages 
org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.3.4
+
+Unlike using `--jars`, using `--packages` ensures that this library and its 
dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 
onwards.
+
+## Examples
+
+SQL Stream can be created with data streams received through MQTT Server using:
+
+    sqlContext.readStream
+        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+        .option("topic", "mytopic")
+        .load("tcp://localhost:1883")
+
+SQL Stream may also be transferred into MQTT messages using:
+
+    sqlContext.writeStream
+        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider")
+        .option("checkpointLocation", "/path/to/localdir")
+        .outputMode("complete")
+        .option("topic", "mytopic")
+        .start("tcp://localhost:1883")
+
+## Source recovering from failures
+
+Setting values for option `localStorage` and `clientId` helps in recovering in 
case of source restart, by restoring the state where it left off before the 
shutdown.
+
+    sqlContext.readStream
+        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+        .option("topic", "mytopic")
+        .option("localStorage", "/path/to/localdir")
+        .option("clientId", "some-client-id")
+        .load("tcp://localhost:1883")
+
+## Configuration options
+
+This connector uses [Eclipse Paho Java 
Client](https://eclipse.org/paho/clients/java/). Client API documentation is 
located [here](http://www.eclipse.org/paho/files/javadoc/index.html).
+
+ * `brokerUrl` A URL MqttClient connects to. Set this or `path` as the URL of 
the Mqtt Server. e.g. tcp://localhost:1883.
+ * `persistence` By default it is used for storing incoming messages on disk. 
If `memory` is provided as value for this option, then recovery on restart is 
not supported.
+ * `topic` Topic MqttClient subscribes to.
+ * `clientId` The client ID this client is associated with. Provide the same value 
to recover a stopped source client. MQTT sink ignores client identifier, 
because Spark batch can be distributed across multiple workers whereas MQTT 
broker does not allow simultaneous connections with same ID from multiple hosts.
+ * `QoS` The maximum quality of service to subscribe each topic at. Messages 
published at a lower quality of service will be received at the published QoS. 
Messages published at a higher quality of service will be received using the 
QoS specified on the subscribe.
+ * `username` Sets the user name to use for the connection to Mqtt Server. Do 
not set it, if server does not need this. Setting it empty will lead to errors.
+ * `password` Sets the password to use for the connection.
+ * `cleanSession` Setting it true starts a clean session, removes all 
checkpointed messages by a previous run of this source. This is set to false by 
default.
+ * `connectionTimeout` Sets the connection timeout, a value of 0 is 
interpreted as wait until client connects. See 
`MqttConnectOptions.setConnectionTimeout` for more information.
+ * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
+ * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
+ * `maxInflight` Same as `MqttConnectOptions.setMaxInflight`
+ * `autoReconnect` Same as `MqttConnectOptions.setAutomaticReconnect`
+
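The `QoS` rule above amounts to delivering each message at the lower of the published and subscribed levels. The following is a minimal Python sketch of that rule, for illustration only; the function name `effective_qos` is hypothetical and is not part of the connector or of the Paho client.

```python
def effective_qos(published_qos: int, subscribed_qos: int) -> int:
    """Return the QoS level at which a subscriber receives a message.

    Illustrative helper only: MQTT defines QoS levels 0, 1 and 2, and a
    message is delivered at the lower of the two levels involved.
    """
    for level in (published_qos, subscribed_qos):
        if level not in (0, 1, 2):
            raise ValueError(f"invalid MQTT QoS level: {level}")
    return min(published_qos, subscribed_qos)
```

For instance, under this rule a message published at QoS 2 to a subscription made at QoS 1 is received at QoS 1.
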
+## Environment variables
+
+Custom environment variables for managing the MQTT connectivity of the sink connector:
+
+ * `spark.mqtt.client.connect.attempts` Number of times the sink will try to connect to the MQTT broker before failing.
+ * `spark.mqtt.client.connect.backoff` Delay in milliseconds to wait before retrying the connection to the server.
+ * `spark.mqtt.connection.cache.timeout` The sink connector caches MQTT connections. Idle connections are closed after this timeout, given in milliseconds.
+ * `spark.mqtt.client.publish.attempts` Number of attempts to publish a message before failing the task.
+ * `spark.mqtt.client.publish.backoff` Delay in milliseconds to wait before retrying the send operation.
+
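The attempt and backoff settings above describe a retry loop. The Python sketch below shows one plausible reading of those semantics: try up to `attempts` times and wait `backoff_ms` milliseconds between tries. The helper `call_with_retry` and its parameters are hypothetical, and the sink's actual retry logic may differ in detail.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    action: Callable[[], T],
    attempts: int,
    backoff_ms: int,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run `action`, retrying on failure with a fixed delay between tries.

    `attempts` plays the role of a `*.attempts` setting and `backoff_ms`
    the role of a `*.backoff` setting (assumed semantics, not the
    connector's code). `sleep` is injectable so the delay can be observed.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(backoff_ms / 1000.0)
    raise AssertionError("unreachable")
```

Injecting `sleep` keeps the sketch easy to exercise without real delays; a caller would normally leave the default in place.
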
+### Scala API
+
+An example using the Scala API to count words from an incoming message stream.
+
+    // Create DataFrame representing the stream of input lines from connection to mqtt server
+    val lines = spark.readStream
+      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+      .option("topic", topic)
+      .load(brokerUrl).selectExpr("CAST(payload AS STRING)").as[String]
+
+    // Split the lines into words
+    val words = lines.flatMap(_.split(" "))
+
+    // Generate running word count
+    val wordCounts = words.groupBy("value").count()
+
+    // Start running the query that prints the running counts to the console
+    val query = wordCounts.writeStream
+      .outputMode("complete")
+      .format("console")
+      .start()
+
+    query.awaitTermination()
+
+Please see `MQTTStreamWordCount.scala` for the full example. See `MQTTSinkWordCount.scala` if you are interested in publishing data to an MQTT broker.
+
+### Java API
+
+An example using the Java API to count words from an incoming message stream.
+
+    // Create DataFrame representing the stream of input lines from connection to mqtt server.
+    Dataset<String> lines = spark
+            .readStream()
+            .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+            .option("topic", topic)
+            .load(brokerUrl)
+            .selectExpr("CAST(payload AS STRING)").as(Encoders.STRING());
+
+    // Split the lines into words
+    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+        @Override
+        public Iterator<String> call(String x) {
+            return Arrays.asList(x.split(" ")).iterator();
+        }
+    }, Encoders.STRING());
+
+    // Generate running word count
+    Dataset<Row> wordCounts = words.groupBy("value").count();
+
+    // Start running the query that prints the running counts to the console
+    StreamingQuery query = wordCounts.writeStream()
+            .outputMode("complete")
+            .format("console")
+            .start();
+
+    query.awaitTermination();
+
+Please see `JavaMQTTStreamWordCount.java` for the full example. See `JavaMQTTSinkWordCount.java` if you are interested in publishing data to an MQTT broker.
+
+## Best Practices
+
+1. Turn MQTT into a more reliable messaging service.
+
+> *MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport.*
+
+MQTT's design suits the purpose it serves, but applications often need stronger reliability. MQTT is not a distributed message queue and thus does not offer the highest level of reliability features, so it should be routed through a Kafka message queue to take advantage of a distributed message queue. In fact, using a Kafka message queue offers a lot of possibilities, including a single Kafka topic subscribed to several MQTT sources and even a [...]
+
+2. Often the message payload is not in the default character encoding, or it contains binary data that needs to be parsed using a particular parser. In such cases, the Spark MQTT payload should be processed using an external parser. For example:
+
+ * Scala API example:
+```scala
+    // Create DataFrame representing the stream of binary messages
+    val lines = spark.readStream
+      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+      .option("topic", topic)
+      .load(brokerUrl).select("payload").as[Array[Byte]].map(externalParser(_))
+```
+
+ * Java API example:
+```java
+        // Create DataFrame representing the stream of binary messages
+        Dataset<byte[]> lines = spark
+                .readStream()
+                .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+                .option("topic", topic)
+                .load(brokerUrl).selectExpr("CAST(payload AS BINARY)").as(Encoders.BINARY());
+
+        // Split the lines into words
+        Dataset<String> words = lines.map(new MapFunction<byte[], String>() {
+            @Override
+            public String call(byte[] bytes) throws Exception {
+                return new String(bytes); // Plug in external parser here.
+            }
+        }, Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
+            @Override
+            public Iterator<String> call(String x) {
+                return Arrays.asList(x.split(" ")).iterator();
+            }
+        }, Encoders.STRING());
+
+```
+
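As a language-neutral illustration of the external parser idea above, the following Python sketch decodes a payload with an explicitly chosen character encoding before splitting it into words. The function `parse_payload` and the choice of Latin-1 are assumptions made for the example; a real pipeline would plug in whatever parser matches its payload format.

```python
from typing import List


def parse_payload(payload: bytes, encoding: str = "latin-1") -> List[str]:
    """Decode raw MQTT payload bytes and split the text into words.

    Hypothetical stand-in for an external parser: the encoding is passed
    explicitly instead of relying on the platform default.
    """
    text = payload.decode(encoding)
    return text.split()
```

Decoding the same bytes with the wrong encoding can fail outright or silently produce the wrong characters, which is why the payload is kept as binary until the parser runs.
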
+3. What is the solution when there are a large number of varied MQTT sources, each with a different schema and different throughput characteristics?
+
+Generally, one would create many streaming pipelines to solve this problem. This would either require a very sophisticated scheduling setup or waste a lot of resources, as it is not known in advance which stream carries more data.
+
+This general solution is both less efficient and more cumbersome to operate, and its multiple moving parts incur a high maintenance overhead. As an alternative, one can set up a single-topic Kafka-Spark stream, where each message from the varied streams carries a unique tag separating it from the other streams. This way, at the processing end, one can distinguish the messages from one another and apply the right kind of decoding and processing. Similarly while storing, each  [...]
+
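The tagging approach described above can be sketched as a dispatch table keyed by tag. This is a minimal Python illustration, assuming each message arrives as a `(tag, payload)` pair; the tag names and decoders are hypothetical and only show the shape of the idea.

```python
import json
from typing import Any, Callable, Dict, Tuple

# Hypothetical decoders, one per source tag.
DECODERS: Dict[str, Callable[[bytes], Any]] = {
    "sensor-json": lambda payload: json.loads(payload.decode("utf-8")),
    "plain-text": lambda payload: payload.decode("utf-8"),
}


def decode_message(message: Tuple[str, bytes]) -> Any:
    """Pick the decoder matching the message's tag and apply it."""
    tag, payload = message
    try:
        decoder = DECODERS[tag]
    except KeyError:
        raise ValueError(f"no decoder registered for tag {tag!r}") from None
    return decoder(payload)
```

Supporting a new source then means registering one more decoder rather than standing up another pipeline.
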
diff --git a/site/docs/spark/2.3.4/spark-streaming-akka.md 
b/site/docs/spark/2.3.4/spark-streaming-akka.md
new file mode 100644
index 0000000..1ee3110
--- /dev/null
+++ b/site/docs/spark/2.3.4/spark-streaming-akka.md
@@ -0,0 +1,89 @@
+---
+layout: page
+title: Spark Streaming Akka
+description: Spark Streaming Akka
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+A library for reading data from Akka Actors using Spark Streaming. 
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-akka" % 
"2.3.4"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-akka_2.11</artifactId>
+        <version>2.3.4</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or 
`spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages 
org.apache.bahir:spark-streaming-akka_2.11:2.3.4
+
+Unlike using `--jars`, using `--packages` ensures that this library and its 
dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above.
+
+## Examples
+
+DStreams can be created with data streams received through Akka actors by 
using `AkkaUtils.createStream(ssc, actorProps, actor-name)`.
+
+### Scala API
+
+You need to extend `ActorReceiver` so as to store received data into Spark 
using `store(...)` methods. The supervisor strategy of
+this actor can be configured to handle failures, etc.
+
+    class CustomActor extends ActorReceiver {
+      def receive = {
+        case data: String => store(data)
+      }
+    }
+
+    // A new input stream can be created with this custom actor as
+    val ssc: StreamingContext = ...
+    val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), 
"CustomReceiver")
+
+### Java API
+
+You need to extend `JavaActorReceiver` so as to store received data into Spark 
using `store(...)` methods. The supervisor strategy of
+this actor can be configured to handle failures, etc.
+
+    class CustomActor extends JavaActorReceiver {
+        @Override
+        public void onReceive(Object msg) throws Exception {
+            store((String) msg);
+        }
+    }
+
+    // A new input stream can be created with this custom actor as
+    JavaStreamingContext jssc = ...;
+    JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, 
Props.create(CustomActor.class), "CustomReceiver");
+
+See end-to-end examples at [Akka 
Examples](https://github.com/apache/bahir/tree/master/streaming-akka/examples)
diff --git a/site/docs/spark/2.3.4/spark-streaming-mqtt.md 
b/site/docs/spark/2.3.4/spark-streaming-mqtt.md
new file mode 100644
index 0000000..f2296e8
--- /dev/null
+++ b/site/docs/spark/2.3.4/spark-streaming-mqtt.md
@@ -0,0 +1,116 @@
+---
+layout: page
+title: Spark Streaming MQTT
+description: Spark Streaming MQTT
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+
+[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-mqtt" % 
"2.3.4"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-mqtt_2.11</artifactId>
+        <version>2.3.4</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or 
`spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages 
org.apache.bahir:spark-streaming-mqtt_2.11:2.3.4
+
+Unlike using `--jars`, using `--packages` ensures that this library and its 
dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above.
+
+## Configuration options
+
+This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is available [here](http://www.eclipse.org/paho/files/javadoc/index.html).
+
+ * `brokerUrl` A URL the MqttClient connects to. Set this to the URL of the MQTT server, e.g. tcp://localhost:1883.
+ * `storageLevel` Storage level for incoming messages. By default, they are stored on disk.
+ * `topic` Topic the MqttClient subscribes to.
+ * `topics` List of topics the MqttClient subscribes to.
+ * `clientId` The client ID this client is associated with. Provide the same value to recover a stopped client.
+ * `QoS` The maximum quality of service to subscribe each topic at. Messages 
published at a lower quality of service will be received at the published QoS. 
Messages published at a higher quality of service will be received using the 
QoS specified on the subscribe.
+ * `username` Sets the user name to use for the connection to the MQTT server. Do not set it if the server does not require one; setting it to an empty value will lead to errors.
+ * `password` Sets the password to use for the connection.
+ * `cleanSession` Setting it to true starts a clean session and removes all messages checkpointed by a previous run of this source. Defaults to false.
+ * `connectionTimeout` Sets the connection timeout. A value of 0 is interpreted as wait until the client connects. See `MqttConnectOptions.setConnectionTimeout` for more information.
+ * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
+ * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
+
+
+## Examples
+
+### Scala API
+
+Create a DStream using one of the `MQTTUtils` factory methods:
+
+    val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
+    val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics)
+    val lines = MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics)
+
+Additional mqtt connection options can be provided:
+
+```Scala
+val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+val lines = MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+```
+
+### Java API
+
+Create a DStream using one of the `MQTTUtils` factory methods:
+
+    JavaDStream<String> lines = MQTTUtils.createStream(jssc, brokerUrl, topic);
+    JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedStream(jssc, brokerUrl, topics);
+    JavaReceiverInputDStream<Tuple2<String, byte[]>> lines = MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics);
+
+See end-to-end examples at [MQTT 
Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)
+
+
+### Python API
+
+Create a DStream from a single topic.
+
+```Python
+       MQTTUtils.createStream(ssc, broker_url, topic)
+```
+
+Create a DStream from a list of topics.
+
+```Python
+       MQTTUtils.createPairedStream(ssc, broker_url, topics)
+```
\ No newline at end of file
diff --git a/site/docs/spark/2.3.4/spark-streaming-pubnub.md 
b/site/docs/spark/2.3.4/spark-streaming-pubnub.md
new file mode 100644
index 0000000..6065206
--- /dev/null
+++ b/site/docs/spark/2.3.4/spark-streaming-pubnub.md
@@ -0,0 +1,103 @@
+---
+layout: page
+title: Spark Streaming PubNub
+description: Spark Streaming PubNub
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+# Spark Streaming PubNub Connector
+
+Library for reading data from real-time messaging infrastructure 
[PubNub](https://www.pubnub.com/) using Spark Streaming.
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubnub" % 
"2.3.4"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-pubnub_2.11</artifactId>
+        <version>2.3.4</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or 
`spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages 
org.apache.bahir:spark-streaming-pubnub_2.11:2.3.4
+
+Unlike using `--jars`, using `--packages` ensures that this library and its 
dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+## Examples
+
+The connector leverages the official Java client for the PubNub cloud infrastructure. You can import the `PubNubUtils`
+class and create an input stream by calling `PubNubUtils.createStream()` as shown below. Security- and performance-related
+features should be set up inside the standard `PNConfiguration` object. We advise configuring a reconnection policy so that
+temporary network outages do not interrupt the processing job. Users may subscribe to multiple channels and channel groups,
+and may specify a time token to start receiving messages from a given point in time.
+
+For complete code examples, please review the _examples_ directory.
+
+### Scala API
+
+    import com.pubnub.api.PNConfiguration
+    import com.pubnub.api.enums.PNReconnectionPolicy
+
+    import org.apache.spark.streaming.pubnub.{PubNubUtils, SparkPubNubMessage}
+
+    val config = new PNConfiguration
+    config.setSubscribeKey(subscribeKey)
+    config.setSecure(true)
+    config.setReconnectionPolicy(PNReconnectionPolicy.LINEAR)
+    val channel = "my-channel"
+
+    val pubNubStream: ReceiverInputDStream[SparkPubNubMessage] = PubNubUtils.createStream(
+      ssc, config, Seq(channel), Seq(), None, StorageLevel.MEMORY_AND_DISK_SER_2
+    )
+
+### Java API
+
+    import java.util.Collections;
+    import java.util.HashSet;
+    import java.util.Set;
+
+    import com.pubnub.api.PNConfiguration;
+    import com.pubnub.api.enums.PNReconnectionPolicy;
+
+    import org.apache.spark.streaming.pubnub.PubNubUtils;
+    import org.apache.spark.streaming.pubnub.SparkPubNubMessage;
+
+    PNConfiguration config = new PNConfiguration();
+    config.setSubscribeKey(subscribeKey);
+    config.setSecure(true);
+    config.setReconnectionPolicy(PNReconnectionPolicy.LINEAR);
+    // Channels to subscribe to
+    Set<String> channels = new HashSet<String>();
+    channels.add("my-channel");
+
+    ReceiverInputDStream<SparkPubNubMessage> pubNubStream = PubNubUtils.createStream(
+      ssc, config, channels, Collections.EMPTY_SET, null,
+      StorageLevel.MEMORY_AND_DISK_SER_2()
+    );
+
+## Unit Test
+
+Unit tests take advantage of the publicly available _demo_ subscription and publish key, which has a limited request rate.
diff --git a/site/docs/spark/2.3.4/spark-streaming-pubsub.md 
b/site/docs/spark/2.3.4/spark-streaming-pubsub.md
new file mode 100644
index 0000000..2975d91
--- /dev/null
+++ b/site/docs/spark/2.3.4/spark-streaming-pubsub.md
@@ -0,0 +1,96 @@
+---
+layout: page
+title: Spark Streaming Google Pub-Sub
+description: Spark Streaming Google Pub-Sub
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+A library for reading data from [Google Cloud 
Pub/Sub](https://cloud.google.com/pubsub/) using Spark Streaming.
+
+## Linking
+
+Using SBT:
+    
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % 
"2.3.4"
+    
+Using Maven:
+    
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-pubsub_2.11</artifactId>
+        <version>2.3.4</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or 
`spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages 
org.apache.bahir:spark-streaming-pubsub_2.11:2.3.4
+
+Unlike using `--jars`, using `--packages` ensures that this library and its 
dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+## Examples
+
+First, create credentials using `SparkGCPCredentials`. It supports four types of credentials:
+* application default
+    `SparkGCPCredentials.builder.build()`
+* JSON type service account
+    `SparkGCPCredentials.builder.jsonServiceAccount(PATH_TO_JSON_KEY).build()`
+* P12 type service account
+    `SparkGCPCredentials.builder.p12ServiceAccount(PATH_TO_P12_KEY, EMAIL_ACCOUNT).build()`
+* metadata service account (running on Dataproc)
+    `SparkGCPCredentials.builder.metadataServiceAccount().build()`
+
+### Scala API
+    
+    val lines = PubsubUtils.createStream(ssc, projectId, subscriptionName, 
credential, ..)
+    
+### Java API
+    
+    JavaDStream<SparkPubsubMessage> lines = PubsubUtils.createStream(jssc, 
projectId, subscriptionName, credential...) 
+
+See end-to-end examples at [Google Cloud Pubsub 
Examples](streaming-pubsub/examples)
+
+### Unit Test
+
+To run the PubSub test cases, you need to generate **Google API service 
account key files** and set the corresponding environment variable to enable 
the test.
+
+#### To generate a service account key file with PubSub permission
+
+1. Go to the [Google API Console](https://console.cloud.google.com)
+2. Choose the `Credentials` tab > `Create credentials` button > `Service account key`
+3. Fill in the account name, assign `Role > Pub/Sub > Pub/Sub Editor`, and check the option `Furnish a private key` to create one. You need to create one for the JSON key file and another for P12.
+4. The account email is the `Service account ID`
+
+#### Set the environment variables and run the tests
+
+```
+mvn clean package -DskipTests -pl streaming-pubsub
+
+export ENABLE_PUBSUB_TESTS=1
+export GCP_TEST_ACCOUNT="THE_P12_SERVICE_ACCOUNT_ID_MENTIONED_ABOVE"
+export GCP_TEST_PROJECT_ID="YOUR_GCP_PROJECT_ID"
+export GCP_TEST_JSON_KEY_PATH=/path/to/pubsub/credential/files/Apache-Bahir-PubSub-1234abcd.json
+export GCP_TEST_P12_KEY_PATH=/path/to/pubsub/credential/files/Apache-Bahir-PubSub-5678efgh.p12
+
+mvn test -pl streaming-pubsub
+```
diff --git a/site/docs/spark/2.3.4/spark-streaming-twitter.md 
b/site/docs/spark/2.3.4/spark-streaming-twitter.md
new file mode 100644
index 0000000..4e60ec8
--- /dev/null
+++ b/site/docs/spark/2.3.4/spark-streaming-twitter.md
@@ -0,0 +1,74 @@
+---
+layout: page
+title: Spark Streaming Twitter
+description: Spark Streaming Twitter
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+A library for reading social data from [twitter](http://twitter.com/) using 
Spark Streaming. 
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % 
"2.3.4"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-twitter_2.11</artifactId>
+        <version>2.3.4</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or 
`spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages 
org.apache.bahir:spark-streaming-twitter_2.11:2.3.4
+
+Unlike using `--jars`, using `--packages` ensures that this library and its 
dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above.
+
+
+## Examples
+
+`TwitterUtils` uses Twitter4j to get the public stream of tweets using 
[Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). 
Authentication information
+can be provided by any of the 
[methods](http://twitter4j.org/en/configuration.html) supported by Twitter4J 
library. You can import the `TwitterUtils` class and create a DStream with 
`TwitterUtils.createStream` as shown below.
+
+### Scala API
+
+    import org.apache.spark.streaming.twitter._
+
+    TwitterUtils.createStream(ssc, None)
+
+### Java API
+
+    import org.apache.spark.streaming.twitter.*;
+
+    TwitterUtils.createStream(jssc);
+
+
+You can get either the public stream or a stream filtered by keywords.
+See end-to-end examples at [Twitter 
Examples](https://github.com/apache/bahir/tree/master/streaming-twitter/examples)
\ No newline at end of file
diff --git a/site/docs/spark/2.3.4/spark-streaming-zeromq.md 
b/site/docs/spark/2.3.4/spark-streaming-zeromq.md
new file mode 100644
index 0000000..31ab814
--- /dev/null
+++ b/site/docs/spark/2.3.4/spark-streaming-zeromq.md
@@ -0,0 +1,76 @@
+---
+layout: page
+title: Spark Streaming ZeroMQ
+description: Spark Streaming ZeroMQ
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+# Spark Streaming ZeroMQ Connector
+
+A library for reading data from [ZeroMQ](http://zeromq.org/) using Spark 
Streaming. 
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-zeromq" % 
"2.3.4"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-zeromq_2.11</artifactId>
+        <version>2.3.4</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or 
`spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages 
org.apache.bahir:spark-streaming-zeromq_2.11:2.3.4
+
+Unlike using `--jars`, using `--packages` ensures that this library and its 
dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above.
+
+## Examples
+
+Review end-to-end examples at [ZeroMQ 
Examples](https://github.com/apache/bahir/tree/master/streaming-zeromq/examples).
+
+### Scala API
+
+    import org.apache.spark.streaming.zeromq.ZeroMQUtils
+
+    val lines = ZeroMQUtils.createTextStream(
+      ssc, "tcp://server:5555", true, Seq("my-topic".getBytes)
+    )
+
+### Java API
+
+    import java.util.Arrays;
+
+    import org.apache.spark.storage.StorageLevel;
+    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+    import org.apache.spark.streaming.zeromq.ZeroMQUtils;
+
+    // Subscribe to "my-topic" on the given ZeroMQ endpoint
+    JavaReceiverInputDStream<String> test1 = ZeroMQUtils.createJavaStream(
+        ssc, "tcp://server:5555", true, Arrays.asList("my-topic".getBytes()),
+        StorageLevel.MEMORY_AND_DISK_SER_2()
+    );
\ No newline at end of file
diff --git a/site/docs/spark/overview.md b/site/docs/spark/overview.md
index 92b336f..bfde6fc 100644
--- a/site/docs/spark/overview.md
+++ b/site/docs/spark/overview.md
@@ -28,6 +28,7 @@ limitations under the License.
 ### Apache Bahir Extensions for Apache Spark
 
  - [Current - 2.4.0-SNAPSHOT](/docs/spark/current/documentation)
+ - [2.3.4](/docs/spark/2.3.4/documentation)
  - [2.3.3](/docs/spark/2.3.3/documentation)
  - [2.3.2](/docs/spark/2.3.2/documentation)
  - [2.3.1](/docs/spark/2.3.1/documentation)
