I am sharing this code snippet because I spent quite some time figuring it out and couldn't find any examples online. Between the Kinesis documentation, the tutorial on the AWS site, and other code snippets on the Internet, I was confused about the structure/format of the messages that Spark fetches from Kinesis - base64-encoded, JSON, gzipped - which comes first and in what order.
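(Spoiler, in case you just want the answer: by the time the Spark Kinesis receiver hands you a record, it has already been base64-decoded for you; what you hold is gzip-compressed JSON, so the order is gunzip first, then parse. Base64 only shows up if you pull records via the raw API or CLI.) Here is a minimal, Kinesis-free sketch of that round trip that you can paste into a plain Scala REPL to convince yourself. The sample payload is my own trimmed, hypothetical version of the CloudWatch Logs subscription format, not real data:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}

    // Hypothetical, trimmed-down CloudWatch Logs subscription payload.
    val sample =
      """{"messageType":"DATA_MESSAGE","logGroup":"myGroup","logStream":"myStream",
        |"logEvents":[{"id":"0","timestamp":1490000000000,
        |"message":"GET /index.html 200",
        |"extractedFields":{"status":"200","path":"/index.html"}}]}""".stripMargin

    // Simulate the wire format: gzip the JSON, as CloudWatch Logs does
    // before writing to the Kinesis stream.
    val baos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(baos)
    gz.write(sample.getBytes("UTF-8"))
    gz.close()
    val wireBytes: Array[Byte] = baos.toByteArray // same shape as the RDD elements below

    // Decode order: gunzip first, then treat the result as JSON text.
    val decoded = scala.io.Source
      .fromInputStream(new GZIPInputStream(new ByteArrayInputStream(wireBytes)))
      .mkString
    assert(decoded == sample)
    println(decoded)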
I tested this on EMR-5.4.0 with Amazon Hadoop 2.7.3 and Spark 2.1.0. Hope it helps others googling for similar info. I tried using Structured Streaming, but (1) it's in alpha and (2) despite including what I thought were all the dependencies, it complained about not finding DataSource.Kinesis. You probably don't need all of the libs below, but I'm too lazy to prune the ones the snippet doesn't require :)

    import org.apache.spark.streaming.Duration
    import org.apache.spark.streaming.kinesis._
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.rdd.RDD
    import java.util.Base64
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.functions.explode
    import org.apache.commons.math3.stat.descriptive._
    import java.io.File
    import java.net.InetAddress
    import scala.util.control.NonFatal
    import org.apache.spark.SparkFiles
    import org.apache.spark.sql.SaveMode
    import java.util.Properties
    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
    import java.util.zip.{GZIPOutputStream, GZIPInputStream}
    import scala.util.Try

    //sc.setLogLevel("INFO")

    val ssc = new StreamingContext(sc, Seconds(30))

    // One receiver per Kinesis shard; my stream has two shards.
    val kinesisStreams = (0 until 2).map { i =>
      KinesisUtils.createStream(ssc, "myApp", "cloudwatchlogs",
        "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
        InitialPositionInStream.LATEST, Seconds(30),
        StorageLevel.MEMORY_AND_DISK_2, "myId", "mySecret")
    }

    val unionStreams = ssc.union(kinesisStreams)

    unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
      if (rdd.count() > 0) {
        // Each record is a gzipped JSON payload from the CloudWatch Logs
        // subscription; the receiver already delivers raw (base64-decoded)
        // bytes, so we just gunzip and parse.
        val json = rdd.map(input => {
          val inputStream = new GZIPInputStream(new ByteArrayInputStream(input))
          val record = scala.io.Source.fromInputStream(inputStream).mkString
          compact(render(parse(record)))
        })
        val df = spark.sqlContext.read.json(json)
        // Flatten logEvents and pull the extracted fields up to the top level.
        val preDF = df.select($"logGroup", explode($"logEvents").as("events_flat"))
        val penDF = preDF.select($"logGroup", $"events_flat.extractedFields")
        val finalDF = penDF.select($"logGroup".as("cluster"), $"extractedFields.*")
        finalDF.printSchema()
        finalDF.show()
      }
    })

    ssc.start

-- Thanks, Tim
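P.S. A couple of practical notes in case they save someone a search: I ran the snippet in spark-shell on the EMR master node, where sc and spark already exist and spark.implicits._ is auto-imported (that's where the $"..." column syntax comes from). The Kinesis receiver lives in a separate artifact, so if your Spark build doesn't include it you'd start the shell with something like spark-shell --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0 (coordinates are my best guess for Spark 2.1.0 / Scala 2.11 - adjust for your versions). Also, ssc.start returns immediately; that's fine in the shell, but in a standalone app you'd follow it with ssc.awaitTermination().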