spark streaming kafka not displaying data in local eclipse

2018-01-16 Thread vr spark
Hi,
I have a simple Java program to read data from Kafka using Spark Streaming.

When I run it from Eclipse on my Mac, it connects to ZooKeeper and the
bootstrap nodes, but it does not display any data and does not give any
error.


It just shows:

18/01/16 20:49:15 INFO Executor: Finished task 96.0 in stage 0.0 (TID 0).
1412 bytes result sent to driver
18/01/16 20:49:15 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
2, localhost, partition 1, ANY, 5832 bytes)
18/01/16 20:49:15 INFO Executor: Running task 1.0 in stage 0.0 (TID 2)
18/01/16 20:49:15 INFO TaskSetManager: Finished task 96.0 in stage 0.0 (TID
0) in 111 ms on localhost (1/97)
18/01/16 20:49:15 INFO KafkaRDD: Computing topic data_stream, partition 16
offsets 25624028 -> 25624097
18/01/16 20:49:15 INFO VerifiableProperties: Verifying properties
18/01/16 20:49:15 INFO VerifiableProperties: Property auto.offset.reset is
overridden to largest
18/01/16 20:49:15 INFO VerifiableProperties: Property
fetch.message.max.bytes is overridden to 20971520
18/01/16 20:49:15 INFO VerifiableProperties: Property group.id is
overridden to VR-Test-Group
18/01/16 20:49:15 INFO VerifiableProperties: Property zookeeper.connect is
overridden to zk.kafka-cluster...:8091
18/01/16 20:49:25 INFO JobScheduler: Added jobs for time 151616456 ms
18/01/16 20:49:36 INFO JobScheduler: Added jobs for time 151616457 ms
18/01/16 20:49:45 INFO JobScheduler: Added jobs for time 151616458 ms
18/01/16 20:49:55 INFO JobScheduler: Added jobs for time 151616459 ms
18/01/16 20:50:07 INFO JobScheduler: Added jobs for time 151616460 ms
18/01/16 20:50:15 INFO JobScheduler: Added jobs for time 151616461 ms

But when I export it as a jar and run it on a remote Spark cluster, it does
display the actual data.

Please suggest what could be wrong.

thanks
VR
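
For reference, a minimal PySpark sketch of the same kind of consumer (the
program in this thread is Java, and the broker address below is a
placeholder). With auto.offset.reset left at "largest", as in the log above,
nothing is printed unless new messages arrive after the job starts, so
"smallest" is often handier while testing locally:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext("local[*]", "KafkaPrintTest")
ssc = StreamingContext(sc, 10)  # 10-second batches, matching the log interval above

kafkaParams = {
    "metadata.broker.list": "broker1:9092",  # hypothetical broker address
    "group.id": "VR-Test-Group",
    "auto.offset.reset": "smallest",  # replay existing messages while testing
}
stream = KafkaUtils.createDirectStream(ssc, ["data_stream"], kafkaParams)

# Print a few records from every batch so it is obvious whether data arrives
stream.map(lambda kv: kv[1]).pprint()

ssc.start()
ssc.awaitTermination()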


Re: convert local tsv file to orc file on distributed cloud storage (openstack).

2016-11-24 Thread vr spark
Hi, the source file I have is on my local machine and it's pretty huge,
around 150 GB. How do I go about it?

On Sun, Nov 20, 2016 at 8:52 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 19 Nov 2016, at 17:21, vr spark <vrspark...@gmail.com> wrote:
>
> Hi,
> I am looking for scala or python code samples to covert local tsv file to
> orc file and store on distributed cloud storage(openstack).
>
> So, need these 3 samples. Please suggest.
>
> 1. read tsv
> 2. convert to orc
> 3. store on distributed cloud storage
>
>
> thanks
> VR
>
>
> All options, 9 lines of code, assuming a Spark context has already been
> set up with the permissions to write to AWS and the relevant JARs for S3A
> on the classpath. The read operation is inefficient: to determine the
> schema it scans the (here, remote) file twice; that may be OK for an
> example, but I wouldn't do that in production. The source is a real file
> belonging to Amazon; the destination is a bucket of mine.
>
> More details at: http://www.slideshare.net/steve_l/apache-spark-and-object-stores
>
>
> val csvdata = spark.read.options(Map(
>   "header" -> "true",
>   "ignoreLeadingWhiteSpace" -> "true",
>   "ignoreTrailingWhiteSpace" -> "true",
>   "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
>   "inferSchema" -> "true",
>   "mode" -> "FAILFAST"))
>   .csv("s3a://landsat-pds/scene_list.gz")
> csvdata.write.mode("overwrite").orc("s3a://hwdev-stevel-demo2/landsatOrc")
>
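
For reference, a PySpark sketch of the same three steps for a tsv file,
assuming Spark 2.x with the relevant object-store connector on the
classpath; the column names, local path, and destination URI are all
placeholders. Declaring the schema up front avoids the extra pass over the
file that inferSchema needs, and a file:// path is only readable if the
file is visible to the executors (e.g. local mode or a shared mount):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("tsv-to-orc").getOrCreate()

# Illustrative schema: declaring it means the reader does not have to scan
# the file just to infer column types.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("count", IntegerType(), True),
])

# 1. read tsv (tab-separated, with a header row)
tsv = (spark.read
       .option("sep", "\t")
       .option("header", "true")
       .schema(schema)
       .csv("file:///data/input.tsv"))

# 2 & 3. convert to ORC and write to the object store; the scheme depends on
# the connector in use (e.g. swift:// for OpenStack Swift, s3a:// for S3)
tsv.write.mode("overwrite").orc("swift://container.provider/landing/orc")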


convert local tsv file to orc file on distributed cloud storage (openstack).

2016-11-19 Thread vr spark
Hi,
I am looking for Scala or Python code samples to convert a local tsv file
to an orc file and store it on distributed cloud storage (openstack).

So, I need these 3 samples. Please suggest.

1. read tsv
2. convert to orc
3. store on distributed cloud storage


thanks
VR


receiving stream data options

2016-10-13 Thread vr spark
Hi,
I have a continuous REST API stream which keeps spitting out data in the
form of JSON. I access the stream using Python's
requests.get(url, stream=True, headers=headers).

I want to receive the data using Spark and do further processing. I am not
sure which is the best way to receive it in Spark.

What are the options I have? Some options I can think of:

1. Push data from the REST API stream into a Kafka queue and use the Spark
Kafka streaming utilities to capture the data and process it further.

2. Push data from the REST API stream to a local socket and use the Spark
socket stream utilities to capture the data and process it further.

3. Is there any other way to receive it?

thanks

VR
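
For what it's worth, option 2 takes only a few lines on the Spark side. A
minimal PySpark sketch, assuming some bridge process reads the REST stream
with requests.get(url, stream=True) and forwards each JSON line to a plain
TCP socket (the host and port below are placeholders):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[*]", "RestToSocket")
ssc = StreamingContext(sc, 10)  # 10-second batches

# Each line received on the socket becomes one record in the stream
lines = ssc.socketTextStream("localhost", 9999)
lines.pprint()  # replace with real processing

ssc.start()
ssc.awaitTermination()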


Re: spark-submit failing but job running from scala ide

2016-09-26 Thread vr spark
artition 1, PROCESS_LOCAL, 5474 bytes)
16/09/26 09:11:02 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/09/26 09:11:02 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/09/26 09:11:02 INFO Executor: Fetching spark://
192.168.1.3:59323/jars/scopt_2.11-3.3.0.jar with timestamp 1474906261472
16/09/26 09:12:17 INFO Executor: Fetching spark://
192.168.1.3:59323/jars/scopt_2.11-3.3.0.jar with timestamp 1474906261472
16/09/26 09:12:17 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Failed to connect to /192.168.1.3:59323
java.io.IOException: Failed to connect to /192.168.1.3:59323
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at
org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:358)
at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:324)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:633)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:459)
at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:488)
at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:480)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.executor.Executor.org
$apache$spark$executor$Executor$$updateDependencies(Executor.scala:480)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:252)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Operation timed out: /
192.168.1.3:59323





On Sun, Sep 25, 2016 at 8:32 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> How did you install Spark 1.6? It's usually as simple as rm -rf
> $SPARK_1.6_HOME, but it really depends on how you installed it in the
> first place.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sun, Sep 25, 2016 at 4:32 PM, vr spark <vrspark...@gmail.com> wrote:
> > yes, i have both spark 1.6 and spark 2.0.
> > I unset the spark home environment variable and pointed spark submit to
> 2.0.
> > Its working now.
> >
> > How do i uninstall/remove spark 1.6 from mac?
> >
> > Thanks
> >
> >
> > On Sun, Sep 25, 2016 at 4:28 AM, Jacek Laskowski <ja...@japila.pl>
> wrote:
> >>
> >> Hi,
> >>
> >> Can you execute run-example SparkPi with your Spark installation?
> >>
> >> Also, see the logs:
> >>
> >> 16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port
> >> 4040. Attempting port 4041.
> >>
> >> 16/09/24 23:15:15 INFO Utils: Successfully started service 'SparkUI'
> >> on port 4041.
> >>
> >> You've got two Spark runtimes up that may or may not contribute to the
> >> issue.
> >>
> >> Pozdrawiam,
> >> Jacek Laskowski
> >> 
> >> https://medium.com/@jaceklaskowski/
> >> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> >> Follow me at https://twitter.com/jaceklaskowski
> >>
> >>
> >> On Sun, Sep 25, 2016 at 8:36 AM, vr spark <vrspark...@gmail.com> wrote:
> >> > Hi,
> >> > I have this simple scala app which works fine when i run it as scala
> >> > application from the scala IDE for eclipse.
> >> > But when i export is as jar and run it from spark-submit i am getting
> >> > below
> >> > error. Please suggest
> >> >
> >> > bin/spark-submit --class com.x.y.vr.spark.first.SimpleApp test.jar
> >> >
> >> > 16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port
> >> > 4040.
> >> > Attempting port 4041.
> >> >
> >> > 16/09/24 23:15:15 INFO Utils: Successfully sta

Running jobs against remote cluster from scala eclipse ide

2016-09-26 Thread vr spark
Hi,
I use the Scala IDE for Eclipse. I usually run jobs against my local Spark
installation on my Mac, then export the jars, copy them to my company's
Spark cluster, and run spark-submit there. This works fine.

But I want to run the jobs from the Scala IDE directly against my company's
Spark cluster. The Spark master URL of the cluster is
spark://spark-437-1-5963003:7077, and one of the worker nodes of that
cluster is 11.104.29.106.

I tried this option, but I am getting an error:

  val conf = new SparkConf().setAppName("Simple Application")
    .setMaster("spark://spark-437-1-5963003:7077")
    .set("spark.driver.host", "11.104.29.106")

Please let me know.

16/09/25 08:51:51 INFO SparkContext: Running Spark version 2.0.0

16/09/25 08:51:51 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable

16/09/25 08:51:52 INFO SecurityManager: Changing view acls to: vr

16/09/25 08:51:52 INFO SecurityManager: Changing modify acls to: vr

16/09/25 08:51:52 INFO SecurityManager: Changing view acls groups to:

16/09/25 08:51:52 INFO SecurityManager: Changing modify acls groups to:

16/09/25 08:51:52 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users  with view permissions: Set(vr); groups
with view permissions: Set(); users  with modify permissions: Set(vr);
groups with modify permissions: Set()

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port
0. Attempting port 1.

16/09/25 08:51:52 ERROR SparkContext: Error initializing SparkContext.

java.net.BindException: Can't assign requested address: Service
'sparkDriver' failed after 16 retries! Consider explicitly setting the
appropriate port for the service 'sparkDriver' (for example spark.ui.port
for SparkUI) to an available port or increasing spark.port.maxRetries.

at sun.nio.ch.Net.bind0(Native Method)

at sun.nio.ch.Net.bind(Net.java:433)




*full class code*

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object RatingsCounter {

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.INFO)

    val conf = new SparkConf().setAppName("Simple Application")
      .setMaster("spark://spark-437-1-5963003:7077")
      .set("spark.driver.host", "11.104.29.106")

    val sc = new SparkContext(conf)

    val lines = sc.textFile("u.data")

    val ratings = lines.map(x => x.toString().split("\t")(2))

    val results = ratings.countByValue()

    val sortedResults = results.toSeq.sortBy(_._1)

    sortedResults.foreach(println)
  }
}
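
For context, spark.driver.host is the address the driver itself binds and
advertises to the cluster, so it needs to be an address of the machine
running the IDE that the cluster nodes can reach, not a worker node's
address. A minimal sketch of the same configuration in PySpark (the
configuration keys are identical in Scala); the IP below is a placeholder
for the IDE machine's own address:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("Simple Application")
        .setMaster("spark://spark-437-1-5963003:7077")
        # Placeholder: an address of *this* machine that the cluster can reach
        .set("spark.driver.host", "192.168.1.3"))
sc = SparkContext(conf=conf)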


Re: spark-submit failing but job running from scala ide

2016-09-25 Thread vr spark
Yes, I have both Spark 1.6 and Spark 2.0.
I unset the Spark home environment variable and pointed spark-submit to 2.0.
It's working now.

How do I uninstall/remove Spark 1.6 from the Mac?

Thanks


On Sun, Sep 25, 2016 at 4:28 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> Can you execute run-example SparkPi with your Spark installation?
>
> Also, see the logs:
>
> 16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port
> 4040. Attempting port 4041.
>
> 16/09/24 23:15:15 INFO Utils: Successfully started service 'SparkUI'
> on port 4041.
>
> You've got two Spark runtimes up that may or may not contribute to the
> issue.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sun, Sep 25, 2016 at 8:36 AM, vr spark <vrspark...@gmail.com> wrote:
> > Hi,
> > I have this simple scala app which works fine when i run it as scala
> > application from the scala IDE for eclipse.
> > But when i export is as jar and run it from spark-submit i am getting
> below
> > error. Please suggest
> >
> > bin/spark-submit --class com.x.y.vr.spark.first.SimpleApp test.jar
> >
> > 16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port
> 4040.
> > Attempting port 4041.
> >
> > 16/09/24 23:15:15 INFO Utils: Successfully started service 'SparkUI' on
> port
> > 4041.
> >
> > 16/09/24 23:15:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at
> > http://192.168.1.3:4041
> >
> > 16/09/24 23:15:15 INFO SparkContext: Added JAR
> > file:/Users/vr/Downloads/spark-2.0.0/test.jar at
> > spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
> >
> > 16/09/24 23:15:15 INFO Executor: Starting executor ID driver on host
> > localhost
> >
> > 16/09/24 23:15:15 INFO Utils: Successfully started service
> > 'org.apache.spark.network.netty.NettyBlockTransferService' on port
> 59264.
> >
> > 16/09/24 23:15:15 INFO NettyBlockTransferService: Server created on
> > 192.168.1.3:59264
> >
> > 16/09/24 23:15:16 INFO TaskSetManager: Starting task 0.0 in stage 0.0
> (TID
> > 0, localhost, partition 0, PROCESS_LOCAL, 5354 bytes)
> >
> > 16/09/24 23:15:16 INFO TaskSetManager: Starting task 1.0 in stage 0.0
> (TID
> > 1, localhost, partition 1, PROCESS_LOCAL, 5354 bytes)
> >
> > 16/09/24 23:15:16 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> >
> > 16/09/24 23:15:16 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> >
> > 16/09/24 23:15:16 INFO Executor: Fetching
> > spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
> >
> > 16/09/24 23:16:31 INFO Executor: Fetching
> > spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
> >
> > 16/09/24 23:16:31 ERROR Executor: Exception in task 1.0 in stage 0.0
> (TID 1)
> >
> > java.io.IOException: Failed to connect to /192.168.1.3:59263
> >
> > at
> > org.apache.spark.network.client.TransportClientFactory.createClient(
> TransportClientFactory.java:228)
> >
> > at
> > org.apache.spark.network.client.TransportClientFactory.createClient(
> TransportClientFactory.java:179)
> >
> > at
> > org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(
> NettyRpcEnv.scala:358)
> >
> > at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(
> NettyRpcEnv.scala:324)
> >
> > at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:633)
> >
> > at org.apache.spark.util.Utils$.fetchFile(Utils.scala:459)
> >
> > at
> > org.apache.spark.executor.Executor$$anonfun$org$apache$
> spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:488)
> >
> > at
> > org.apache.spark.executor.Executor$$anonfun$org$apache$
> spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:480)
> >
> > at
> > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(
> TraversableLike.scala:733)
> >
> > at
> > scala.collection.mutable.HashMap$$anonfun$foreach$1.
> apply(HashMap.scala:99)
> >
> > at
> > scala.collection.mutable.HashMap$$anonfun$foreach$1.
> apply(HashMap.scala:99)
> >
> > at
> > scala.collection.mutable.HashTable$class.foreachEntry(
> HashTable.scala:230)
> >
> > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> >
> > at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> >
> > at

spark-submit failing but job running from scala ide

2016-09-25 Thread vr spark
Hi,
I have this simple Scala app which works fine when I run it as a Scala
application from the Scala IDE for Eclipse.
But when I export it as a jar and run it from spark-submit, I am getting
the error below. Please suggest.

*bin/spark-submit --class com.x.y.vr.spark.first.SimpleApp test.jar*

16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port
4040. Attempting port 4041.

16/09/24 23:15:15 INFO Utils: Successfully started service 'SparkUI' on
port 4041.

16/09/24 23:15:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at
http://192.168.1.3:4041

16/09/24 23:15:15 INFO SparkContext: Added JAR
file:/Users/vr/Downloads/spark-2.0.0/test.jar at spark://
192.168.1.3:59263/jars/test.jar with timestamp 1474784115210

16/09/24 23:15:15 INFO Executor: Starting executor ID driver on host
localhost

16/09/24 23:15:15 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 59264.

16/09/24 23:15:15 INFO NettyBlockTransferService: Server created on
192.168.1.3:59264

16/09/24 23:15:16 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
0, localhost, partition 0, PROCESS_LOCAL, 5354 bytes)

16/09/24 23:15:16 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
1, localhost, partition 1, PROCESS_LOCAL, 5354 bytes)

16/09/24 23:15:16 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

16/09/24 23:15:16 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)

16/09/24 23:15:16 INFO Executor: Fetching spark://
192.168.1.3:59263/jars/test.jar with timestamp 1474784115210

16/09/24 23:16:31 INFO Executor: Fetching spark://
192.168.1.3:59263/jars/test.jar with timestamp 1474784115210

16/09/24 23:16:31 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)

java.io.IOException: Failed to connect to /192.168.1.3:59263

at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)

at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)

at
org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:358)

at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:324)

at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:633)

at org.apache.spark.util.Utils$.fetchFile(Utils.scala:459)

at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:488)

at
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:480)

at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)

at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)

at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)

at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)

at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)

at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)

at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)

at org.apache.spark.executor.Executor.org
$apache$spark$executor$Executor$$updateDependencies(Executor.scala:480)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:252)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)




*My Scala code*


package com.x.y.vr.spark.first

/* SimpleApp.scala */

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {

  def main(args: Array[String]) {

    val logFile = "/Users/vttrich/Downloads/spark-2.0.0/README.md" // Should be some file on your system

    val conf = new SparkConf().setAppName("Simple Application")

    val sc = new SparkContext("local[*]", "RatingsCounter")

    //val sc = new SparkContext(conf)

    val logData = sc.textFile(logFile, 2).cache()

    val numAs = logData.filter(line => line.contains("a")).count()

    val numBs = logData.filter(line => line.contains("b")).count()

    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}


Re: Undefined function json_array_to_map

2016-08-17 Thread vr spark
Hi Ted/All,
I did the below to get the full stack trace (see below), but I am not able
to understand the root cause.

except Exception as error:
    traceback.print_exc()

And this is what I get:


 File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/context.py",
line 580, in sql

return DataFrame(self._ssql_ctx.sql(sqlQuery), self)

  File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
line 813, in __call__

answer, self.gateway_client, self.target_id, self.name)

  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
51, in deco

raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u'undefined function json_array_to_map; line 28 pos 73'

On Wed, Aug 17, 2016 at 8:59 AM, vr spark <vrspark...@gmail.com> wrote:

> spark 1.6.1
> python
>
> I0817 08:51:59.099356 15189 detector.cpp:481] A new leading master (UPID=
> master@10.224.167.25:5050) is detected
> I0817 08:51:59.099735 15188 sched.cpp:262] New master detected at
> master@x.y.17.25:4550
> I0817 08:51:59.100888 15188 sched.cpp:272] No credentials provided.
> Attempting to register without authentication
> I0817 08:51:59.326017 15190 sched.cpp:641] Framework registered with
> b859f266-9984-482d-8c0d-35bd88c1ad0a-6996
> 16/08/17 08:52:06 WARN ObjectStore: Version information not found in
> metastore. hive.metastore.schema.verification is not enabled so recording
> the schema version 1.2.0
> 16/08/17 08:52:06 WARN ObjectStore: Failed to get database default,
> returning NoSuchObjectException
> Traceback (most recent call last):
>   File "/data1/home/vttrich/spk/orig_qryhubb.py", line 17, in 
> res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date  >=
> 408910 limit 10")
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/context.py",
> line 580, in sql
>   File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
> line 813, in __call__
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
> line 51, in deco
> pyspark.sql.utils.AnalysisException: u'undefined function
> json_array_to_map; line 28 pos 73'
> I0817 08:52:12.840224 15600 sched.cpp:1771] Asked to stop the driver
> I0817 08:52:12.841198 15189 sched.cpp:1040] Stopping framework
> 'b859f2f3-7484-482d-8c0d-35bd91c1ad0a-6326'
>
>
> On Wed, Aug 17, 2016 at 8:50 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you show the complete stack trace ?
>>
>> Which version of Spark are you using ?
>>
>> Thanks
>>
>> On Wed, Aug 17, 2016 at 8:46 AM, vr spark <vrspark...@gmail.com> wrote:
>>
>>> Hi,
>>> I am getting error on below scenario. Please suggest.
>>>
>>> i have  a virtual view in hive
>>>
>>> view name log_data
>>> it has 2 columns
>>>
>>> query_map   map<string,string>
>>>
>>> parti_date int
>>>
>>>
>>> Here is my snippet for the spark data frame
>>>
>>> my dataframe
>>>
>>> res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date  >=
>>> 408910 limit 10")
>>>
>>> df=res.collect()
>>>
>>> print 'after collect'
>>>
>>> print df
>>>
>>>
>>> * File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
>>> line 51, in deco*
>>>
>>> *pyspark.sql.utils.AnalysisException: u'undefined function
>>> json_array_to_map; line 28 pos 73'*
>>>
>>>
>>>
>>>
>>>
>>
>


Re: Attempting to accept an unknown offer

2016-08-17 Thread vr spark
My code is very simple; if I use other Hive tables, my code works fine. This
particular table (virtual view) is huge and might have more metadata.

It has only two columns.

virtual view name is : cluster_table

# col_name    data_type

 ln           string

 parti        int


Here is the snippet:

from pyspark import SparkConf, SparkContext

from pyspark.sql import HiveContext

import pyspark.sql

import json

myconf=SparkConf().setAppName("sql")

spcont=SparkContext(conf=myconf)

sqlcont=HiveContext(spcont)

res=sqlcont.sql("select  parti FROM h.cluster_table WHERE parti > 408910
and parti <408911  limit 10")

print res.printSchema()

print 'res'

print res

df=res.collect()

print 'after collect'

print df


Here is the output after I submit the job:

I0817 09:18:40.606465 31409 sched.cpp:262] New master detected at
master@x.y.17.56:6750

I0817 09:18:40.607461 31409 sched.cpp:272] No credentials provided.
Attempting to register without authentication

I0817 09:18:40.612763 31409 sched.cpp:641] Framework registered with
b859f2f3-7484-482d-8c0d-35bd91c1ad0a-6336

16/08/17 09:18:57 WARN ObjectStore: Version information not found in
metastore. hive.metastore.schema.verification is not enabled so recording
the schema version 1.2.0

16/08/17 09:18:57 WARN ObjectStore: Failed to get database default,
returning NoSuchObjectException

root

 |-- parti: integer (nullable = true)


None

res

DataFrame[partition_epoch_hourtenth: int]

2016-08-17 09:19:20,648:31315(0x7fafebfb1700):ZOO_WARN@zookeeper_interest@1557:
Exceeded deadline by 19ms

2016-08-17 09:19:30,662:31315(0x7fafebfb1700):ZOO_WARN@zookeeper_interest@1557:
Exceeded deadline by 13ms

W0817 09:20:01.715824 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676564

W0817 09:20:01.716455 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676630

W0817 09:20:01.716645 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676713

W0817 09:20:01.724409 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676554

W0817 09:20:01.724728 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676555

W0817 09:20:01.724936 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676556

W0817 09:20:01.725126 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676557

W0817 09:20:01.725309 31412 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676558.

and many more lines like this on the screen with a similar message.



On Wed, Aug 17, 2016 at 9:08 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Please include user@ in your reply.
>
> Can you reveal the snippet of hive sql ?
>
> On Wed, Aug 17, 2016 at 9:04 AM, vr spark <vrspark...@gmail.com> wrote:
>
>> spark 1.6.1
>> mesos
>> job is running for like 10-15 minutes and giving this message and i
>> killed it.
>>
>> In this job, i am creating data frame from a hive sql. There are other
>> similar jobs which work fine
>>
>> On Wed, Aug 17, 2016 at 8:52 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Can you provide more information ?
>>>
>>> Were you running on YARN ?
>>> Which version of Spark are you using ?
>>>
>>> Was your job failing ?
>>>
>>> Thanks
>>>
>>> On Wed, Aug 17, 2016 at 8:46 AM, vr spark <vrspark...@gmail.com> wrote:
>>>
>>>>
>>>> W0816 23:17:01.984846 16360 sched.cpp:1195] Attempting to accept an
>>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910492
>>>>
>>>> W0816 23:17:01.984987 16360 sched.cpp:1195] Attempting to accept an
>>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910493
>>>>
>>>> W0816 23:17:01.985124 16360 sched.cpp:1195] Attempting to accept an
>>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910494
>>>>
>>>> W0816 23:17:01.985339 16360 sched.cpp:1195] Attempting to accept an
>>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910495
>>>>
>>>> W0816 23:17:01.985508 16360 sched.cpp:1195] Attempting to accept an
>>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910496
>>>>
>>>> W0816 23:17:01.985651 16360 sched.cpp:1195] Attempting to accept an
>>>> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910497
>>>>
>>>> W0816 23:17:01.985801 16360 sched.cpp:1195] Attempting to acce

Attempting to accept an unknown offer

2016-08-17 Thread vr spark
W0816 23:17:01.984846 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910492

W0816 23:17:01.984987 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910493

W0816 23:17:01.985124 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910494

W0816 23:17:01.985339 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910495

W0816 23:17:01.985508 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910496

W0816 23:17:01.985651 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910497

W0816 23:17:01.985801 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910498

W0816 23:17:01.985961 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910499

W0816 23:17:01.986121 16360 sched.cpp:1195] Attempting to accept an unknown
offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910500

2016-08-16 23:18:41,877:16226(0x7f71271b6700):ZOO_WARN@
zookeeper_interest@1557: Exceeded deadline by 13ms

2016-08-16 23:21:12,007:16226(0x7f71271b6700):ZOO_WARN@
zookeeper_interest@1557: Exceeded deadline by 11ms


Undefined function json_array_to_map

2016-08-17 Thread vr spark
Hi,
I am getting an error in the scenario below. Please suggest.

I have a virtual view in Hive.

view name: log_data
It has 2 columns:

query_map   map<string,string>

parti_date int


Here is my snippet for the Spark data frame:

res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date  >=
408910 limit 10")

df=res.collect()

print 'after collect'

print df


* File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
51, in deco*

*pyspark.sql.utils.AnalysisException: u'undefined function
json_array_to_map; line 28 pos 73'*
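
For reference, if json_array_to_map is a custom Hive UDF referenced by the
view's definition, it also has to be visible to Spark's HiveContext. A
minimal sketch of registering it before querying the view; the jar path and
class name below are hypothetical placeholders, and whether this is enough
depends on how the UDF is deployed:

# assumes sqlcont is the HiveContext from the snippet above
sqlcont.sql("ADD JAR /path/to/custom-hive-udfs.jar")
sqlcont.sql("CREATE TEMPORARY FUNCTION json_array_to_map "
            "AS 'com.example.hive.udf.JsonArrayToMap'")
res = sqlcont.sql("select parti_date FROM log_data "
                  "WHERE parti_date >= 408910 limit 10")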


Re: dataframe row list question

2016-08-12 Thread vr spark
Hi Experts,
Please suggest.

On Thu, Aug 11, 2016 at 7:54 AM, vr spark <vrspark...@gmail.com> wrote:

>
> I have data which is json in this format
>
>  myList: array
>  |||-- elem: struct
>  ||||-- nm: string (nullable = true)
>  ||||-- vList: array (nullable = true)
>  |||||-- element: string (containsNull = true)
>
>
>  from my kafka stream, i created a dataframe using sqlContext.jsonRDD
>  Then registred it as registerTempTable
>  selected mylist from this table and i see this output. It is a list of
> rows
>
> [Row(nm=u'Apt', vList=[u'image']), Row(nm=u'Agent', vList=[u'Mozilla/5.0
> ']), Row(nm=u'Ip', vList=[u'xx.yy.106.25'])]
>
>  My requirement is to get only rows with nm='IP' and its corresponding
> value
> I would need IP, xx.yy.106.25
>
>
> Please suggest
>


dataframe row list question

2016-08-11 Thread vr spark
I have data which is JSON in this format:

 myList: array
 |||-- elem: struct
 ||||-- nm: string (nullable = true)
 ||||-- vList: array (nullable = true)
 |||||-- element: string (containsNull = true)


From my Kafka stream, I created a dataframe using sqlContext.jsonRDD, then
registered it as a temp table with registerTempTable. I selected myList
from this table and I see the output below; it is a list of Rows:

[Row(nm=u'Apt', vList=[u'image']), Row(nm=u'Agent', vList=[u'Mozilla/5.0
']), Row(nm=u'Ip', vList=[u'xx.yy.106.25'])]

My requirement is to get only the rows with nm='Ip' and their corresponding
value; I would need Ip, xx.yy.106.25.


Please suggest
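
For reference, once the column has been collected, the filtering can be
done in plain Python; a minimal sketch (not from this thread) over a list
of Rows shaped like the output above (note the data shows nm=u'Ip'):

from pyspark.sql import Row

my_list = [Row(nm=u'Apt', vList=[u'image']),
           Row(nm=u'Agent', vList=[u'Mozilla/5.0 ']),
           Row(nm=u'Ip', vList=[u'xx.yy.106.25'])]

# Keep only the entries whose nm is 'Ip' and pull out their values
ip_values = [v for r in my_list if r.nm == 'Ip' for v in r.vList]
print(ip_values)  # [u'xx.yy.106.25']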


Re: read only specific jsons

2016-07-27 Thread vr spark
Hi,
I tried that and am still getting an exception. Any other suggestions?

clickDF = cDF.filter(cDF['request.clientIP'].isNotNull())

It fails for some cases and errors out with the message below:

AnalysisException: u'No such struct field clientIP in cookies,
nscClientIP1, nscClientIP2, uAgent;'

On Tue, Jul 26, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Have you tried filtering out corrupt records with something along the
> lines of
>
>  df.filter(df("_corrupt_record").isNull)
>
> On Tue, Jul 26, 2016 at 1:53 PM, vr spark <vrspark...@gmail.com> wrote:
> > i am reading data from kafka using spark streaming.
> >
> > I am reading json and creating dataframe.
> > I am using pyspark
> >
> > kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
> >
> > lines = kvs.map(lambda x: x[1])
> >
> > lines.foreachRDD(mReport)
> >
> > def mReport(clickRDD):
> >
> >clickDF = sqlContext.jsonRDD(clickRDD)
> >
> >clickDF.registerTempTable("clickstream")
> >
> >PagesDF = sqlContext.sql(
> >
> > "SELECT   request.clientIP as ip "
> >
> > "FROM clickstream "
> >
> > "WHERE request.clientIP is not null "
> >
> > " limit 2000 "
> >
> >
> > The problem is that not all the jsons from the stream have the same
> format.
> >
> > It works when it reads a json which has ip.
> >
> > Some of the json strings do not have client ip in their schema.
> >
> > So i am getting error and my job is failing when it encounters such a
> json.
> >
> > How do read only those json which has ip in their schema?
> >
> > Please suggest.
>
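
For reference, one common way to deal with JSON records that do not all
have the same fields is to supply an explicit schema, so that records
missing request.clientIP simply get null there instead of breaking the
query. A minimal PySpark sketch under that assumption (only the fields of
interest are declared; sqlContext is assumed to be in scope as in the code
above):

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("request", StructType([
        StructField("clientIP", StringType(), True)
    ]), True)
])

def mReport(clickRDD):
    # Applying the schema means missing fields come back as null
    clickDF = sqlContext.jsonRDD(clickRDD, schema)
    clickDF.registerTempTable("clickstream")
    ipDF = sqlContext.sql(
        "SELECT request.clientIP as ip FROM clickstream "
        "WHERE request.clientIP is not null limit 2000")
    ipDF.show()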


read only specific jsons

2016-07-26 Thread vr spark
I am reading data from Kafka using Spark Streaming.

I am reading JSON and creating a dataframe.
I am using PySpark.

kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)

lines = kvs.map(lambda x: x[1])

lines.foreachRDD(mReport)

def mReport(clickRDD):

   clickDF = sqlContext.jsonRDD(clickRDD)

   clickDF.registerTempTable("clickstream")

   PagesDF = sqlContext.sql(

"SELECT   request.clientIP as ip "

"FROM clickstream "

"WHERE request.clientIP is not null "

" limit 2000 "


The problem is that not all the JSON records from the stream have the same
format. It works when it reads a record which has the IP, but some of the
JSON strings do not have the client IP in their schema, so I am getting an
error and my job fails when it encounters such a record.

How do I read only those JSON records which have the IP in their schema?

Please suggest.


read only specific jsons

2016-07-26 Thread vr spark
I am reading data from Kafka using Spark Streaming.

I am reading JSON and creating a dataframe.

kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)

lines = kvs.map(lambda x: x[1])

lines.foreachRDD(mReport)

def mReport(clickRDD):

    clickDF = sqlContext.jsonRDD(clickRDD)

    clickDF.registerTempTable("clickstream")

    PagesDF = sqlContext.sql(
        "SELECT request.clientIP as ip "
        "FROM clickstream "
        "WHERE request.clientIP is not null "
        " limit 2000 ")


The problem is that not all the JSON records from the stream have the same
format. It works when it reads a record which has the IP, but some of the
JSON strings do not have the client IP in their schema, so I am getting an
error and my job fails when it encounters such a record.

How do I read only those JSON records which have the IP in their schema?

Please suggest.