spark streaming kafka not displaying data in local eclipse
Hi,

I have a simple Java program to read data from Kafka using Spark Streaming. When I run it from Eclipse on my Mac, it connects to the ZooKeeper and bootstrap nodes, but it does not display any data. It does not give any error; it just shows:

18/01/16 20:49:15 INFO Executor: Finished task 96.0 in stage 0.0 (TID 0). 1412 bytes result sent to driver
18/01/16 20:49:15 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 2, localhost, partition 1, ANY, 5832 bytes)
18/01/16 20:49:15 INFO Executor: Running task 1.0 in stage 0.0 (TID 2)
18/01/16 20:49:15 INFO TaskSetManager: Finished task 96.0 in stage 0.0 (TID 0) in 111 ms on localhost (1/97)
18/01/16 20:49:15 INFO KafkaRDD: Computing topic data_stream, partition 16 offsets 25624028 -> 25624097
18/01/16 20:49:15 INFO VerifiableProperties: Verifying properties
18/01/16 20:49:15 INFO VerifiableProperties: Property auto.offset.reset is overridden to largest
18/01/16 20:49:15 INFO VerifiableProperties: Property fetch.message.max.bytes is overridden to 20971520
18/01/16 20:49:15 INFO VerifiableProperties: Property group.id is overridden to VR-Test-Group
18/01/16 20:49:15 INFO VerifiableProperties: Property zookeeper.connect is overridden to zk.kafka-cluster...:8091
18/01/16 20:49:25 INFO JobScheduler: Added jobs for time 151616456 ms
18/01/16 20:49:36 INFO JobScheduler: Added jobs for time 151616457 ms
18/01/16 20:49:45 INFO JobScheduler: Added jobs for time 151616458 ms
18/01/16 20:49:55 INFO JobScheduler: Added jobs for time 151616459 ms
18/01/16 20:50:07 INFO JobScheduler: Added jobs for time 151616460 ms
18/01/16 20:50:15 INFO JobScheduler: Added jobs for time 151616461 ms

But when I export it as a jar and run it on a remote Spark cluster, it does display the actual data. Please suggest what could be wrong.

thanks
VR
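[Editor's note] One thing visible in the log above is that auto.offset.reset is overridden to "largest". For a Kafka direct stream that has no stored offsets, "largest" means the job only sees messages produced after it starts, so a locally run copy can legitimately print nothing if no new data arrives while it runs. Below is a minimal diagnostic sketch of the same direct-stream setup, written in PySpark for brevity (the broker list, topic, batch interval and group id are placeholders mirroring the log, not the real job's values); switching to "smallest" replays existing messages, which is a quick way to confirm data is flowing locally.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaLocalDebug")
ssc = StreamingContext(sc, 10)  # 10-second batches

# Placeholder connection values -- substitute the real broker list and topic.
kafkaParams = {
    "metadata.broker.list": "broker1:9092,broker2:9092",
    "group.id": "VR-Test-Group",
    # "largest" (as in the log) skips all existing messages;
    # "smallest" replays the topic from the beginning for local verification.
    "auto.offset.reset": "smallest",
}

stream = KafkaUtils.createDirectStream(ssc, ["data_stream"], kafkaParams)
stream.map(lambda kv: kv[1]).pprint()  # print a few records per batch

ssc.start()
ssc.awaitTermination()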
Re: convert local tsv file to orc file on distributed cloud storage (openstack).
Hi,

The source file I have is on my local machine and it is pretty huge, around 150 GB. How do I go about it?

On Sun, Nov 20, 2016 at 8:52 AM, Steve Loughran <ste...@hortonworks.com> wrote:
>
> On 19 Nov 2016, at 17:21, vr spark <vrspark...@gmail.com> wrote:
>
> Hi,
> I am looking for scala or python code samples to convert a local tsv file
> to an orc file and store it on distributed cloud storage (openstack).
>
> So, need these 3 samples. Please suggest.
>
> 1. read tsv
> 2. convert to orc
> 3. store on distributed cloud storage
>
> thanks
> VR
>
>
> all options, 9 lines of code, assuming a spark context has already been
> set up with the permissions to write to AWS, and the relevant JARs for S3A
> to work on the CP. The read operation is inefficient as, to determine the
> schema, it scans the (here, remote) file twice; that may be OK for an
> example, but I wouldn't do that in production. The source is a real file
> belonging to amazon; dest a bucket of mine.
>
> More details at: http://www.slideshare.net/steve_l/apache-spark-and-object-stores
>
> val csvdata = spark.read.options(Map(
>   "header" -> "true",
>   "ignoreLeadingWhiteSpace" -> "true",
>   "ignoreTrailingWhiteSpace" -> "true",
>   "timestampFormat" -> "-MM-dd HH:mm:ss.SSSZZZ",
>   "inferSchema" -> "true",
>   "mode" -> "FAILFAST"))
>   .csv("s3a://landsat-pds/scene_list.gz")
> csvdata.write.mode("overwrite").orc("s3a://hwdev-stevel-demo2/landsatOrc")
>
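[Editor's note] For the Python side of the original question, here is a rough PySpark equivalent of the Scala snippet above. It assumes the 150 GB tsv has first been made reachable by the executors (uploaded to the object store or a shared filesystem; a file:// path only works when every executor, or a purely local run, can see it). The schema, paths and container names below are placeholders. Supplying an explicit schema avoids the double scan that inferSchema causes, which matters for a file this size.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("tsv-to-orc").getOrCreate()

# Placeholder schema -- replace with the real column names and types.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("value", StringType(), True),
])

# 1. read tsv (tab-separated, so use the csv reader with sep="\t")
tsv = (spark.read
       .option("sep", "\t")
       .option("header", "true")
       .schema(schema)                       # explicit schema: no inference pass
       .csv("file:///data/input/huge.tsv"))  # or an hdfs:// / s3a:// / swift:// path

# 2 & 3. convert to orc and write to the object store
#        (container/provider names are placeholders for the openstack setup)
tsv.write.mode("overwrite").orc("swift://mycontainer.myprovider/landing/huge_orc")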
convert local tsv file to orc file on distributed cloud storage (openstack).
Hi,

I am looking for Scala or Python code samples to convert a local tsv file to an orc file and store it on distributed cloud storage (openstack). So I need these 3 samples. Please suggest.

1. read tsv
2. convert to orc
3. store on distributed cloud storage

thanks
VR
receiving stream data options
Hi,

I have a continuous REST API stream which keeps spitting out data in the form of JSON. I access the stream using python requests.get(url, stream=True, headers=headers). I want to receive it in Spark and do further processing. I am not sure of the best way to receive it in Spark. What options do I have? Some options I can think of:

1. push data from the REST API stream into a Kafka queue and use the Spark Kafka streaming utilities to capture the data and process it further.
2. push data from the REST API stream to a local socket and use the Spark socket stream utilities to capture the data and process it further.
3. is there any other way to receive it?

thanks
VR
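[Editor's note] A rough sketch of option 1 from the list above, under these assumptions: a locally reachable Kafka broker, the kafka-python package on the producing side, and placeholder values for the endpoint URL, headers, broker address and topic name. The producer is a plain Python process that forwards each line of the REST stream into a Kafka topic; a separate Spark Streaming job then consumes that topic.

# --- producer side: plain Python process, run outside Spark ---
import requests
from kafka import KafkaProducer  # pip install kafka-python

STREAM_URL = "https://example.com/events"   # placeholder for the real endpoint
HEADERS = {}                                 # placeholder for the real auth headers

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # placeholder broker
resp = requests.get(STREAM_URL, stream=True, headers=HEADERS)
for line in resp.iter_lines():
    if line:                                     # skip keep-alive blank lines
        producer.send("rest_json_events", line)  # topic name is a placeholder

# --- consumer side: a separate Spark Streaming job ---
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="RestStreamConsumer")
ssc = StreamingContext(sc, 10)
kvs = KafkaUtils.createDirectStream(
    ssc, ["rest_json_events"], {"metadata.broker.list": "localhost:9092"})
kvs.map(lambda kv: kv[1]).pprint()  # each record is the raw JSON line
ssc.start()
ssc.awaitTermination()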
Re: spark-submit failing but job running from scala ide
artition 1, PROCESS_LOCAL, 5474 bytes)
16/09/26 09:11:02 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/09/26 09:11:02 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/09/26 09:11:02 INFO Executor: Fetching spark://192.168.1.3:59323/jars/scopt_2.11-3.3.0.jar with timestamp 1474906261472
16/09/26 09:12:17 INFO Executor: Fetching spark://192.168.1.3:59323/jars/scopt_2.11-3.3.0.jar with timestamp 1474906261472
16/09/26 09:12:17 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Failed to connect to /192.168.1.3:59323
java.io.IOException: Failed to connect to /192.168.1.3:59323
 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
 at org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:358)
 at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:324)
 at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:633)
 at org.apache.spark.util.Utils$.fetchFile(Utils.scala:459)
 at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:488)
 at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:480)
 at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
 at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
 at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:480)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:252)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Operation timed out: /192.168.1.3:59323

On Sun, Sep 25, 2016 at 8:32 AM, Jacek Laskowski <ja...@japila.pl> wrote:
> Hi,
>
> How did you install Spark 1.6? It's usually as simple as rm -rf
> $SPARK_1.6_HOME, but it really depends on how you installed it in the
> first place.
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sun, Sep 25, 2016 at 4:32 PM, vr spark <vrspark...@gmail.com> wrote:
> > yes, i have both spark 1.6 and spark 2.0.
> > I unset the spark home environment variable and pointed spark submit to 2.0.
> > Its working now.
> >
> > How do i uninstall/remove spark 1.6 from mac?
> >
> > Thanks
> >
> >
> > On Sun, Sep 25, 2016 at 4:28 AM, Jacek Laskowski <ja...@japila.pl> wrote:
> >>
> >> Hi,
> >>
> >> Can you execute run-example SparkPi with your Spark installation?
> >>
> >> Also, see the logs:
> >>
> >> 16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
> >>
> >> 16/09/24 23:15:15 INFO Utils: Successfully started service 'SparkUI' on port 4041.
> >>
> >> You've got two Spark runtimes up that may or may not contribute to the issue.
> >>
> >> Pozdrawiam,
> >> Jacek Laskowski
> >>
> >> https://medium.com/@jaceklaskowski/
> >> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> >> Follow me at https://twitter.com/jaceklaskowski
> >>
> >>
> >> On Sun, Sep 25, 2016 at 8:36 AM, vr spark <vrspark...@gmail.com> wrote:
> >> > Hi,
> >> > I have this simple scala app which works fine when i run it as scala
> >> > application from the scala IDE for eclipse.
> >> > But when i export is as jar and run it from spark-submit i am getting below
> >> > error. Please suggest
> >> >
> >> > bin/spark-submit --class com.x.y.vr.spark.first.SimpleApp test.jar
> >> >
> >> > 16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port 4040.
> >> > Attempting port 4041.
> >> >
> >> > 16/09/24 23:15:15 INFO Utils: Successfully sta
Running jobs against remote cluster from scala eclipse ide
Hi,

I use the Scala IDE for Eclipse. I usually run jobs against my local Spark installed on my Mac, then export the jars, copy them to my company's Spark cluster and run spark-submit there. This works fine. But I want to run the jobs from the Scala IDE directly against my company's Spark cluster.

The Spark master URL of my company's cluster is spark://spark-437-1-5963003:7077. One of the worker nodes of that cluster is 11.104.29.106.

I tried this option, but I am getting the error below. Please let me know.

val conf = new SparkConf().setAppName("Simple Application").setMaster("spark://spark-437-1-5963003:7077").set("spark.driver.host", "11.104.29.106")

16/09/25 08:51:51 INFO SparkContext: Running Spark version 2.0.0
16/09/25 08:51:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/09/25 08:51:52 INFO SecurityManager: Changing view acls to: vr
16/09/25 08:51:52 INFO SecurityManager: Changing modify acls to: vr
16/09/25 08:51:52 INFO SecurityManager: Changing view acls groups to:
16/09/25 08:51:52 INFO SecurityManager: Changing modify acls groups to:
16/09/25 08:51:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vr); groups with view permissions: Set(); users with modify permissions: Set(vr); groups with modify permissions: Set()
16/09/25 08:51:52 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
(the same WARN line is repeated 16 times)
16/09/25 08:51:52 ERROR SparkContext: Error initializing SparkContext.
java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries! Consider explicitly setting the appropriate port for the service 'sparkDriver' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries.
 at sun.nio.ch.Net.bind0(Native Method)
 at sun.nio.ch.Net.bind(Net.java:433)

*full class code*

object RatingsCounter {
  /** Our main function where the action happens */
  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.INFO)
    val conf = new SparkConf().setAppName("Simple Application").
      setMaster("spark://spark-437-1-5963003:7077").
      set("spark.driver.host", "11.104.29.106")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("u.data")
    val ratings = lines.map(x => x.toString().split("\t")(2))
    val results = ratings.countByValue()
    val sortedResults = results.toSeq.sortBy(_._1)
    sortedResults.foreach(println)
  }
}
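[Editor's note] The bind failure above is consistent with spark.driver.host pointing at a machine other than the one running the driver: 11.104.29.106 is a worker node, while the driver here is the IDE JVM on the Mac, so Spark cannot bind its driver service to that address. A hedged sketch of the relevant settings follows, written in PySpark form for brevity (the same configuration keys apply to the Scala SparkConf above); the IP used is a placeholder for the Mac's own address as seen from the cluster, and the workers must also be able to open connections back to it.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("Simple Application")
        .setMaster("spark://spark-437-1-5963003:7077")
        # spark.driver.host must be an address of the machine the driver runs on
        # (the laptop running the IDE), reachable from the worker nodes.
        # "10.0.0.42" is a placeholder for that address.
        .set("spark.driver.host", "10.0.0.42"))

sc = SparkContext(conf=conf)
print(sc.parallelize(range(100)).count())  # quick connectivity check
sc.stop()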
Re: spark-submit failing but job running from scala ide
Yes, I have both Spark 1.6 and Spark 2.0. I unset the SPARK_HOME environment variable and pointed spark-submit to 2.0. It's working now.

How do I uninstall/remove Spark 1.6 from my Mac?

Thanks

On Sun, Sep 25, 2016 at 4:28 AM, Jacek Laskowski <ja...@japila.pl> wrote:
> Hi,
>
> Can you execute run-example SparkPi with your Spark installation?
>
> Also, see the logs:
>
> 16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
>
> 16/09/24 23:15:15 INFO Utils: Successfully started service 'SparkUI' on port 4041.
>
> You've got two Spark runtimes up that may or may not contribute to the issue.
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sun, Sep 25, 2016 at 8:36 AM, vr spark <vrspark...@gmail.com> wrote:
> > Hi,
> > I have this simple scala app which works fine when i run it as scala
> > application from the scala IDE for eclipse.
> > But when i export is as jar and run it from spark-submit i am getting below
> > error. Please suggest
> >
> > bin/spark-submit --class com.x.y.vr.spark.first.SimpleApp test.jar
> >
> > 16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
> > 16/09/24 23:15:15 INFO Utils: Successfully started service 'SparkUI' on port 4041.
> > 16/09/24 23:15:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.3:4041
> > 16/09/24 23:15:15 INFO SparkContext: Added JAR file:/Users/vr/Downloads/spark-2.0.0/test.jar at spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
> > 16/09/24 23:15:15 INFO Executor: Starting executor ID driver on host localhost
> > 16/09/24 23:15:15 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59264.
> > 16/09/24 23:15:15 INFO NettyBlockTransferService: Server created on 192.168.1.3:59264
> > 16/09/24 23:15:16 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5354 bytes)
> > 16/09/24 23:15:16 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1, PROCESS_LOCAL, 5354 bytes)
> > 16/09/24 23:15:16 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> > 16/09/24 23:15:16 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
> > 16/09/24 23:15:16 INFO Executor: Fetching spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
> > 16/09/24 23:16:31 INFO Executor: Fetching spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
> > 16/09/24 23:16:31 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
> > java.io.IOException: Failed to connect to /192.168.1.3:59263
> >  at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
> >  at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
> >  at org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:358)
> >  at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:324)
> >  at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:633)
> >  at org.apache.spark.util.Utils$.fetchFile(Utils.scala:459)
> >  at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:488)
> >  at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:480)
> >  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> >  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> >  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> >  at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> >  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> >  at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> >  at &g
spark-submit failing but job running from scala ide
Hi,

I have this simple Scala app which works fine when I run it as a Scala application from the Scala IDE for Eclipse. But when I export it as a jar and run it from spark-submit, I am getting the error below. Please suggest.

*bin/spark-submit --class com.x.y.vr.spark.first.SimpleApp test.jar*

16/09/24 23:15:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
16/09/24 23:15:15 INFO Utils: Successfully started service 'SparkUI' on port 4041.
16/09/24 23:15:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.1.3:4041
16/09/24 23:15:15 INFO SparkContext: Added JAR file:/Users/vr/Downloads/spark-2.0.0/test.jar at spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
16/09/24 23:15:15 INFO Executor: Starting executor ID driver on host localhost
16/09/24 23:15:15 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 59264.
16/09/24 23:15:15 INFO NettyBlockTransferService: Server created on 192.168.1.3:59264
16/09/24 23:15:16 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0, PROCESS_LOCAL, 5354 bytes)
16/09/24 23:15:16 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1, PROCESS_LOCAL, 5354 bytes)
16/09/24 23:15:16 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/09/24 23:15:16 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
16/09/24 23:15:16 INFO Executor: Fetching spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
16/09/24 23:16:31 INFO Executor: Fetching spark://192.168.1.3:59263/jars/test.jar with timestamp 1474784115210
16/09/24 23:16:31 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.io.IOException: Failed to connect to /192.168.1.3:59263
 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
 at org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:358)
 at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:324)
 at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:633)
 at org.apache.spark.util.Utils$.fetchFile(Utils.scala:459)
 at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:488)
 at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:480)
 at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
 at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
 at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
 at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:480)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:252)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

*My Scala code*

package com.x.y.vr.spark.first

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/Users/vttrich/Downloads/spark-2.0.0/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext("local[*]", "RatingsCounter")
    //val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
Re: Undefined function json_array_to_map
Hi Ted/All,

I did the below to get the full stack trace (see below), but I am not able to understand the root cause.

except Exception as error:
    traceback.print_exc()

and this is what I get...

  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: u'undefined function json_array_to_map; line 28 pos 73'

On Wed, Aug 17, 2016 at 8:59 AM, vr spark <vrspark...@gmail.com> wrote:
> spark 1.6.1
> python
>
> I0817 08:51:59.099356 15189 detector.cpp:481] A new leading master (UPID=master@10.224.167.25:5050) is detected
> I0817 08:51:59.099735 15188 sched.cpp:262] New master detected at master@x.y.17.25:4550
> I0817 08:51:59.100888 15188 sched.cpp:272] No credentials provided. Attempting to register without authentication
> I0817 08:51:59.326017 15190 sched.cpp:641] Framework registered with b859f266-9984-482d-8c0d-35bd88c1ad0a-6996
> 16/08/17 08:52:06 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
> 16/08/17 08:52:06 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
> Traceback (most recent call last):
>   File "/data1/home/vttrich/spk/orig_qryhubb.py", line 17, in <module>
>     res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date >= 408910 limit 10")
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 580, in sql
>   File "/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 51, in deco
> pyspark.sql.utils.AnalysisException: u'undefined function json_array_to_map; line 28 pos 73'
> I0817 08:52:12.840224 15600 sched.cpp:1771] Asked to stop the driver
> I0817 08:52:12.841198 15189 sched.cpp:1040] Stopping framework 'b859f2f3-7484-482d-8c0d-35bd91c1ad0a-6326'
>
>
> On Wed, Aug 17, 2016 at 8:50 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you show the complete stack trace ?
>>
>> Which version of Spark are you using ?
>>
>> Thanks
>>
>> On Wed, Aug 17, 2016 at 8:46 AM, vr spark <vrspark...@gmail.com> wrote:
>>
>>> Hi,
>>> I am getting an error in the scenario below. Please suggest.
>>>
>>> i have a virtual view in hive
>>>
>>> view name: log_data
>>> it has 2 columns
>>>
>>> query_map map<string,string>
>>> parti_date int
>>>
>>> Here is my snippet for the spark data frame
>>>
>>> res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date >= 408910 limit 10")
>>> df=res.collect()
>>> print 'after collect'
>>> print df
>>>
>>> * File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 51, in deco*
>>> *pyspark.sql.utils.AnalysisException: u'undefined function json_array_to_map; line 28 pos 73'*
>>>
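[Editor's note] The AnalysisException says Spark's own function registry cannot resolve json_array_to_map, which the Hive view's definition apparently uses. One possible explanation is that the function is a custom Hive UDF registered in the Hive deployment but not visible to the HiveContext the PySpark job creates. A hedged sketch of registering such a UDF before querying the view follows; the jar path and Java class name are placeholders that would have to come from whoever maintains the Hive UDF, and this only helps if the missing-UDF diagnosis is correct.

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(conf=SparkConf().setAppName("register-udf"))
sqlcont = HiveContext(sc)

# Make the UDF visible to this Spark session (placeholder jar and class names).
sqlcont.sql("ADD JAR /path/to/hive-json-udfs.jar")
sqlcont.sql("CREATE TEMPORARY FUNCTION json_array_to_map "
            "AS 'com.example.hive.udf.JsonArrayToMap'")

# Now the view that references json_array_to_map should analyze cleanly.
res = sqlcont.sql(
    "select parti_date FROM log_data WHERE parti_date >= 408910 limit 10")
print(res.collect())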
Re: Attempting to accept an unknown offer
My code is very simple; if I use other Hive tables, it works fine. This particular table (a virtual view) is huge and might have more metadata. It has only two columns.

virtual view name is: cluster_table

# col_name    data_type
ln            string
parti         int

here is the snippet...

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
import pyspark.sql
import json

myconf=SparkConf().setAppName("sql")
spcont=SparkContext(conf=myconf)
sqlcont=HiveContext(spcont)

res=sqlcont.sql("select parti FROM h.cluster_table WHERE parti > 408910 and parti <408911 limit 10")
print res.printSchema()
print 'res'
print res

df=res.collect()
print 'after collect'
print df

Here is the output after I submit the job:

I0817 09:18:40.606465 31409 sched.cpp:262] New master detected at master@x.y.17.56:6750
I0817 09:18:40.607461 31409 sched.cpp:272] No credentials provided. Attempting to register without authentication
I0817 09:18:40.612763 31409 sched.cpp:641] Framework registered with b859f2f3-7484-482d-8c0d-35bd91c1ad0a-6336
16/08/17 09:18:57 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/08/17 09:18:57 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
root
 |-- parti: integer (nullable = true)

None
res
DataFrame[partition_epoch_hourtenth: int]
2016-08-17 09:19:20,648:31315(0x7fafebfb1700):ZOO_WARN@zookeeper_interest@1557: Exceeded deadline by 19ms
2016-08-17 09:19:30,662:31315(0x7fafebfb1700):ZOO_WARN@zookeeper_interest@1557: Exceeded deadline by 13ms
W0817 09:20:01.715824 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676564
W0817 09:20:01.716455 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676630
W0817 09:20:01.716645 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676713
W0817 09:20:01.724409 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676554
W0817 09:20:01.724728 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676555
W0817 09:20:01.724936 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676556
W0817 09:20:01.725126 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676557
W0817 09:20:01.725309 31412 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676558

...and many more lines like this on the screen with a similar message.

On Wed, Aug 17, 2016 at 9:08 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> Please include user@ in your reply.
>
> Can you reveal the snippet of hive sql ?
>
> On Wed, Aug 17, 2016 at 9:04 AM, vr spark <vrspark...@gmail.com> wrote:
>
>> spark 1.6.1
>> mesos
>> job is running for like 10-15 minutes and giving this message and i killed it.
>>
>> In this job, i am creating a data frame from a hive sql. There are other similar jobs which work fine
>>
>> On Wed, Aug 17, 2016 at 8:52 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Can you provide more information ?
>>>
>>> Were you running on YARN ?
>>> Which version of Spark are you using ?
>>>
>>> Was your job failing ?
>>>
>>> Thanks
>>>
>>> On Wed, Aug 17, 2016 at 8:46 AM, vr spark <vrspark...@gmail.com> wrote:
>>>
>>>> W0816 23:17:01.984846 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910492
>>>> W0816 23:17:01.984987 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910493
>>>> W0816 23:17:01.985124 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910494
>>>> W0816 23:17:01.985339 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910495
>>>> W0816 23:17:01.985508 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910496
>>>> W0816 23:17:01.985651 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910497
>>>> W0816 23:17:01.985801 16360 sched.cpp:1195] Attempting to acce
Attempting to accept an unknown offer
W0816 23:17:01.984846 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910492
W0816 23:17:01.984987 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910493
W0816 23:17:01.985124 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910494
W0816 23:17:01.985339 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910495
W0816 23:17:01.985508 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910496
W0816 23:17:01.985651 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910497
W0816 23:17:01.985801 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910498
W0816 23:17:01.985961 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910499
W0816 23:17:01.986121 16360 sched.cpp:1195] Attempting to accept an unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910500
2016-08-16 23:18:41,877:16226(0x7f71271b6700):ZOO_WARN@zookeeper_interest@1557: Exceeded deadline by 13ms
2016-08-16 23:21:12,007:16226(0x7f71271b6700):ZOO_WARN@zookeeper_interest@1557: Exceeded deadline by 11ms
Undefined function json_array_to_map
Hi,

I am getting an error in the scenario below. Please suggest.

I have a virtual view in hive.

view name: log_data
it has 2 columns

query_map map<string,string>
parti_date int

Here is my snippet for the spark data frame:

res=sqlcont.sql("select parti_date FROM log_data WHERE parti_date >= 408910 limit 10")
df=res.collect()
print 'after collect'
print df

* File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 51, in deco*
*pyspark.sql.utils.AnalysisException: u'undefined function json_array_to_map; line 28 pos 73'*
Re: dataframe row list question
Hi Experts,

Please suggest.

On Thu, Aug 11, 2016 at 7:54 AM, vr spark <vrspark...@gmail.com> wrote:
>
> I have data which is json in this format
>
> myList: array
>  |    |    |-- elem: struct
>  |    |    |    |-- nm: string (nullable = true)
>  |    |    |    |-- vList: array (nullable = true)
>  |    |    |    |    |-- element: string (containsNull = true)
>
> From my kafka stream, I created a dataframe using sqlContext.jsonRDD.
> Then I registered it as a temp table with registerTempTable.
> I selected myList from this table and I see this output. It is a list of rows:
>
> [Row(nm=u'Apt', vList=[u'image']), Row(nm=u'Agent', vList=[u'Mozilla/5.0 ']), Row(nm=u'Ip', vList=[u'xx.yy.106.25'])]
>
> My requirement is to get only the rows with nm='IP' and their corresponding value.
> I would need IP, xx.yy.106.25
>
> Please suggest
>
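[Editor's note] A small sketch of two ways to pull out just the Ip entry, assuming the structure quoted above (note the sample rows spell the name 'Ip', not 'IP'). The first filters the already collected list of Rows in plain Python; the second pushes the filter into Spark with explode, which avoids collecting everything to the driver. The tiny inline dataframe only stands in for the one built from the Kafka JSON.

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import explode, col

sc = SparkContext(appName="ip-extract")
sqlContext = SQLContext(sc)

# Tiny stand-in for the dataframe built from the kafka json (same shape as above).
rows = [Row(myList=[Row(nm=u'Apt', vList=[u'image']),
                    Row(nm=u'Agent', vList=[u'Mozilla/5.0 ']),
                    Row(nm=u'Ip', vList=[u'xx.yy.106.25'])])]
df = sqlContext.createDataFrame(rows)

# Option 1: filter the collected list of Rows on the driver.
mylist = df.select("myList").collect()[0].myList
ip_pairs = [(r.nm, r.vList) for r in mylist if r.nm == u'Ip']
print(ip_pairs)  # [(u'Ip', [u'xx.yy.106.25'])]

# Option 2: push the filter into Spark: explode the array, keep only nm == 'Ip'.
ip_df = (df.select(explode(col("myList")).alias("e"))
           .filter(col("e.nm") == u'Ip')
           .select(col("e.nm").alias("nm"), col("e.vList").alias("value")))
ip_df.show(truncate=False)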
dataframe row list question
I have data which is json in this format:

myList: array
 |    |    |-- elem: struct
 |    |    |    |-- nm: string (nullable = true)
 |    |    |    |-- vList: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)

From my kafka stream, I created a dataframe using sqlContext.jsonRDD.
Then I registered it as a temp table with registerTempTable.
I selected myList from this table and I see this output. It is a list of rows:

[Row(nm=u'Apt', vList=[u'image']), Row(nm=u'Agent', vList=[u'Mozilla/5.0 ']), Row(nm=u'Ip', vList=[u'xx.yy.106.25'])]

My requirement is to get only the rows with nm='IP' and their corresponding value.
I would need IP, xx.yy.106.25

Please suggest
Re: read only specific jsons
Hi,

I tried it and am still getting an exception. Any other suggestion?

clickDF = cDF.filter(cDF['request.clientIP'].isNotNull())

It fails for some cases and errors out with the message below:

AnalysisException: u'No such struct field clientIP in cookies, nscClientIP1, nscClientIP2, uAgent;'

On Tue, Jul 26, 2016 at 12:05 PM, Cody Koeninger <c...@koeninger.org> wrote:
> Have you tried filtering out corrupt records with something along the lines of
>
> df.filter(df("_corrupt_record").isNull)
>
> On Tue, Jul 26, 2016 at 1:53 PM, vr spark <vrspark...@gmail.com> wrote:
> > i am reading data from kafka using spark streaming.
> >
> > I am reading json and creating dataframe.
> > I am using pyspark
> >
> > kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)
> >
> > lines = kvs.map(lambda x: x[1])
> >
> > lines.foreachRDD(mReport)
> >
> > def mReport(clickRDD):
> >     clickDF = sqlContext.jsonRDD(clickRDD)
> >     clickDF.registerTempTable("clickstream")
> >     PagesDF = sqlContext.sql(
> >         "SELECT request.clientIP as ip "
> >         "FROM clickstream "
> >         "WHERE request.clientIP is not null "
> >         " limit 2000 "
> >
> > The problem is that not all the jsons from the stream have the same format.
> >
> > It works when it reads a json which has ip.
> >
> > Some of the json strings do not have client ip in their schema.
> >
> > So i am getting error and my job is failing when it encounters such a json.
> >
> > How do read only those json which has ip in their schema?
> >
> > Please suggest.
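[Editor's note] Since the AnalysisException comes from the clientIP field simply not existing in the schema inferred for some micro-batches, one workaround is to filter the raw JSON strings before building the dataframe, so jsonRDD only ever sees records that contain the field. A rough sketch under that assumption, reusing the sqlContext and stream setup from the original message; the field names mirror the question, and malformed records are dropped as well.

import json

def has_client_ip(raw):
    # Keep a record only when request.clientIP is present and non-null.
    try:
        doc = json.loads(raw)
        return doc.get("request", {}).get("clientIP") is not None
    except (ValueError, AttributeError):
        return False  # drop malformed records or ones where "request" is not an object

def mReport(clickRDD):
    good = clickRDD.filter(has_client_ip)
    if good.isEmpty():
        return  # nothing usable in this batch, skip it
    clickDF = sqlContext.jsonRDD(good)
    clickDF.registerTempTable("clickstream")
    pagesDF = sqlContext.sql(
        "SELECT request.clientIP AS ip FROM clickstream "
        "WHERE request.clientIP IS NOT NULL LIMIT 2000")
    pagesDF.show()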
read only specific jsons
I am reading data from Kafka using Spark Streaming. I am reading json and creating a dataframe. I am using pyspark.

kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)

lines = kvs.map(lambda x: x[1])

lines.foreachRDD(mReport)

def mReport(clickRDD):
    clickDF = sqlContext.jsonRDD(clickRDD)
    clickDF.registerTempTable("clickstream")
    PagesDF = sqlContext.sql(
        "SELECT request.clientIP as ip "
        "FROM clickstream "
        "WHERE request.clientIP is not null "
        " limit 2000 "
    )

The problem is that not all the jsons from the stream have the same format. It works when it reads a json which has ip. Some of the json strings do not have the client ip in their schema. So I am getting an error and my job is failing when it encounters such a json.

How do I read only those jsons which have ip in their schema?

Please suggest.
read only specific jsons
I am reading data from Kafka using Spark Streaming. I am reading json and creating a dataframe.

kvs = KafkaUtils.createDirectStream(ssc, kafkaTopic1, kafkaParams)

lines = kvs.map(lambda x: x[1])

lines.foreachRDD(mReport)

def mReport(clickRDD):
    clickDF = sqlContext.jsonRDD(clickRDD)
    clickDF.registerTempTable("clickstream")
    PagesDF = sqlContext.sql(
        "SELECT request.clientIP as ip "
        "FROM clickstream "
        "WHERE request.clientIP is not null "
        " limit 2000 "
    )

The problem is that not all the jsons from the stream have the same format. It works when it reads a json which has ip. Some of the json strings do not have the client ip in their schema. So I am getting an error and my job is failing when it encounters such a json.

How do I read only those jsons which have ip in their schema?

Please suggest.