Re: multiple splits fails

2016-04-05 Thread Mich Talebzadeh
>>>>>>>>>>>> This works
>>>>>>>>>>>>
>>>>>>>>>>>> scala> val messages = KafkaUtils.createDirectStream[String,
>>>>>>>>>>>> String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>>>>>>>>>> messages:
>>>>>>>>>>>> org.apache.spark.streaming.dstream.InputDStream[(String, String)] =
>>>>>>>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@3bfc0063
>>>>>>>>>>>>
>>>>>>>>>>>> scala> // Get the lines
>>>>>>>>>>>> scala> val lines = messages.map(_._2)
>>>>>>>>>>>> lines: org.apache.spark.streaming.dstream.DStream[String] =
>>>>>>>>>>>> org.apache.spark.streaming.dstream.MappedDStream@1e4afd64
>>>>>>>>>>>>
>>>>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>>>>>>>>>>> v: org.apache.spark.streaming.dstream.DStream[(String, Int)] =
>>>>>>>>>>>> org.apache.spark.streaming.dstream.ShuffledDStream@5aa09d
>>>>>>>>>>>>
>>>>>>>>>>>> However, this fails
>>>>>>>>>>>>
>>>>>>>>>>>> scala> val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>> <console>:43: error: value collect is not a member of
>>>>>>>>>>>> org.apache.spark.streaming.dstream.DStream[(String, Int)]
>>>>>>>>>>>>  val v = lines.filter(_.contains("ASE 15")).filter(_
>>>>>>>>>>>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>>>>>>>>>>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
>>>>>>>>>>>> _).collect.foreach(println)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> LinkedIn:
>>>>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 3 April 2016 at 16:01, Ted Yu  wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> bq. is not a member of (String, String)
>>>>>>>>>>>>>
>>>>>>>>>>>>> As shown above, contains shouldn't be applied directly on a
>>>>>>>>>>>>> tuple.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Choose the element of the tuple and then apply contains on it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <
>>>>>>>>>>>>> mich.talebza...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you gents.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That should be "\n" (newline)
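
Note on the error above: collect is an RDD action, so it does not exist on a
DStream. A minimal sketch of the usual pattern, assuming the same DStream v
built above, is to reach the per-batch RDDs through foreachRDD:

v.foreachRDD { rdd =>
  // rdd is the RDD[(String, Int)] for one batch; RDD actions are valid here
  rdd.collect.foreach(println)
}

ssc.start()            // nothing runs until the streaming context starts
ssc.awaitTermination()

For quick inspection, v.print() shows the first few elements of each batch
without pulling a whole batch back to the driver.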

Re: multiple splits fails

2016-04-03 Thread Ted Yu
bq. is not a member of (String, String)

As shown above, contains shouldn't be applied directly on a tuple.

Choose the element of the tuple and then apply contains on it.
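
For example, a minimal sketch against the messages stream used in this thread,
where each element is a (key, value) pair:

// select the value (_._2) before applying String operations
val lines = messages.map(_._2)
val hits  = lines.filter(l => l.contains("ASE 15") && l.contains("UPDATE INDEX STATISTICS"))

// or filter the tuples directly on their second element
val hits2 = messages.filter(_._2.contains("ASE 15"))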

On Sun, Apr 3, 2016 at 7:54 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Thank you gents.
>
> That should be "\n" (newline)
>
> OK I am using spark streaming to analyse the message
>
> It does the streaming
>
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
> //
> scala> val sparkConf = new SparkConf().
>  |  setAppName("StreamTest").
>  |  setMaster("local[12]").
>  |  set("spark.driver.allowMultipleContexts", "true").
>  |  set("spark.hadoop.validateOutputSpecs", "false")
> scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
> scala>
> scala> val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081";,
> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
> kafkaParams: scala.collection.immutable.Map[String,String] =
> Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
> http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id ->
> StreamTest)
> scala> val topic = Set("newtopic")
> topic: scala.collection.immutable.Set[String] = Set(newtopic)
> scala> val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c
>
> This part is tricky
>
> scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_
> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
> _).collect.foreach(println)
> <console>:47: error: value contains is not a member of (String, String)
>  val showlines = messages.filter(_ contains("ASE 15")).filter(_
> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
> _).collect.foreach(println)
>
>
> How does one refer to the content of the stream here?
>
> Thanks
>
>
>
>
>
>
>
>
>
>
> //
> // Now want to do some analysis on the same text file
> //
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 3 April 2016 at 15:32, Ted Yu  wrote:
>
>> bq. split"\t," splits the filter by carriage return
>>
>> Minor correction: "\t" denotes tab character.
>>
>> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas  wrote:
>>
>>> Hi Mich,
>>>
> 1. The first underscore in your filter call is referring to a line in the
>>> file (as textFile() results in a collection of strings)
>>> 2. You're correct. No need for it.
>>> 3. Filter is expecting a Boolean result. So you can merge your contains
>>> filters to one with AND (&&) statement.
>>> 4. Correct. Each character in split() is used as a divider.
>>>
>>> Eliran Bivas
>>>
>>> *From:* Mich Talebzadeh 
>>> *Sent:* Apr 3, 2016 15:06
>>> *To:* Eliran Bivas
>>> *Cc:* user @spark
>>> *Subject:* Re: multiple splits fails
>>>
>>> Hi Eliran,
>>>
>>> Many thanks for your input on this.
>>>
>>> I thought about what I was trying to achieve so I rewrote the logic as
>>> follows:
>>>
>>>
>>>1. Read the text file in
>>>2. Filter out empty lines (well not really needed here)
>>>3. Search for lines that contain "ASE 15" and further have sentence
>>>"UPDATE INDEX STATISTICS" in the said line
>>>4. Split the text by "\t" and ","
>>>5. Print the outcome
>>>
>>>
>>> This was what I did with your suggestions included
>>>
>>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>> f.cache()
>>>

Re: multiple splits fails

2016-04-03 Thread Mich Talebzadeh
Thank you gents.

That should be "\n" (newline)

OK I am using spark streaming to analyse the message

It does the streaming

import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
scala> val sparkConf = new SparkConf().
 |  setAppName("StreamTest").
 |  setMaster("local[12]").
 |  set("spark.driver.allowMultipleContexts", "true").
 |  set("spark.hadoop.validateOutputSpecs", "false")
scala> val ssc = new StreamingContext(sparkConf, Seconds(55))
scala>
scala> val kafkaParams = Map[String, String]("bootstrap.servers" ->
"rhes564:9092", "schema.registry.url" -> "http://rhes564:8081";,
"zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
kafkaParams: scala.collection.immutable.Map[String,String] =
Map(bootstrap.servers -> rhes564:9092, schema.registry.url ->
http://rhes564:8081, zookeeper.connect -> rhes564:2181, group.id ->
StreamTest)
scala> val topic = Set("newtopic")
topic: scala.collection.immutable.Set[String] = Set(newtopic)
scala> val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topic)
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)]
= org.apache.spark.streaming.kafka.DirectKafkaInputDStream@5d8ccb6c

This part is tricky

scala> val showlines = messages.filter(_ contains("ASE 15")).filter(_
contains("UPDATE INDEX STATISTICS")).flatMap(line =>
line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
_).collect.foreach(println)
<console>:47: error: value contains is not a member of (String, String)
 val showlines = messages.filter(_ contains("ASE 15")).filter(_
contains("UPDATE INDEX STATISTICS")).flatMap(line =>
line.split("\n,")).map(word => (word, 1)).reduceByKey(_ +
_).collect.foreach(println)


How does one refer to the content of the stream here?

Thanks
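
As the other replies in this thread point out, the stream elements here are
(String, String) key/value pairs, so the content has to be selected before any
String method applies; a minimal sketch:

val lines = messages.map(_._2)          // DStream[String]: just the payloads
lines.filter(_.contains("ASE 15"))      // String methods now work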










//
// Now want to do some analysis on the same text file
//



Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 3 April 2016 at 15:32, Ted Yu  wrote:

> bq. split"\t," splits the filter by carriage return
>
> Minor correction: "\t" denotes tab character.
>
> On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas  wrote:
>
>> Hi Mich,
>>
>> 1. The first underscore in your filter call is referring to a line in
>> file (as textFile() results in a collection of strings)
>> 2. You're correct. No need for it.
>> 3. Filter is expecting a Boolean result. So you can merge your contains
>> filters to one with AND (&&) statement.
>> 4. Correct. Each character in split() is used as a divider.
>>
>> Eliran Bivas
>>
>> *From:* Mich Talebzadeh 
>> *Sent:* Apr 3, 2016 15:06
>> *To:* Eliran Bivas
>> *Cc:* user @spark
>> *Subject:* Re: multiple splits fails
>>
>> Hi Eliran,
>>
>> Many thanks for your input on this.
>>
>> I thought about what I was trying to achieve so I rewrote the logic as
>> follows:
>>
>>
>>1. Read the text file in
>>2. Filter out empty lines (well not really needed here)
>>3. Search for lines that contain "ASE 15" and further have sentence
>>"UPDATE INDEX STATISTICS" in the said line
>>4. Split the text by "\t" and ","
>>5. Print the outcome
>>
>>
>> This was what I did with your suggestions included
>>
>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>> f.cache()
>>  f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_
>> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
>> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
>> _).collect.foreach(println)
>>
>>
>> Couple of questions if I may
>>
>>
>>1. I take that "_" refers to content of the file read in by default?
>>2. _.length > 0 basically filters out blank lines (not really needed
>>here)
>>3. Multiple filters are needed for each *contains* logic
>>4. split"\t," splits the filter by carriage return AND ,?
>>
>>
>> Regards
>>
>>
>> Dr Mich Talebzadeh
>>
>>

Re: multiple splits fails

2016-04-03 Thread Ted Yu
bq. split"\t," splits the filter by carriage return

Minor correction: "\t" denotes tab character.
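
One subtlety alongside that: String.split takes a regular expression, so the
argument "\t," matches the literal sequence tab-followed-by-comma rather than
"tab or comma". A regex character class splits on either; a small sketch (the
sample string is made up, purely to show the difference):

"a\tb,c".split("\t,")    // Array(a\tb,c)  - no tab+comma sequence, nothing split
"a\tb,c".split("[\t,]")  // Array(a, b, c) - splits on tab OR comma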

On Sun, Apr 3, 2016 at 7:24 AM, Eliran Bivas  wrote:

> Hi Mich,
>
> 1. The first underscore in your filter call is referring to a line in the
> file (as textFile() results in a collection of strings)
> 2. You're correct. No need for it.
> 3. Filter is expecting a Boolean result. So you can merge your contains
> filters to one with AND (&&) statement.
> 4. Correct. Each character in split() is used as a divider.
>
> Eliran Bivas
>
> *From:* Mich Talebzadeh 
> *Sent:* Apr 3, 2016 15:06
> *To:* Eliran Bivas
> *Cc:* user @spark
> *Subject:* Re: multiple splits fails
>
> Hi Eliran,
>
> Many thanks for your input on this.
>
> I thought about what I was trying to achieve so I rewrote the logic as
> follows:
>
>
>1. Read the text file in
>2. Filter out empty lines (well not really needed here)
>3. Search for lines that contain "ASE 15" and further have sentence
>"UPDATE INDEX STATISTICS" in the said line
>4. Split the text by "\t" and ","
>5. Print the outcome
>
>
> This was what I did with your suggestions included
>
> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
> f.cache()
>  f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_
> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
> line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
> _).collect.foreach(println)
>
>
> Couple of questions if I may
>
>
>1. I take that "_" refers to content of the file read in by default?
>2. _.length > 0 basically filters out blank lines (not really needed
>here)
>3. Multiple filters are needed for each *contains* logic
>4. split"\t," splits the filter by carriage return AND ,?
>
>
> Regards
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 3 April 2016 at 12:35, Eliran Bivas  wrote:
>
>> Hi Mich,
>>
>> Few comments:
>>
>> When doing .filter(_ > "") you're actually doing a lexicographic
>> comparison and not filtering for empty lines (which could be achieved with
>> _.nonEmpty or _.length > 0).
>> I think that filtering with _.contains should be sufficient and the
>> first filter can be omitted.
>>
>> As for line => line.split("\t").split(","):
>> You have to do a second map or (since split() requires a regex as input)
>> .split("\t,").
>> The problem is that your first split() call will generate an Array and
>> then your second call will result in an error.
>> e.g.
>>
>> val lines: Array[String] = line.split("\t")
>> lines.split(",") // Compilation error - no method split() exists for Array
>>
>> So either go with flatMap(_.split("\t")).map(_.split(",")) or
>> map(_.split("\t,"))
>>
>> Hope that helps.
>>
>> *Eliran Bivas*
>> Data Team | iguaz.io
>>
>>
>> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am not sure this is the correct approach
>>
>> Read a text file in
>>
>> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>>
>>
>> Now I want to get rid of empty lines and filter only the lines that
>> contain "ASE15"
>>
>>  f.filter(_ > "").filter(_ contains("ASE15")).
>>
>> The above works but I am not sure whether I need two filter
>> transformation above? Can it be done in one?
>>
>> Now I want to map the above filter to lines with carriage return and
>> split them by ","
>>
>> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>> (line.split("\t")))
>> res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at
>> map at <console>:30
>>
>> Now I want to split the output by ","
>>
>> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>> (line.split("\t").split(",")))
>> <console>:30: error: value split is not a member of Array[String]
>>   f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
>> (line.split("\t").split(",")))
>>
>> ^
>> Any advice will be appreciated
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn:
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>>
>


Re: multiple splits fails

2016-04-03 Thread Eliran Bivas
Hi Mich,

1. The first underscore in your filter call is referring to a line in the file 
(as textFile() results in a collection of strings)
2. You're correct. No need for it.
3. Filter is expecting a Boolean result, so you can merge your contains filters 
into one with an AND (&&) condition (see the sketch below).
4. Correct. Each character in split() is used as a divider.
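
A minimal sketch of the merged filter from point 3, using the file and search
terms from earlier in this thread:

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
// one pass per line; equivalent to chaining two filter() calls
f.filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))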

Eliran Bivas

From: Mich Talebzadeh 
Sent: Apr 3, 2016 15:06
To: Eliran Bivas
Cc: user @spark
Subject: Re: multiple splits fails

Hi Eliran,

Many thanks for your input on this.

I thought about what I was trying to achieve so I rewrote the logic as follows:


  1.  Read the text file in
  2.  Filter out empty lines (well not really needed here)
  3.  Search for lines that contain "ASE 15" and further have sentence "UPDATE 
INDEX STATISTICS" in the said line
  4.  Split the text by "\t" and ","
  5.  Print the outcome


This was what I did with your suggestions included

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
f.cache()
 f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_ contains("UPDATE 
INDEX STATISTICS")).flatMap(line => line.split("\t,")).map(word => (word, 
1)).reduceByKey(_ + _).collect.foreach(println)


Couple of questions if I may


  1.  I take that "_" refers to content of the file read in by default?
  2.  _.length > 0 basically filters out blank lines (not really needed here)
  3.  Multiple filters are needed for each *contains* logic
  4.  split"\t," splits the filter by carriage return AND ,?

Regards



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 3 April 2016 at 12:35, Eliran Bivas <elir...@iguaz.io> wrote:
Hi Mich,

Few comments:

When doing .filter(_ > "") you're actually doing a lexicographic comparison and 
not filtering for empty lines (which could be achieved with _.nonEmpty or 
_.length > 0).
I think that filtering with _.contains should be sufficient and the first 
filter can be omitted.

As for line => line.split("\t").split(","):
You have to do a second map or (since split() requires a regex as input) 
.split("\t,").
The problem is that your first split() call will generate an Array and then 
your second call will result in an error.
e.g.

val lines: Array[String] = line.split("\t")
lines.split(",") // Compilation error - no method split() exists for Array

So either go with flatMap(_.split("\t")).map(_.split(",")) or map(_.split("\t,"))

Hope that helps.

Eliran Bivas
Data Team | iguaz.io


On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi,

I am not sure this is the correct approach

Read a text file in

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")


Now I want to get rid of empty lines and filter only the lines that contain 
"ASE15"

 f.filter(_ > "").filter(_ contains("ASE15")).

The above works but I am not sure whether I need two filter transformation 
above? Can it be done in one?

Now I want to map the above filter to lines with carriage return and split them 
by ","

f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map 
at <console>:30

Now I want to split the output by ","

scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => 
(line.split("\t").split(",")))
<console>:30: error: value split is not a member of Array[String]
  f.filter(_ > "").filter(_ contains("ASE15")).map(line => 
(line.split("\t").split(",")))

 ^
Any advice will be appreciated

Thanks

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com






Re: multiple splits fails

2016-04-03 Thread Mich Talebzadeh
Hi Eliran,

Many thanks for your input on this.

I thought about what I was trying to achieve so I rewrote the logic as
follows:


   1. Read the text file in
   2. Filter out empty lines (well not really needed here)
   3. Search for lines that contain "ASE 15" and further have sentence
   "UPDATE INDEX STATISTICS" in the said line
   4. Split the text by "\t" and ","
   5. Print the outcome


This was what I did with your suggestions included

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
f.cache()
 f.filter(_.length > 0).filter(_ contains("ASE 15")).filter(_
contains("UPDATE INDEX STATISTICS")).flatMap(line =>
line.split("\t,")).map(word => (word, 1)).reduceByKey(_ +
_).collect.foreach(println)


Couple of questions if I may


   1. I take that "_" refers to content of the file read in by default?
   2. _.length > 0 basically filters out blank lines (not really needed
   here)
   3. Multiple filters are needed for each *contains* logic
   4. split"\t," splits the filter by carriage return AND ,? (see the sketch below)
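
A consolidated sketch of the five steps, assuming tab and comma are meant as
alternative delimiters (split() takes a regex, hence the [\t,] character class):

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
f.cache()
f.filter(line => line.contains("ASE 15") && line.contains("UPDATE INDEX STATISTICS"))
 .flatMap(_.split("[\t,]"))   // split each matching line on tab OR comma
 .map(word => (word, 1))
 .reduceByKey(_ + _)
 .collect
 .foreach(println)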


Regards


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 3 April 2016 at 12:35, Eliran Bivas  wrote:

> Hi Mich,
>
> Few comments:
>
> When doing .filter(_ > "") you're actually doing a lexicographic
> comparison and not filtering for empty lines (which could be achieved with
> _.nonEmpty or _.length > 0).
> I think that filtering with _.contains should be sufficient and the first
> filter can be omitted.
>
> As for line => line.split("\t").split(","):
> You have to do a second map or (since split() requires a regex as input)
> .split("\t,").
> The problem is that your first split() call will generate an Array and
> then your second call will result in an error.
> e.g.
>
> val lines: Array[String] = line.split("\t")
> lines.split(",") // Compilation error - no method split() exists for Array
>
> So either go with flatMap(_.split("\t")).map(_.split(",")) or
> map(_.split("\t,"))
>
> Hope that helps.
>
> *Eliran Bivas*
> Data Team | iguaz.io
>
>
> On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>
> Hi,
>
> I am not sure this is the correct approach
>
> Read a text file in
>
> val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")
>
>
> Now I want to get rid of empty lines and filter only the lines that
> contain "ASE15"
>
>  f.filter(_ > "").filter(_ contains("ASE15")).
>
> The above works but I am not sure whether I need two filter transformation
> above? Can it be done in one?
>
> Now I want to map the above filter to lines with carriage return and split
> them by ","
>
> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
> (line.split("\t")))
> res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at
> map at <console>:30
>
> Now I want to split the output by ","
>
> scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
> (line.split("\t").split(",")))
> <console>:30: error: value split is not a member of Array[String]
>   f.filter(_ > "").filter(_ contains("ASE15")).map(line =>
> (line.split("\t").split(",")))
>
> ^
> Any advice will be appreciated
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>


Re: multiple splits fails

2016-04-03 Thread Eliran Bivas
Hi Mich,

Few comments:

When doing .filter(_ > "") you're actually doing a lexicographic comparison and 
not filtering for empty lines (which could be achieved with _.nonEmpty or 
_.length > 0).
I think that filtering with _.contains should be sufficient and the first 
filter can be omitted.

As for line => line.split("\t").split(","):
You have to do a second map or (since split() requires a regex as input) 
.split("\t,").
The problem is that your first split() call will generate an Array and then 
your second call will result in an error.
e.g.

val lines: Array[String] = line.split("\t")
lines.split(",") // Compilation error - no method split() exists for Array

So either go with flatMap(_.split("\t")).map(_.split(",")) or map(_.split("\t,"))
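
Spelled out, the two-step route needs the first split flattened so that the
second one runs on strings again; a small sketch, assuming individual tokens
are the goal:

val tokens  = f.flatMap(_.split("\t")).flatMap(_.split(","))  // two passes
val tokens2 = f.flatMap(_.split("[\t,]"))                     // one pass, regex class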

Hope that helps.

Eliran Bivas
Data Team | iguaz.io


On 3 Apr 2016, at 13:31, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi,

I am not sure this is the correct approach

Read a text file in

val f = sc.textFile("/tmp/ASE15UpgradeGuide.txt")


Now I want to get rid of empty lines and filter only the lines that contain 
"ASE15"

 f.filter(_ > "").filter(_ contains("ASE15")).

The above works but I am not sure whether I need two filter transformation 
above? Can it be done in one?

Now I want to map the above filter to lines with carriage return and split them 
by ","

f.filter(_ > "").filter(_ contains("ASE15")).map(line => (line.split("\t")))
res88: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[131] at map 
at <console>:30

Now I want to split the output by ","

scala> f.filter(_ > "").filter(_ contains("ASE15")).map(line => 
(line.split("\t").split(",")))
<console>:30: error: value split is not a member of Array[String]
  f.filter(_ > "").filter(_ contains("ASE15")).map(line => 
(line.split("\t").split(",")))

 ^
Any advice will be appreciated

Thanks

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com