Re: How do I access the nested field in a dataframe, spark Streaming app... Please help.

2016-11-20 Thread shyla deshpande
Thanks Jon, great learning resource.
Thanks Pandees, addresses[0].city would work, but I want all the cities,
not just the one from addresses[0].
In the end, I wrote the following function to get the cities.

def getCities(addresses: Seq[Address]): String = {
  var cities: String = ""
  if (addresses.size > 0) {
    // Join the city of every address; a missing city becomes an empty entry.
    cities = (for (a <- addresses) yield a.city.getOrElse("")).mkString(",")
    // cities = addresses.foldLeft("")((str, addr) => str + addr.city.getOrElse(""))
  }
  cities
}

Great help. Thanks again
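
For readers following along, a more compact variant of the helper above is sketched below. The `Address` case class is a hypothetical stand-in for the ScalaPB-generated class (assumed to expose `city` as an `Option[String]`, as the generated code quoted in this archive does), and `CitiesSketch` / `getCitiesCompact` are made-up names. Unlike the original helper, this version skips addresses that have no city instead of emitting empty entries.

```scala
object CitiesSketch {
  // Hypothetical stand-in for the generated protobuf class; only the
  // fields mentioned in this thread are modelled.
  final case class Address(street: Option[String] = None, city: Option[String] = None)

  // Collect every defined city and join them with commas.
  // flatMap drops the None values; mkString on an empty Seq returns "".
  def getCitiesCompact(addresses: Seq[Address]): String =
    addresses.flatMap(_.city).mkString(",")
}
```

Called inside the `map` that follows `Person.parseFrom`, this yields one comma-separated string per person. On the DataFrame side, selecting `$"addresses.city"` on an array-of-struct column should return an array holding every city, which may make the helper unnecessary; that has not been verified against this streaming job.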


On Sun, Nov 20, 2016 at 1:10 PM, Jon Gregg <jonrgr...@gmail.com> wrote:

> In these cases it might help to just flatten the DataFrame. Here's a
> helper function from the tutorial (scroll down to the "Flattening" header):
> https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/02%20Introduction%20to%20DataFrames%20-%20scala.html
>
>
> On Sun, Nov 20, 2016 at 1:24 PM, pandees waran <pande...@gmail.com> wrote:
>
>> have you tried using "." access method?
>>
>> e.g:
>> ds1.select("name","addresses[0].element.city")
>>
>> On Sun, Nov 20, 2016 at 9:59 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> The following is my dataframe schema:
>>>
>>> root
>>>  |-- name: string (nullable = true)
>>>  |-- addresses: array (nullable = true)
>>>  |    |-- element: struct (containsNull = true)
>>>  |    |    |-- street: string (nullable = true)
>>>  |    |    |-- city: string (nullable = true)
>>>
>>> I want to output name and city. The following is my spark streaming app
>>> which outputs name and addresses, but I want name and cities in the output.
>>>
>>> object PersonConsumer {
>>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>>   import com.example.protos.demo._
>>>
>>>   def main(args: Array[String]) {
>>>
>>>     val spark = SparkSession.builder
>>>       .master("local")
>>>       .appName("spark session example")
>>>       .getOrCreate()
>>>
>>>     import spark.implicits._
>>>
>>>     val ds1 = spark.readStream.format("kafka")
>>>       .option("kafka.bootstrap.servers", "localhost:9092")
>>>       .option("subscribe", "person")
>>>       .load()
>>>
>>>     val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value"))
>>>       .map(Person.parseFrom(_))
>>>       .select($"name", $"addresses")
>>>
>>>     ds2.printSchema()
>>>
>>>     val query = ds2.writeStream
>>>       .outputMode("append")
>>>       .format("console")
>>>       .start()
>>>
>>>     query.awaitTermination()
>>>   }
>>> }
>>>
>>> Appreciate your help. Thanks.
>>>
>>
>>
>>
>> --
>> Thanks,
>> Pandeeswaran
>>
>
>


How do I access the nested field in a dataframe, spark Streaming app... Please help.

2016-11-20 Thread shyla deshpande
The following is my dataframe schema:

root
 |-- name: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- city: string (nullable = true)

I want to output name and city. The following is my spark streaming app
which outputs name and addresses, but I want name and cities in the output.

object PersonConsumer {
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args: Array[String]) {

    val spark = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "person")
      .load()

    val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value"))
      .map(Person.parseFrom(_))
      .select($"name", $"addresses")

    ds2.printSchema()

    val query = ds2.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

Appreciate your help. Thanks.


Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-17 Thread shyla deshpande
Thanks Zhu, That was it. Now works great!
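
A minimal sketch of the workaround Ryan describes in the quoted reply below: map the generated message onto a plain case class whose fields are all types Spark's encoders handle (primitives, `Option`, `Seq`, nested case classes), flattening the enum to its numeric value. `GenPerson`, `GenAddress` and `GenGender` are hypothetical stand-ins for the ScalaPB-generated classes so that the snippet compiles on its own; `PersonRow`, `AddressRow` and `toRow` are made-up names. The only thing assumed about the real generated enum is the `value` member, which appears in the generated code quoted in this archive.

```scala
object EncoderFriendlySketch {
  // Stand-ins for the generated classes (assumption: the real ones have
  // the same field names and Option/Seq shapes as the Person class
  // posted in this thread).
  sealed trait GenGender { def value: Int }
  case object MALE extends GenGender { val value = 1 }
  case object FEMALE extends GenGender { val value = 2 }
  final case class GenAddress(street: Option[String] = None, city: Option[String] = None)
  final case class GenPerson(
      name: Option[String] = None,
      age: Option[Int] = None,
      gender: Option[GenGender] = None,
      tags: Seq[String] = Nil,
      addresses: Seq[GenAddress] = Nil)

  // Plain case classes: no trait-typed fields, so a product encoder
  // has nothing it would need to instantiate reflectively.
  final case class AddressRow(street: Option[String], city: Option[String])
  final case class PersonRow(
      name: Option[String],
      age: Option[Int],
      gender: Option[Int], // enum flattened to its numeric value
      tags: Seq[String],
      addresses: Seq[AddressRow])

  // Convert a generated message into the encoder-friendly shape.
  def toRow(p: GenPerson): PersonRow =
    PersonRow(
      p.name,
      p.age,
      p.gender.map(_.value),
      p.tags,
      p.addresses.map(a => AddressRow(a.street, a.city)))
}
```

In the job, `parseLine` would then return something like `toRow(Person.parseFrom(bytes))`, so the Dataset's element type is `PersonRow` rather than the generated `Person`.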

On Thu, Nov 17, 2016 at 1:07 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> The problem is "optional Gender gender = 3;". The generated class "Gender"
> is a trait, and Spark does not know how to instantiate a trait, so it is
> not supported. You can define your own class that the SQL Encoder supports
> and convert the generated class to it in `parseLine`.
>
> On Wed, Nov 16, 2016 at 4:22 PM, shyla deshpande <deshpandesh...@gmail.com> wrote:
>
>> Ryan,
>>
>> I just wanted to provide more info. Here is my .proto file which is the
>> basis for generating the Person class. Thanks.
>>
>> option java_package = "com.example.protos";
>> enum Gender {
>>   MALE = 1;
>>   FEMALE = 2;
>> }
>> message Address {
>>   optional string street = 1;
>>   optional string city = 2;
>> }
>> message Person {
>>   optional string name = 1;
>>   optional int32 age = 2;
>>   optional Gender gender = 3;
>>   repeated string tags = 4;
>>   repeated Address addresses = 5;
>> }
>>
>>
>> On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> *Thanks for the response. Following is the Person class..*
>>>
>>> // Generated by the Scala Plugin for the Protocol Buffer Compiler.
>>> // Do not edit!
>>> //
>>> // Protofile syntax: PROTO2
>>>
>>> package com.example.protos.demo
>>>
>>>
>>>
>>> @SerialVersionUID(0L)
>>> final case class Person(
>>> name: scala.Option[String] = None,
>>> age: scala.Option[Int] = None,
>>> gender: scala.Option[com.example.protos.demo.Gender] = None,
>>> tags: scala.collection.Seq[String] = Nil,
>>> addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
>>> ) extends com.trueaccord.scalapb.GeneratedMessage with 
>>> com.trueaccord.scalapb.Message[Person] with 
>>> com.trueaccord.lenses.Updatable[Person] {
>>> @transient
>>> private[this] var __serializedSizeCachedValue: Int = 0
>>> private[this] def __computeSerializedValue(): Int = {
>>>   var __size = 0
>>>   if (name.isDefined) { __size += 
>>> com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
>>>   if (age.isDefined) { __size += 
>>> com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
>>>   if (gender.isDefined) { __size += 
>>> com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) }
>>>   tags.foreach(tags => __size += 
>>> com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
>>>   addresses.foreach(addresses => __size += 1 + 
>>> com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize)
>>>  + addresses.serializedSize)
>>>   __size
>>> }
>>> final override def serializedSize: Int = {
>>>   var read = __serializedSizeCachedValue
>>>   if (read == 0) {
>>> read = __computeSerializedValue()
>>> __serializedSizeCachedValue = read
>>>   }
>>>   read
>>> }
>>> def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = 
>>> {
>>>   name.foreach { __v =>
>>> _output__.writeString(1, __v)
>>>   };
>>>   age.foreach { __v =>
>>> _output__.writeInt32(2, __v)
>>>   };
>>>   gender.foreach { __v =>
>>> _output__.writeEnum(3, __v.value)
>>>   };
>>>   tags.foreach { __v =>
>>> _output__.writeString(4, __v)
>>>   };
>>>   addresses.foreach { __v =>
>>> _output__.writeTag(5, 2)
>>> _output__.writeUInt32NoTag(__v.serializedSize)
>>> __v.writeTo(_output__)
>>>   };
>>> }
>>> def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): 
>>> com.example.protos.demo.Person = {
>>>   var __name = this.name
>>>   var __age = this.age
>>>   var __gender = this.gender
>>>   val __tags = (scala.collection.immutable.Vector.newBuilder[String] 
>>> ++= this.tags)
>>>   val __addresses = 
>>> (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address]
>>>  ++= 

Re: Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

2016-11-17 Thread shyla deshpande
Hello everyone,
 The following code works ...

def main(args: Array[String]) {

  val spark = SparkSession.builder
    .master("local")
    .appName("spark session example")
    .getOrCreate()

  import spark.implicits._

  val ds1 = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "student")
    .load()

  val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value"))
    .map(Student.parseFrom(_))

  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()

}


On Thu, Nov 17, 2016 at 11:30 AM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> val spark = SparkSession.builder.
>   master("local")
>   .appName("spark session example")
>   .getOrCreate()
>
> import spark.implicits._
>
> val dframe1 = spark.readStream.format("kafka").
>   option("kafka.bootstrap.servers","localhost:9092").
>   option("subscribe","student").load()
>
> *How do I deserialize the value column from dataframe1 *
>
> *which is Array[Byte] to Student object using Student.parseFrom..???*
>
> *Please help.*
>
> *Thanks.*
>
>
>
> // Stream of votes from Kafka as bytes
> val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](
>   ssc, LocationStrategies.PreferConsistent,
>   ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))
>
> // Parse them into Vote case class.
> val votes: DStream[Vote] = votesAsBytes.map {
>   (cr: ConsumerRecord[String, Array[Byte]]) =>
>     Vote.parseFrom(cr.value())
> }
>
>


Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

2016-11-17 Thread shyla deshpande
val spark = SparkSession.builder.
  master("local")
  .appName("spark session example")
  .getOrCreate()

import spark.implicits._

val dframe1 = spark.readStream.format("kafka").
  option("kafka.bootstrap.servers","localhost:9092").
  option("subscribe","student").load()

How do I deserialize the value column of dframe1, which is Array[Byte],
into a Student object using Student.parseFrom?

Please help.

Thanks.



// Stream of votes from Kafka as bytes
val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))

// Parse them into Vote case class.
val votes: DStream[Vote] = votesAsBytes.map {
  (cr: ConsumerRecord[String, Array[Byte]]) =>
    Vote.parseFrom(cr.value())
}


Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread shyla deshpande
Ryan,

I just wanted to provide more info. Here is my .proto file which is the
basis for generating the Person class. Thanks.

option java_package = "com.example.protos";
enum Gender {
  MALE = 1;
  FEMALE = 2;
}
message Address {
  optional string street = 1;
  optional string city = 2;
}
message Person {
  optional string name = 1;
  optional int32 age = 2;
  optional Gender gender = 3;
  repeated string tags = 4;
  repeated Address addresses = 5;
}


On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> *Thanks for the response. Following is the Person class..*
>
> // Generated by the Scala Plugin for the Protocol Buffer Compiler.
> // Do not edit!
> //
> // Protofile syntax: PROTO2
>
> package com.example.protos.demo
>
>
>
> @SerialVersionUID(0L)
> final case class Person(
> name: scala.Option[String] = None,
> age: scala.Option[Int] = None,
> gender: scala.Option[com.example.protos.demo.Gender] = None,
> tags: scala.collection.Seq[String] = Nil,
> addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
> ) extends com.trueaccord.scalapb.GeneratedMessage with 
> com.trueaccord.scalapb.Message[Person] with 
> com.trueaccord.lenses.Updatable[Person] {
> @transient
> private[this] var __serializedSizeCachedValue: Int = 0
> private[this] def __computeSerializedValue(): Int = {
>   var __size = 0
>   if (name.isDefined) { __size += 
> com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
>   if (age.isDefined) { __size += 
> com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
>   if (gender.isDefined) { __size += 
> com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) }
>   tags.foreach(tags => __size += 
> com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
>   addresses.foreach(addresses => __size += 1 + 
> com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize)
>  + addresses.serializedSize)
>   __size
> }
> final override def serializedSize: Int = {
>   var read = __serializedSizeCachedValue
>   if (read == 0) {
> read = __computeSerializedValue()
> __serializedSizeCachedValue = read
>   }
>   read
> }
> def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = {
>   name.foreach { __v =>
> _output__.writeString(1, __v)
>   };
>   age.foreach { __v =>
> _output__.writeInt32(2, __v)
>   };
>   gender.foreach { __v =>
> _output__.writeEnum(3, __v.value)
>   };
>   tags.foreach { __v =>
> _output__.writeString(4, __v)
>   };
>   addresses.foreach { __v =>
> _output__.writeTag(5, 2)
> _output__.writeUInt32NoTag(__v.serializedSize)
> __v.writeTo(_output__)
>   };
> }
> def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): 
> com.example.protos.demo.Person = {
>   var __name = this.name
>   var __age = this.age
>   var __gender = this.gender
>   val __tags = (scala.collection.immutable.Vector.newBuilder[String] ++= 
> this.tags)
>   val __addresses = 
> (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address]
>  ++= this.addresses)
>   var _done__ = false
>   while (!_done__) {
> val _tag__ = _input__.readTag()
> _tag__ match {
>   case 0 => _done__ = true
>   case 10 =>
> __name = Some(_input__.readString())
>   case 16 =>
> __age = Some(_input__.readInt32())
>   case 24 =>
> __gender = 
> Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum()))
>   case 34 =>
> __tags += _input__.readString()
>   case 42 =>
> __addresses += 
> com.trueaccord.scalapb.LiteParser.readMessage(_input__, 
> com.example.protos.demo.Address.defaultInstance)
>   case tag => _input__.skipField(tag)
> }
>   }
>   com.example.protos.demo.Person(
>   name = __name,
>   age = __age,
>   gender = __gender,
>   tags = __tags.result(),
>   addresses = __addresses.result()
>   )
> }
> def getName: String = name.getOrElse("")
> def clearName: Person = copy(name = None)
> def withName(__v: String): Person = copy(name = Some(__v))
> def getAge: Int = age.getOrElse(0)
> def clearAge: Person = copy(age = None)
> def withAge(__v: Int): Person = copy(age = Some(__v))
>

Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread shyla deshpande
case 3 => gender.map(_.valueDescriptor).getOrElse(null)
case 4 => tags
case 5 => addresses
  }
}
override def toString: String =
com.trueaccord.scalapb.TextFormat.printToUnicodeString(this)
def companion = com.example.protos.demo.Person
}

object Person extends
com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person]
{
  implicit def messageCompanion:
com.trueaccord.scalapb.GeneratedMessageCompanion[com.example.protos.demo.Person]
= this
  def fromFieldsMap(__fieldsMap:
scala.collection.immutable.Map[com.google.protobuf.Descriptors.FieldDescriptor,
scala.Any]): com.example.protos.demo.Person = {
require(__fieldsMap.keys.forall(_.getContainingType() ==
descriptor), "FieldDescriptor does not match message type.")
val __fields = descriptor.getFields
com.example.protos.demo.Person(
  __fieldsMap.get(__fields.get(0)).asInstanceOf[scala.Option[String]],
  __fieldsMap.get(__fields.get(1)).asInstanceOf[scala.Option[Int]],
  
__fieldsMap.get(__fields.get(2)).asInstanceOf[scala.Option[com.google.protobuf.Descriptors.EnumValueDescriptor]].map(__e
=> com.example.protos.demo.Gender.fromValue(__e.getNumber)),
  __fieldsMap.getOrElse(__fields.get(3),
Nil).asInstanceOf[scala.collection.Seq[String]],
  __fieldsMap.getOrElse(__fields.get(4),
Nil).asInstanceOf[scala.collection.Seq[com.example.protos.demo.Address]]
)
  }
  def descriptor: com.google.protobuf.Descriptors.Descriptor =
DemoProto.descriptor.getMessageTypes.get(1)
  def messageCompanionForField(__field:
com.google.protobuf.Descriptors.FieldDescriptor):
com.trueaccord.scalapb.GeneratedMessageCompanion[_] = {
require(__field.getContainingType() == descriptor,
"FieldDescriptor does not match message type.")
var __out: com.trueaccord.scalapb.GeneratedMessageCompanion[_] = null
__field.getNumber match {
  case 5 => __out = com.example.protos.demo.Address
}
  __out
  }
  def enumCompanionForField(__field:
com.google.protobuf.Descriptors.FieldDescriptor):
com.trueaccord.scalapb.GeneratedEnumCompanion[_] = {
require(__field.getContainingType() == descriptor,
"FieldDescriptor does not match message type.")
__field.getNumber match {
  case 3 => com.example.protos.demo.Gender
}
  }
  lazy val defaultInstance = com.example.protos.demo.Person(
  )
  implicit class PersonLens[UpperPB](_l:
com.trueaccord.lenses.Lens[UpperPB, com.example.protos.demo.Person])
extends com.trueaccord.lenses.ObjectLens[UpperPB,
com.example.protos.demo.Person](_l) {
def name: com.trueaccord.lenses.Lens[UpperPB, String] =
field(_.getName)((c_, f_) => c_.copy(name = Some(f_)))
def optionalName: com.trueaccord.lenses.Lens[UpperPB,
scala.Option[String]] = field(_.name)((c_, f_) => c_.copy(name = f_))
def age: com.trueaccord.lenses.Lens[UpperPB, Int] =
field(_.getAge)((c_, f_) => c_.copy(age = Some(f_)))
def optionalAge: com.trueaccord.lenses.Lens[UpperPB,
scala.Option[Int]] = field(_.age)((c_, f_) => c_.copy(age = f_))
def gender: com.trueaccord.lenses.Lens[UpperPB,
com.example.protos.demo.Gender] = field(_.getGender)((c_, f_) =>
c_.copy(gender = Some(f_)))
def optionalGender: com.trueaccord.lenses.Lens[UpperPB,
scala.Option[com.example.protos.demo.Gender]] = field(_.gender)((c_,
f_) => c_.copy(gender = f_))
def tags: com.trueaccord.lenses.Lens[UpperPB,
scala.collection.Seq[String]] = field(_.tags)((c_, f_) => c_.copy(tags
= f_))
def addresses: com.trueaccord.lenses.Lens[UpperPB,
scala.collection.Seq[com.example.protos.demo.Address]] =
field(_.addresses)((c_, f_) => c_.copy(addresses = f_))
  }
  final val NAME_FIELD_NUMBER = 1
  final val AGE_FIELD_NUMBER = 2
  final val GENDER_FIELD_NUMBER = 3
  final val TAGS_FIELD_NUMBER = 4
  final val ADDRESSES_FIELD_NUMBER = 5
}


On Wed, Nov 16, 2016 at 1:28 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> Could you provide the Person class?
>
> On Wed, Nov 16, 2016 at 1:19 PM, shyla deshpande <deshpandesh...@gmail.com> wrote:
>
>> I am using 2.11.8. Thanks
>>
>> On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
>>> known race conditions in reflection, and the Scala community has no plans
>>> to fix them
>>> (http://docs.scala-lang.org/overviews/reflection/thread-safety.html).
>>> AFAIK, the only way to fix this is to upgrade to Scala 2.11.
>>>
>>> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> I am using protobuf to encode. This may not be related to the new
>>>> release issue
>>>>
>>>> Exception in thread "

Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread shyla deshpande
I am using 2.11.8. Thanks

On Wed, Nov 16, 2016 at 1:15 PM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> Which Scala version are you using? Is it Scala 2.10? Scala 2.10 has some
> known race conditions in reflection, and the Scala community has no plans
> to fix them
> (http://docs.scala-lang.org/overviews/reflection/thread-safety.html).
> AFAIK, the only way to fix this is to upgrade to Scala 2.11.
>
> On Wed, Nov 16, 2016 at 11:16 AM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> I am using protobuf to encode. This may not be related to the new release
>> issue
>>
>> Exception in thread "main" scala.ScalaReflectionException:  is not a term
>>     at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
>>     at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
>>     at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
>>     at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
>>     at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
>>     at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
>>     at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
>>     at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
>>     at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
>>     at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>>     at scala.collection.immutable.List.foreach(List.scala:381)
>>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>>     at scala.collection.immutable.List.flatMap(List.scala:344)
>>     at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
>>     at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
>>     at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
>>     at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
>>     at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
>>     at PersonConsumer$.main(PersonConsumer.scala:33)
>>     at PersonConsumer.main(PersonConsumer.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>
>> The following is my code ...
>>
>> object PersonConsumer {
>>   import org.apache.spark.rdd.RDD
>>   import com.trueaccord.scalapb.spark._
>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>   import com.example.protos.demo._
>>
>>   def main(args: Array[String]) {
>>
>>     def parseLine(s: String): Person =
>>       Person.parseFrom(
>>         org.apache.commons.codec.binary.Base64.decodeBase64(s))
>>
>>     val spark = SparkSession.builder
>>       .master("local")
>>       .appName("spark session example")
>>       .getOrCreate()
>>
>>     import spark.implicits._
>>
>>     val ds1 = spark.readStream.format("kafka")
>>       .option("kafka.bootstrap.servers", "localhost:9092")
>>       .option("subscribe", "person")
>>       .load()
>>
>>     val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]
>>
>>     val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")
>>
>>     val ds4 = spark.sqlContext.sql("select name from persons")
>>
>>     val query = ds4.writeStream
>>       .outputMode("append")
>>       .format("console")
>>       .start()
>>     query.awaitTermination()
>>   }
>> }
>>
>>
>


Spark 2.0.2 with Kafka source, Error please help!

2016-11-16 Thread shyla deshpande
I am using protobuf to encode. This may not be related to the new release
issue

Exception in thread "main" scala.ScalaReflectionException:  is not a term
    at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
    at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.constructParams(ScalaReflection.scala:811)
    at org.apache.spark.sql.catalyst.ScalaReflection$.constructParams(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.getConstructorParameters(ScalaReflection.scala:800)
    at org.apache.spark.sql.catalyst.ScalaReflection$.getConstructorParameters(ScalaReflection.scala:39)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:582)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:460)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:592)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:583)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:583)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:61)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:274)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47)
    at PersonConsumer$.main(PersonConsumer.scala:33)
    at PersonConsumer.main(PersonConsumer.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

The following is my code ...

object PersonConsumer {
  import org.apache.spark.rdd.RDD
  import com.trueaccord.scalapb.spark._
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args: Array[String]) {

    def parseLine(s: String): Person =
      Person.parseFrom(
        org.apache.commons.codec.binary.Base64.decodeBase64(s))

    val spark = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    import spark.implicits._

    val ds1 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "person")
      .load()

    val ds2 = ds1.selectExpr("CAST(value AS STRING)").as[String]

    val ds3 = ds2.map(str => parseLine(str)).createOrReplaceTempView("persons")

    val ds4 = spark.sqlContext.sql("select name from persons")

    val query = ds4.writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}


Re: Akka Stream as the source for Spark Streaming. Please advice...

2016-11-12 Thread shyla deshpande
Is it OK to use ProtoBuf for sending messages to Kafka? I do not see
anyone using it.

Please point me to some code samples showing how to use it with Spark
Structured Streaming.

Thanks again..


On Sat, Nov 12, 2016 at 11:44 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> Thanks everyone. Very good discussion.
>
> Thanks Jacek, for the code snippet. I downloaded your Mastering Apache
> Spark PDF. I love it.
>
> I have one more question,
>
>
> On Sat, Nov 12, 2016 at 2:21 PM, Sean McKibben <grap...@graphex.com>
> wrote:
>
>> I think one of the advantages of using akka-streams within Spark is the
>> fact that it is a general purpose stream processing toolset with
>> backpressure, not necessarily specific to kafka. If things work out with
>> the approach, Spark could be a great benefit to use as a coordination
>> framework for discrete streams processed on each executor. I've been toying
>> with the idea of making essentially an RDD of task messages, where each
>> task becomes an akka stream which are materialized on multiple executors
>> and completed as that executor's 'task', allowing Spark to coordinate the
>> completion of the entire job. For example, I might make an RDD which is
>> just a set of URLs that I want to download and produce to Kafka, but let's
>> say I have so many URLs that I need to coordinate that work across many
>> servers. Using Spark with a foreachPartition block, I might set up an
>> akka-stream to accomplish that task in a backpressured, stream-oriented
>> way, so that I could have the entire Spark job complete when all of the
>> URLs had been produced to Kafka, using individual Akka Streams within each
>> executor.
>>
>> I realize that this is not the original question on this thread, and I
>> don't mean to hijack it. I am also interested in the potential of Akka
>> Stream sources for a Spark Streaming job directly, which could potentially
>> be adapted for both Kafka and non-kafka use cases, with the emphasis for me
>> being on use cases which aren't necessarily Kafka specific. There are some
>> portions which feel like a bit of a mismatch, but with Structured Streams,
>> I think there is greater opportunity for some kind of symbiotic adapter
>> layer on the input side of things. I think the Apache Gearpump
>> <https://gearpump.apache.org/overview.html> project in incubation may
>> demonstrate how this adaptation can be approached, and the nascent Alpakka
>> project <https://github.com/akka/alpakka> is an example of the generic
>> applications of Akka Streams.
>>
>> It is important to note that Akka Streams are billed as a toolbox and not
>> a framework, because they don't handle coordination of parallelism or
>> multi-host concurrency. I think Spark could end up being a very convenient
>> framework to handle this aspect of a distributed application's
>> architecture. It may be able to do some of this without any modification to
>> either of these projects, but I haven't had the experience of actually
>> attempting the implementation yet.
>>
>>
>> On Nov 12, 2016, at 9:42 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi Luciano,
>>
>> Mind sharing why to have a structured streaming source/sink for Akka
>> if Kafka's available and Akka Streams has a Kafka module? #curious
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sat, Nov 12, 2016 at 4:07 PM, Luciano Resende <luckbr1...@gmail.com>
>> wrote:
>>
>> If you are interested in Akka streaming, it is being maintained in Apache
>> Bahir. For Akka there isn't a structured streaming version yet, but we
>> would
>> be interested in collaborating in the structured streaming version for
>> sure.
>>
>> On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande <deshpandesh...@gmail.com
>> >
>> wrote:
>>
>>
>> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
>> Spark Streaming and Cassandra using Structured Streaming. But the kafka
>> source support for Structured Streaming is not yet available. So now I am
>> trying to use Akka Stream as the source to Spark Streaming.
>>
>> Want to make sure I am heading in the right direction. Please direct me to
>> any sample code and reading material for this.
>>
>> Thanks
>>
>> --
>> Sent from my Mobile device
>>
>>
>>
>>
>>
>


Re: Akka Stream as the source for Spark Streaming. Please advice...

2016-11-12 Thread shyla deshpande
Thanks everyone. Very good discussion.

Thanks Jacek, for the code snippet. I downloaded your Mastering Apache
Spark PDF. I love it.

I have one more question,


On Sat, Nov 12, 2016 at 2:21 PM, Sean McKibben <grap...@graphex.com> wrote:

> I think one of the advantages of using akka-streams within Spark is the
> fact that it is a general purpose stream processing toolset with
> backpressure, not necessarily specific to kafka. If things work out with
> the approach, Spark could be a great benefit to use as a coordination
> framework for discrete streams processed on each executor. I've been toying
> with the idea of making essentially an RDD of task messages, where each
> task becomes an Akka stream that is materialized on one of the executors
> and completed as that executor's 'task', allowing Spark to coordinate the
> completion of the entire job. For example, I might make an RDD which is
> just a set of URLs that I want to download and produce to Kafka, but let's
> say I have so many URLs that I need to coordinate that work across many
> servers. Using Spark with a foreachPartition block, I might set up an
> akka-stream to accomplish that task in a backpressured, stream-oriented
> way, so that I could have the entire Spark job complete when all of the
> URLs had been produced to Kafka, using individual Akka Streams within each
> executor.
>
> I realize that this is not the original question on this thread, and I
> don't mean to hijack it. I am also interested in the potential of Akka
> Stream sources for a Spark Streaming job directly, which could potentially
> be adapted for both Kafka and non-kafka use cases, with the emphasis for me
> being on use cases which aren't necessarily Kafka specific. There are some
> portions which feel like a bit of a mismatch, but with Structured Streams,
> I think there is greater opportunity for some kind of symbiotic adapter
> layer on the input side of things. I think the Apache Gearpump
> <https://gearpump.apache.org/overview.html> project in incubation may
> demonstrate how this adaptation can be approached, and the nascent Alpakka
> project <https://github.com/akka/alpakka> is an example of the generic
> applications of Akka Streams.
>
> It is important to note that Akka Streams are billed as a toolbox and not
> a framework, because they don't handle coordination of parallelism or
> multi-host concurrency. I think Spark could end up being a very convenient
> framework to handle this aspect of a distributed application's
> architecture. It may be able to do some of this without any modification to
> either of these projects, but I haven't had the experience of actually
> attempting the implementation yet.
>
>
> On Nov 12, 2016, at 9:42 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>
> Hi Luciano,
>
> Mind sharing why to have a structured streaming source/sink for Akka
> if Kafka's available and Akka Streams has a Kafka module? #curious
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sat, Nov 12, 2016 at 4:07 PM, Luciano Resende <luckbr1...@gmail.com>
> wrote:
>
> If you are interested in Akka streaming, it is being maintained in Apache
> Bahir. For Akka there isn't a structured streaming version yet, but we
> would
> be interested in collaborating in the structured streaming version for
> sure.
>
> On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande <deshpandesh...@gmail.com>
> wrote:
>
>
> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
> Spark Streaming and Cassandra using Structured Streaming. But the kafka
> source support for Structured Streaming is not yet available. So now I am
> trying to use Akka Stream as the source to Spark Streaming.
>
> Want to make sure I am heading in the right direction. Please direct me to
> any sample code and reading material for this.
>
> Thanks
>
> --
> Sent from my Mobile device
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
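
The per-partition idea Sean describes above could be sketched roughly as
follows. This is only a sketch under assumptions, not code from the thread:
it assumes Akka Streams 2.4-era APIs (ActorMaterializer) are shipped to the
executors alongside the job, and the names PartitionStreams, processAll and
handle are hypothetical. Each partition starts its own ActorSystem, pushes
its elements through a bounded-parallelism stream, and blocks until the
stream completes, so the Spark task only finishes once the stream has
drained.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.spark.rdd.RDD

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object PartitionStreams {
  // Hypothetical helper: runs `handle` on every URL, one Akka Stream per partition.
  def processAll(urls: RDD[String], parallelism: Int)(handle: String => Unit): Unit =
    urls.foreachPartition { partition =>
      // One ActorSystem per partition keeps the sketch simple; it is not cheap.
      implicit val system: ActorSystem = ActorSystem("partition-stream")
      implicit val materializer: ActorMaterializer = ActorMaterializer()
      import system.dispatcher // execution context for the Futures below

      try {
        val done = Source
          .fromIterator(() => partition)
          // At most `parallelism` elements are in flight; the rest wait upstream.
          .mapAsync(parallelism)(url => Future(handle(url)))
          .runWith(Sink.ignore)
        // Block the Spark task until the stream has processed the whole partition.
        Await.result(done, Duration.Inf)
      } finally {
        Await.result(system.terminate(), 1.minute)
      }
    }
}
```

A call might look like PartitionStreams.processAll(urlRdd, 4)(url => ...),
where the function body downloads the URL and produces the result to Kafka.
Starting an ActorSystem per partition is the simplest thing that works here;
a shared, lazily created system per executor JVM would likely be cheaper.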


Anyone using ProtoBuf for Kafka messages with Spark Streaming for processing?

2016-11-10 Thread shyla deshpande
I am using ProtoBuf for Kafka messages with Spark Streaming because ProtoBuf
is already being used in the system.

Some sample code and reading material for using ProtoBuf for Kafka messages
with Spark Streaming would be helpful.

Thanks
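
One way this could look, as a sketch rather than a tested recipe: read the
Kafka values as raw bytes with the spark-streaming-kafka-0-10 integration
and decode each record with the parser of the generated ProtoBuf class. The
object name ProtoKafkaStream, the method decoded and its parameters are
hypothetical; the decode function is passed in so the sketch does not depend
on any particular generated class.

```scala
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import scala.reflect.ClassTag

object ProtoKafkaStream {
  // Hypothetical helper: a DStream of messages decoded from raw Kafka values.
  def decoded[T: ClassTag](
      ssc: StreamingContext,
      brokers: String,
      groupId: String,
      topics: Seq[String])(decode: Array[Byte] => T): DStream[T] = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      // Values stay as bytes so that ProtoBuf can parse them on the executors.
      "value.deserializer" -> classOf[ByteArrayDeserializer],
      "group.id" -> groupId
    )
    KafkaUtils
      .createDirectStream[String, Array[Byte]](
        ssc, PreferConsistent, Subscribe[String, Array[Byte]](topics, kafkaParams))
      .map(record => decode(record.value))
  }
}
```

With a generated class named Person (an assumption), usage might be
ProtoKafkaStream.decoded[Person](ssc, "localhost:9092", "demo",
Seq("people"))(bytes => Person.parseFrom(bytes)).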


Akka Stream as the source for Spark Streaming. Please advice...

2016-11-09 Thread shyla deshpande
I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
Spark Streaming and Cassandra using Structured Streaming. But the kafka
source support for Structured Streaming is not yet available. So now I am
trying to use Akka Stream as the source to Spark Streaming.

Want to make sure I am heading in the right direction. Please direct me to
any sample code and reading material for this.

Thanks


Re: Structured Streaming with Cassandra, Is it supported??

2016-11-07 Thread shyla deshpande
I am using spark-cassandra-connector_2.11.

On Mon, Nov 7, 2016 at 3:33 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> Hi ,
>
> I am trying to do structured streaming with the wonderful SparkSession,
> but cannot save the streaming data to Cassandra.
>
> If anyone has got this working, please help
>
> Thanks
>
>


Structured Streaming with Cassandra, Is it supported??

2016-11-07 Thread shyla deshpande
Hi ,

I am trying to do structured streaming with the wonderful SparkSession, but
cannot save the streaming data to Cassandra.

If anyone has got this working, please help

Thanks
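
A possible workaround, sketched under assumptions: Structured Streaming in
Spark 2.0 has a generic foreach sink (ForeachWriter), and the
spark-cassandra-connector exposes CassandraConnector for running plain CQL.
The class name CassandraRowWriter and the table demo.people with columns
name and city are hypothetical, and the sketch assumes rows whose first two
columns are strings.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical sink: writes each streaming row to Cassandra with a CQL INSERT.
class CassandraRowWriter(connector: CassandraConnector) extends ForeachWriter[Row] {

  // Returning true tells Spark to process the rows of this partition.
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(row: Row): Unit =
    connector.withSessionDo { session =>
      // Table and column names are placeholders for illustration only.
      session.execute(
        "INSERT INTO demo.people (name, city) VALUES (?, ?)",
        row.getString(0), row.getString(1))
    }

  // Nothing to release here; the connector manages its own sessions.
  override def close(errorOrNull: Throwable): Unit = ()
}
```

It would be attached with something like
df.writeStream.foreach(new CassandraRowWriter(
CassandraConnector(spark.sparkContext.getConf))).start(). Row-at-a-time
inserts are slow; this only illustrates where the write would go.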


Re: Structured Streaming with Kafka Source, does it work??

2016-11-06 Thread shyla deshpande
Hi Jaya!

Thanks for the reply. Structured Streaming works fine for me with the socket
text stream. I think Structured Streaming with a Kafka source is not yet
supported.

If anyone has got it working with a Kafka source, please provide me
some sample code or direction.

Thanks


On Sun, Nov 6, 2016 at 5:17 PM, Jayaradha Natarajan 
wrote:

> Shyla!
>
> Check
> https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
>
> Thanks,
> Jayaradha
>
> On Sun, Nov 6, 2016 at 5:13 PM, shyla  wrote:
>
>> I am trying to do Structured Streaming with Kafka Source. Please let me
>> know
>> where I can find some sample code for this. Thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-with-Kafka-Source-does-it-work-tp19748.html
>> Sent from the Apache Spark Developers List mailing list archive at
>> Nabble.com.
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>


Structured Streaming with Kafka source,, does it work??????

2016-11-06 Thread shyla deshpande
I am trying to do Structured Streaming with Kafka Source. Please let me
know where I can find some sample code for this. Thanks
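
For readers on a later release: as far as I know, a Kafka source for
Structured Streaming first shipped with Spark 2.0.2, in the separate
spark-sql-kafka-0-10 artifact, so it is not available on 2.0.1. A minimal
sketch, assuming that artifact is on the classpath; the broker address
localhost:9092 and the topic name people are placeholders.

```scala
import org.apache.spark.sql.SparkSession

object KafkaStructuredStreamingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[2]")
      .appName("kafka structured streaming sketch")
      .getOrCreate()
    import spark.implicits._

    // Subscribe to one topic; the source yields binary key and value columns.
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("subscribe", "people")                       // placeholder topic
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Print each micro-batch to the console to confirm that data is flowing.
    val query = lines.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
```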


Re: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession$ . Please Help!!!!!!!

2016-11-04 Thread shyla deshpande
I feel so good that Holden replied.

Yes, that was the problem. I was running from IntelliJ. I removed the
provided scope and it works great.

Thanks a lot.

On Fri, Nov 4, 2016 at 2:05 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> It seems like you've marked the Spark JARs as provided; in this case they
> would only be provided if you run your application with spark-submit or
> otherwise have Spark's JARs on your class path. How are you launching your
> application?
>
> On Fri, Nov 4, 2016 at 2:00 PM, shyla deshpande <deshpandesh...@gmail.com>
> wrote:
>
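>> object App {
>>
>>   import org.apache.spark.sql.functions._
>>   import org.apache.spark.sql.SparkSession
>>
>>   def main(args: Array[String]): Unit = {
>>     println("Hello World!")
>>     val sparkSession = SparkSession.builder
>>       .master("local")
>>       .appName("spark session example")
>>       .getOrCreate()
>>   }
>>
>> }
>>
>>
>> <properties>
>>   <maven.compiler.source>1.8</maven.compiler.source>
>>   <maven.compiler.target>1.8</maven.compiler.target>
>>   <encoding>UTF-8</encoding>
>>   <scala.version>2.11.8</scala.version>
>>   <scala.compat.version>2.11</scala.compat.version>
>> </properties>
>>
>> <dependencies>
>>   <dependency>
>>     <groupId>org.scala-lang</groupId>
>>     <artifactId>scala-library</artifactId>
>>     <version>${scala.version}</version>
>>   </dependency>
>>
>>   <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-core_2.11</artifactId>
>>     <version>2.0.1</version>
>>     <scope>provided</scope>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.spark</groupId>
>>     <artifactId>spark-sql_2.11</artifactId>
>>     <version>2.0.1</version>
>>     <scope>provided</scope>
>>   </dependency>
>>
>>   <dependency>
>>     <groupId>org.specs2</groupId>
>>     <artifactId>specs2-core_${scala.compat.version}</artifactId>
>>     <version>2.4.16</version>
>>     <scope>test</scope>
>>   </dependency>
>> </dependencies>
>>
>> <build>
>>   <sourceDirectory>src/main/scala</sourceDirectory>
>> </build>
>>
>>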
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
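
Holden's diagnosis can be checked directly from the IDE run configuration.
The following is a small sketch using only the standard library; the object
name ClasspathCheck and the method isOnClasspath are hypothetical. It tries
to load the class named in the exception and reports whether the current
classpath contains it.

```scala
import scala.util.Try

object ClasspathCheck {
  /** True when the named class can be loaded from the current classpath. */
  def isOnClasspath(className: String): Boolean =
    Try(Class.forName(className)).isSuccess

  def main(args: Array[String]): Unit = {
    // The class reported missing in the thread's ClassNotFoundException.
    val name = "org.apache.spark.sql.SparkSession$"
    if (isOnClasspath(name)) println(s"$name found")
    else println(s"$name missing: check for provided-scope Spark dependencies")
  }
}
```

When launched from the IDE with the Spark dependencies still scoped as
provided, this should report the class as missing; under spark-submit, or
after removing the provided scope as described above, it should be found.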


java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession$ . Please Help!!!!!!!

2016-11-04 Thread shyla deshpande
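object App {

  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.SparkSession

  def main(args: Array[String]): Unit = {
    println("Hello World!")
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()
  }

}


<properties>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <encoding>UTF-8</encoding>
  <scala.version>2.11.8</scala.version>
  <scala.compat.version>2.11</scala.compat.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>

  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.1</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.1</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.specs2</groupId>
    <artifactId>specs2-core_${scala.compat.version}</artifactId>
    <version>2.4.16</version>
    <scope>test</scope>
  </dependency>
</dependencies>

<build>
  <sourceDirectory>src/main/scala</sourceDirectory>
</build>
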


Re: Error creating SparkSession, in IntelliJ

2016-11-04 Thread shyla deshpande
I have built many projects using IntelliJ and Maven with prior versions of
Spark. If anyone has a successful project with Kafka, Spark 2.0.1, and
Cassandra, please share the pom.xml file.
Thanks
-S

On Thu, Nov 3, 2016 at 10:03 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> Hi Shyla,
>
> there is documentation for setting up an IDE:
> https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-IDESetup
>
> I hope this is helpful.
>
>
> 2016-11-04 9:10 GMT+09:00 shyla deshpande <deshpandesh...@gmail.com>:
>
>> Hello Everyone,
>>
>> I just installed Spark 2.0.1, spark shell works fine.
>>
>> Was able to run some simple programs from the Spark Shell, but find it
>> hard to make the same program work when using IntelliJ.
>>  I am getting the following error.
>>
>> Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.$scope()Lscala/xml/TopScope$;
>>     at org.apache.spark.ui.jobs.StagePage.<init>(StagePage.scala:44)
>>     at org.apache.spark.ui.jobs.StagesTab.<init>(StagesTab.scala:34)
>>     at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:62)
>>     at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:215)
>>     at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:157)
>>     at org.apache.spark.SparkContext.<init>(SparkContext.scala:440)
>>     at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2275)
>>     at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:831)
>>     at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:823)
>>     at scala.Option.getOrElse(Option.scala:121)
>>     at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
>>     at SparkSessionTest.DataSetWordCount$.main(DataSetWordCount.scala:15)
>>     at SparkSessionTest.DataSetWordCount.main(DataSetWordCount.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>
>> Thanks
>> -Shyla
>>
>>
>>
>


Error creating SparkSession, in IntelliJ

2016-11-03 Thread shyla deshpande
Hello Everyone,

I just installed Spark 2.0.1, and the Spark shell works fine.

I was able to run some simple programs from the Spark shell, but I find it
hard to make the same program work when using IntelliJ.
I am getting the following error.

Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.$scope()Lscala/xml/TopScope$;
    at org.apache.spark.ui.jobs.StagePage.<init>(StagePage.scala:44)
    at org.apache.spark.ui.jobs.StagesTab.<init>(StagesTab.scala:34)
    at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:62)
    at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:215)
    at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:157)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:440)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2275)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:831)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:823)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
    at SparkSessionTest.DataSetWordCount$.main(DataSetWordCount.scala:15)
    at SparkSessionTest.DataSetWordCount.main(DataSetWordCount.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Thanks
-Shyla
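
The thread does not pin down the cause of this error. A NoSuchMethodError
on scala.Predef$.$scope usually points, as far as I know, to mixed Scala
binary versions on the classpath, for example an artifact with a _2.10
suffix next to a 2.11 scala-library. The sketch below uses only the
standard library and hypothetical names (ScalaVersionCheck, binaryVersion,
mismatched). It prints the Scala version actually running and lists
classpath entries whose artifact suffix disagrees with it.

```scala
import java.io.File

object ScalaVersionCheck {
  // Matches the Scala suffix in names like spark-core_2.10-2.0.1.jar.
  private val Suffix = """_(2\.\d+)-""".r

  /** Binary version ("2.11") of a full version string ("2.11.8"). */
  def binaryVersion(full: String): String =
    full.split('.').take(2).mkString(".")

  /** Classpath entries whose Scala suffix differs from `expected`. */
  def mismatched(classpath: Seq[String], expected: String): Seq[String] =
    classpath.filter { entry =>
      Suffix.findFirstMatchIn(new File(entry).getName).exists(_.group(1) != expected)
    }

  def main(args: Array[String]): Unit = {
    val running = binaryVersion(scala.util.Properties.versionNumberString)
    val entries = System.getProperty("java.class.path").split(File.pathSeparator).toSeq
    println(s"scala-library on the classpath: $running")
    mismatched(entries, running).foreach(e => println(s"built for another Scala version: $e"))
  }
}
```

Running it with the same IntelliJ run configuration as the failing program
shows what that configuration really puts on the classpath, which can
differ from what the pom.xml suggests if stale libraries are attached to
the module.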


Re: Does Data pipeline using kafka and structured streaming work?

2016-11-01 Thread shyla deshpande
Hi Michael,

Thanks for the reply.

The following link says there is an open, unresolved JIRA issue for
Structured Streaming support for consuming from Kafka.

https://issues.apache.org/jira/browse/SPARK-15406

Appreciate your help.

-Shyla


On Tue, Nov 1, 2016 at 5:19 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> I'm not aware of any open issues against the kafka source for structured
> streaming.
>
> On Tue, Nov 1, 2016 at 4:45 PM, shyla deshpande <deshpandesh...@gmail.com>
> wrote:
>
>> I am building a data pipeline using Kafka, Spark Streaming and Cassandra.
>> I am wondering whether the issues with the Kafka source are fixed in
>> Spark 2.0.1. If not, please give me an update on when they may be fixed.
>>
>> Thanks
>> -Shyla
>>
>
>


Does Data pipeline using kafka and structured streaming work?

2016-11-01 Thread shyla deshpande
I am building a data pipeline using Kafka, Spark Streaming and Cassandra.
I am wondering whether the issues with the Kafka source are fixed in
Spark 2.0.1. If not, please give me an update on when they may be fixed.

Thanks
-Shyla

